Skip to main content

hashtree_cli/
storage.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8    from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16#[cfg(feature = "s3")]
17use std::future::Future;
18use std::io::Write;
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::time::{SystemTime, UNIX_EPOCH};
22
23mod upload;
24
25mod maintenance;
26mod retention;
27
28pub use maintenance::{compact_lmdb_environments_under, CompactResult, VerifyResult};
29pub use retention::{PinnedItem, StorageByPriority, StorageStats, TreeMeta};
30
// Tree eviction priority levels. NOTE(review): the names suggest our own trees
// outrank followed authors' trees, which outrank everything else; the eviction
// code that interprets these values is not visible here — confirm ordering.
/// Eviction priority for trees that are neither our own nor from followed authors.
pub const PRIORITY_OTHER: u8 = 64;
/// Eviction priority for trees published by followed authors.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Eviction priority for our own trees (the largest `u8`, i.e. 255).
pub const PRIORITY_OWN: u8 = u8::MAX;
/// Reader-slot limit requested when opening the metadata LMDB environment.
const LMDB_MAX_READERS: u32 = 1024;
/// Lower bound (1 MiB) for the metadata LMDB map size; a larger existing
/// `data.mdb` raises the size actually requested.
const LMDB_METADATA_MIN_MAP_SIZE_BYTES: u64 = 1 << 20;
/// Lower bound (16 MiB) for blob LMDB map sizes.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 16 << 20;
39
/// Cached root info from Nostr events.
///
/// Persisted by `HashtreeStore` in its `cached_roots` database, keyed by
/// `"pubkey_hex/tree_name"` and encoded as msgpack.
// NOTE(review): if the msgpack encoding used is positional (array form), the
// field order below is part of the on-disk format — do not reorder fields
// without a migration. Confirm against the encoder used by the caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex); `None` when the root is not encrypted
    /// or no key was cached.
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public", "link-visible", or "private"
    pub visibility: String,
}
52
/// Storage statistics
///
/// Backend-neutral snapshot returned by `LocalStore::stats`, copied field by
/// field from the filesystem or LMDB store's own statistics.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of blobs reported by the backend.
    pub count: usize,
    /// Byte total reported by the backend.
    // NOTE(review): whether this is payload bytes or on-disk usage is defined
    // by each backend's stats implementation — confirm before relying on it.
    pub total_bytes: u64,
}
59
/// Local blob store - wraps either FsBlobStore or LmdbBlobStore
///
/// The `Lmdb` variant only exists when the `lmdb` feature is compiled in;
/// without it, requests for the LMDB backend are served by `Fs` (see
/// `LocalStore::new_with_lmdb_map_size`).
pub enum LocalStore {
    /// Filesystem-backed blob store.
    Fs(FsBlobStore),
    /// LMDB-backed blob store (requires the `lmdb` feature).
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
66
/// Returns `true` when the final component of `path` looks like a filesystem
/// blob-store shard directory: exactly two ASCII hex digits (either case).
///
/// Paths without a final component, or whose final component is not valid
/// UTF-8, are never treated as shards.
#[cfg(feature = "lmdb")]
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    match path.file_name().and_then(|os_name| os_name.to_str()) {
        Some(name) => name.len() == 2 && name.bytes().all(|b| b.is_ascii_hexdigit()),
        None => false,
    }
}
74
75fn lmdb_map_size_for_existing_env(path: &Path, minimum_bytes: u64) -> Result<usize> {
76    let existing_bytes = std::fs::metadata(path.join("data.mdb"))
77        .map(|metadata| metadata.len())
78        .unwrap_or(0);
79    let requested = align_lmdb_map_size(minimum_bytes.max(existing_bytes));
80    usize::try_from(requested).context("LMDB map size exceeds usize")
81}
82
83fn align_lmdb_map_size(bytes: u64) -> u64 {
84    let page_size = (page_size::get() as u64).max(4096);
85    let remainder = bytes % page_size;
86    if remainder == 0 {
87        bytes
88    } else {
89        bytes.saturating_add(page_size - remainder)
90    }
91}
92
/// Deletes leftover filesystem-backend shard directories directly under `path`.
///
/// After switching a blob directory to LMDB, any immediate subdirectory whose
/// name is two hex digits is assumed to be an old filesystem shard and is
/// removed recursively; each removal is logged. Other entries are untouched.
///
/// # Errors
/// Returns `StoreError::Io` on the first directory-read or removal failure;
/// shards processed before the failure stay deleted.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for dir_entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let candidate = dir_entry.map_err(StoreError::Io)?.path();
        if !candidate.is_dir() || !is_fs_blob_shard_dir(&candidate) {
            continue;
        }
        std::fs::remove_dir_all(&candidate).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            candidate.display()
        );
    }
    Ok(())
}
109
110impl LocalStore {
111    /// Create a new unbounded local store.
112    ///
113    /// Higher-level stores that need quota enforcement should manage eviction
114    /// above this layer so tree metadata, pins, and archival policies stay
115    /// coherent.
116    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
117        Self::new_unbounded(path, backend)
118    }
119
120    /// Create a new local store with an explicit LMDB logical size cap when using the LMDB backend.
121    ///
122    /// The requested size is used for both the LMDB map and the store's built-in
123    /// byte quota, so standalone blob envs can evict instead of growing forever.
124    pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
125        path: P,
126        backend: &StorageBackend,
127        _map_size_bytes: Option<u64>,
128    ) -> Result<Self, StoreError> {
129        match backend {
130            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
131            #[cfg(feature = "lmdb")]
132            StorageBackend::Lmdb => match _map_size_bytes {
133                Some(map_size_bytes) => {
134                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
135                    remove_stale_fs_blob_shards(path.as_ref())?;
136                    Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
137                        path,
138                        map_size_bytes,
139                    )?))
140                }
141                None => {
142                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
143                    remove_stale_fs_blob_shards(path.as_ref())?;
144                    Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
145                }
146            },
147            #[cfg(not(feature = "lmdb"))]
148            StorageBackend::Lmdb => {
149                tracing::warn!(
150                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
151                );
152                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
153            }
154        }
155    }
156
157    /// Create a new unbounded local store for a specific backend.
158    pub fn new_unbounded<P: AsRef<Path>>(
159        path: P,
160        backend: &StorageBackend,
161    ) -> Result<Self, StoreError> {
162        Self::new_with_lmdb_map_size(path, backend, None)
163    }
164
165    pub fn backend(&self) -> StorageBackend {
166        match self {
167            LocalStore::Fs(_) => StorageBackend::Fs,
168            #[cfg(feature = "lmdb")]
169            LocalStore::Lmdb(_) => StorageBackend::Lmdb,
170        }
171    }
172
173    /// Sync put operation
174    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
175        match self {
176            LocalStore::Fs(store) => store.put_sync(hash, data),
177            #[cfg(feature = "lmdb")]
178            LocalStore::Lmdb(store) => store.put_sync(hash, data),
179        }
180    }
181
182    /// Sync batch put operation.
183    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
184        match self {
185            LocalStore::Fs(store) => {
186                let mut inserted = 0usize;
187                for (hash, data) in items {
188                    if store.put_sync(*hash, data.as_slice())? {
189                        inserted += 1;
190                    }
191                }
192                Ok(inserted)
193            }
194            #[cfg(feature = "lmdb")]
195            LocalStore::Lmdb(store) => store.put_many_sync(items),
196        }
197    }
198
199    /// Sync get operation
200    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
201        match self {
202            LocalStore::Fs(store) => store.get_sync(hash),
203            #[cfg(feature = "lmdb")]
204            LocalStore::Lmdb(store) => store.get_sync(hash),
205        }
206    }
207
208    /// Check if hash exists
209    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
210        match self {
211            LocalStore::Fs(store) => Ok(store.exists(hash)),
212            #[cfg(feature = "lmdb")]
213            LocalStore::Lmdb(store) => store.exists(hash),
214        }
215    }
216
217    /// Sync delete operation
218    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
219        match self {
220            LocalStore::Fs(store) => store.delete_sync(hash),
221            #[cfg(feature = "lmdb")]
222            LocalStore::Lmdb(store) => store.delete_sync(hash),
223        }
224    }
225
226    /// Get storage statistics
227    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
228        match self {
229            LocalStore::Fs(store) => {
230                let stats = store.stats()?;
231                Ok(LocalStoreStats {
232                    count: stats.count,
233                    total_bytes: stats.total_bytes,
234                })
235            }
236            #[cfg(feature = "lmdb")]
237            LocalStore::Lmdb(store) => {
238                let stats = store.stats()?;
239                Ok(LocalStoreStats {
240                    count: stats.count,
241                    total_bytes: stats.total_bytes,
242                })
243            }
244        }
245    }
246
247    /// List all hashes in the store
248    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
249        match self {
250            LocalStore::Fs(store) => store.list(),
251            #[cfg(feature = "lmdb")]
252            LocalStore::Lmdb(store) => store.list(),
253        }
254    }
255}
256
257#[async_trait]
258impl Store for LocalStore {
259    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
260        self.put_sync(hash, &data)
261    }
262
263    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
264        self.put_many_sync(&items)
265    }
266
267    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
268        self.get_sync(hash)
269    }
270
271    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
272        self.exists(hash)
273    }
274
275    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
276        self.delete_sync(hash)
277    }
278}
279
280#[cfg(feature = "s3")]
281use tokio::sync::mpsc;
282
283use crate::config::S3Config;
284
/// Message for background S3 sync
///
/// Sent over the unbounded channel created in `StorageRouter::with_s3`; the
/// background task maps `hash` to the object key `{prefix}{hex(hash)}.bin`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// PUT `data` to the object key derived from `hash` (retried up to 3 attempts).
    Upload { hash: Hash, data: Vec<u8> },
    /// DELETE the object key derived from `hash` (retried up to 3 attempts).
    Delete { hash: Hash },
}
291
/// Storage router - local store primary with optional S3 backup
///
/// Write path: local first (fast), then queue S3 upload (non-blocking)
/// Read path: local first, fall back to S3 if miss
///
/// Without the `s3` feature, or when built via `StorageRouter::new`, it is a
/// thin pass-through to the local store.
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LocalStore>,
    /// Optional S3 client for backup; used directly for synchronous read-path
    /// fallbacks (`get_sync` / `exists`).
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Bucket for S3 lookups; `Some` exactly when `s3_client` is `Some`.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Object-key prefix prepended to `{hex(hash)}.bin` (empty when unset).
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to send uploads to background task
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
310
311impl StorageRouter {
312    #[cfg(feature = "s3")]
313    fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
314    where
315        F: Future<Output = T> + Send + 'static,
316        T: Send + 'static,
317    {
318        if tokio::runtime::Handle::try_current().is_ok() {
319            return std::thread::Builder::new()
320                .name("storage-s3-sync".to_string())
321                .spawn(move || {
322                    let runtime = tokio::runtime::Builder::new_current_thread()
323                        .enable_all()
324                        .build()
325                        .map_err(|err| {
326                            StoreError::Other(format!("build storage s3 sync runtime: {err}"))
327                        })?;
328                    Ok(runtime.block_on(future))
329                })
330                .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
331                .join()
332                .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()))?;
333        }
334
335        let runtime = tokio::runtime::Builder::new_current_thread()
336            .enable_all()
337            .build()
338            .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
339        Ok(runtime.block_on(future))
340    }
341
342    /// Create router with local storage only
343    pub fn new(local: Arc<LocalStore>) -> Self {
344        Self {
345            local,
346            #[cfg(feature = "s3")]
347            s3_client: None,
348            #[cfg(feature = "s3")]
349            s3_bucket: None,
350            #[cfg(feature = "s3")]
351            s3_prefix: String::new(),
352            #[cfg(feature = "s3")]
353            sync_tx: None,
354        }
355    }
356
357    /// Create router with local storage + S3 backup
358    #[cfg(feature = "s3")]
359    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
360        use aws_sdk_s3::Client as S3Client;
361
362        // Build AWS config
363        let mut aws_config_loader = aws_config::from_env();
364        aws_config_loader =
365            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
366        let aws_config = aws_config_loader.load().await;
367
368        // Build S3 client with custom endpoint
369        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
370        s3_config_builder = s3_config_builder
371            .endpoint_url(&config.endpoint)
372            .force_path_style(true);
373
374        let s3_client = S3Client::from_conf(s3_config_builder.build());
375        let bucket = config.bucket.clone();
376        let prefix = config.prefix.clone().unwrap_or_default();
377
378        // Create background sync channel
379        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
380
381        // Spawn background sync task with bounded concurrent uploads
382        let sync_client = s3_client.clone();
383        let sync_bucket = bucket.clone();
384        let sync_prefix = prefix.clone();
385
386        tokio::spawn(async move {
387            use aws_sdk_s3::primitives::ByteStream;
388
389            tracing::info!("S3 background sync task started");
390
391            // Keep S3 writes parallel, but avoid dispatch failures during mirror backfill bursts.
392            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(8));
393            let client = std::sync::Arc::new(sync_client);
394            let bucket = std::sync::Arc::new(sync_bucket);
395            let prefix = std::sync::Arc::new(sync_prefix);
396
397            while let Some(msg) = sync_rx.recv().await {
398                let client = client.clone();
399                let bucket = bucket.clone();
400                let prefix = prefix.clone();
401                let semaphore = semaphore.clone();
402
403                // Spawn each upload with semaphore-bounded concurrency
404                tokio::spawn(async move {
405                    // Acquire permit before uploading
406                    let _permit = semaphore.acquire().await;
407
408                    match msg {
409                        S3SyncMessage::Upload { hash, data } => {
410                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
411                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
412
413                            let mut attempt = 1u8;
414                            loop {
415                                match client
416                                    .put_object()
417                                    .bucket(bucket.as_str())
418                                    .key(&key)
419                                    .body(ByteStream::from(data.clone()))
420                                    .send()
421                                    .await
422                                {
423                                    Ok(_) => {
424                                        tracing::debug!("S3 upload succeeded: {}", &key);
425                                        break;
426                                    }
427                                    Err(e) if attempt < 3 => {
428                                        tracing::warn!(
429                                            "S3 upload retrying {}: attempt={} error={}",
430                                            &key,
431                                            attempt,
432                                            e
433                                        );
434                                        tokio::time::sleep(std::time::Duration::from_millis(
435                                            250 * u64::from(attempt),
436                                        ))
437                                        .await;
438                                        attempt += 1;
439                                    }
440                                    Err(e) => {
441                                        tracing::error!(
442                                            "S3 upload failed {} after {} attempts: {}",
443                                            &key,
444                                            attempt,
445                                            e
446                                        );
447                                        break;
448                                    }
449                                }
450                            }
451                        }
452                        S3SyncMessage::Delete { hash } => {
453                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
454                            tracing::debug!("S3 deleting {}", &key);
455
456                            let mut attempt = 1u8;
457                            loop {
458                                match client
459                                    .delete_object()
460                                    .bucket(bucket.as_str())
461                                    .key(&key)
462                                    .send()
463                                    .await
464                                {
465                                    Ok(_) => break,
466                                    Err(e) if attempt < 3 => {
467                                        tracing::warn!(
468                                            "S3 delete retrying {}: attempt={} error={}",
469                                            &key,
470                                            attempt,
471                                            e
472                                        );
473                                        tokio::time::sleep(std::time::Duration::from_millis(
474                                            250 * u64::from(attempt),
475                                        ))
476                                        .await;
477                                        attempt += 1;
478                                    }
479                                    Err(e) => {
480                                        tracing::error!(
481                                            "S3 delete failed {} after {} attempts: {}",
482                                            &key,
483                                            attempt,
484                                            e
485                                        );
486                                        break;
487                                    }
488                                }
489                            }
490                        }
491                    }
492                });
493            }
494        });
495
496        tracing::info!(
497            "S3 storage initialized: bucket={}, prefix={}",
498            bucket,
499            prefix
500        );
501
502        Ok(Self {
503            local,
504            s3_client: Some(s3_client),
505            s3_bucket: Some(bucket),
506            s3_prefix: prefix,
507            sync_tx: Some(sync_tx),
508        })
509    }
510
511    /// Store data - writes to LMDB, queues S3 upload in background
512    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
513        // Always write to local first
514        let is_new = self.local.put_sync(hash, data)?;
515
516        // Queue S3 upload only for newly inserted blobs.
517        // Existing local blobs were already persisted or are handled by explicit repair/push flows.
518        #[cfg(feature = "s3")]
519        if is_new {
520            if let Some(ref tx) = self.sync_tx {
521                tracing::debug!(
522                    "Queueing S3 upload for {} ({} bytes)",
523                    crate::storage::to_hex(&hash)[..16].to_string(),
524                    data.len(),
525                );
526                if let Err(e) = tx.send(S3SyncMessage::Upload {
527                    hash,
528                    data: data.to_vec(),
529                }) {
530                    tracing::error!("Failed to queue S3 upload: {}", e);
531                }
532            }
533        }
534
535        Ok(is_new)
536    }
537
538    /// Store multiple blobs with a single local batch write when supported.
539    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
540        #[cfg(feature = "s3")]
541        let pending_uploads = if self.sync_tx.is_some() {
542            let mut pending = Vec::new();
543            for (hash, data) in items {
544                if !self.local.exists(hash)? {
545                    pending.push((*hash, data.clone()));
546                }
547            }
548            pending
549        } else {
550            Vec::new()
551        };
552
553        let inserted = self.local.put_many_sync(items)?;
554
555        #[cfg(feature = "s3")]
556        if let Some(ref tx) = self.sync_tx {
557            for (hash, data) in pending_uploads {
558                if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
559                    tracing::error!("Failed to queue S3 upload: {}", e);
560                }
561            }
562        }
563
564        Ok(inserted)
565    }
566
567    /// Get data - tries LMDB first, falls back to S3
568    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
569        // Try local first
570        if let Some(data) = self.local.get_sync(hash)? {
571            return Ok(Some(data));
572        }
573
574        // Fall back to S3 if configured
575        #[cfg(feature = "s3")]
576        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
577            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
578            let client = client.clone();
579            let bucket = bucket.clone();
580
581            match Self::run_s3_future_sync(async move {
582                client.get_object().bucket(bucket).key(key).send().await
583            }) {
584                Ok(Ok(output)) => {
585                    match Self::run_s3_future_sync(async move { output.body.collect().await }) {
586                        Ok(Ok(body)) => {
587                            let data = body.into_bytes().to_vec();
588                            // Cache locally for future reads
589                            let _ = self.local.put_sync(*hash, &data);
590                            return Ok(Some(data));
591                        }
592                        Ok(Err(err)) => {
593                            tracing::warn!("S3 body collect failed: {}", err);
594                        }
595                        Err(err) => {
596                            tracing::warn!("S3 body collect runtime failed: {}", err);
597                        }
598                    }
599                }
600                Ok(Err(err)) => {
601                    let service_err = err.into_service_error();
602                    if !service_err.is_no_such_key() {
603                        tracing::warn!("S3 get failed: {}", service_err);
604                    }
605                }
606                Err(err) => {
607                    tracing::warn!("S3 get runtime failed: {}", err);
608                }
609            }
610        }
611
612        Ok(None)
613    }
614
615    /// Check if hash exists
616    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
617        // Check local first
618        if self.local.exists(hash)? {
619            return Ok(true);
620        }
621
622        // Check S3 if configured
623        #[cfg(feature = "s3")]
624        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
625            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
626            let client = client.clone();
627            let bucket = bucket.clone();
628
629            match Self::run_s3_future_sync(async move {
630                client.head_object().bucket(bucket).key(&key).send().await
631            }) {
632                Ok(Ok(_)) => return Ok(true),
633                Ok(Err(err)) => {
634                    let service_err = err.into_service_error();
635                    if !service_err.is_not_found() {
636                        tracing::warn!("S3 head failed: {}", service_err);
637                    }
638                }
639                Err(err) => {
640                    tracing::warn!("S3 head runtime failed: {}", err);
641                }
642            }
643        }
644
645        Ok(false)
646    }
647
648    /// Delete data from both local and S3 stores
649    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
650        let deleted = self.local.delete_sync(hash)?;
651
652        // Queue S3 delete if configured
653        #[cfg(feature = "s3")]
654        if let Some(ref tx) = self.sync_tx {
655            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
656        }
657
658        Ok(deleted)
659    }
660
661    /// Delete data from local store only (don't propagate to S3)
662    /// Used for eviction where we want to keep S3 as archive
663    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
664        self.local.delete_sync(hash)
665    }
666
667    /// Get stats from local store
668    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
669        self.local.stats()
670    }
671
672    /// List all hashes from local store
673    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
674        self.local.list()
675    }
676
677    /// Get the underlying local store for HashTree operations
678    pub fn local_store(&self) -> Arc<LocalStore> {
679        Arc::clone(&self.local)
680    }
681}
682
683// Implement async Store trait for StorageRouter so it can be used directly with HashTree
684// This ensures all writes go through S3 sync
685#[async_trait]
686impl Store for StorageRouter {
687    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
688        self.put_sync(hash, &data)
689    }
690
691    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
692        self.put_many_sync(&items)
693    }
694
695    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
696        self.get_sync(hash)
697    }
698
699    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
700        self.exists(hash)
701    }
702
703    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
704        self.delete_sync(hash)
705    }
706}
707
/// Top-level hashtree storage.
///
/// Combines an LMDB metadata environment with a [`StorageRouter`] for blobs:
/// - The LMDB environment holds pins, refs, ownership, tree indexes, and
///   cached Nostr roots.
/// - The router serves blobs from the local backend plus an optional S3 mirror.
///
/// The raw blob backend is opened unbounded. `max_size_bytes` is enforced at
/// this layer, so eviction can account for tree refs, blob ownership, pins,
/// and S3 archival.
pub struct HashtreeStore {
    // NOTE(review): presumably the directory passed to the constructor; the
    // assignment is outside this view — confirm.
    base_path: PathBuf,
    /// LMDB environment containing all of the named databases below.
    env: heed::Env,
    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
    pins: Database<Bytes, Unit>,
    /// Mutable published refs that should stay subscribed and keep following updates
    pinned_refs: Database<Str, Unit>,
    /// Authors whose hashtree publications should be mirrored continuously
    tracked_authors: Database<Str, Unit>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles LMDB + optional S3 (Arc for sharing with HashTree)
    router: Arc<StorageRouter>,
    /// Maximum storage size in bytes (from config)
    max_size_bytes: u64,
    /// Whether quota enforcement may delete local blobs not tracked by any indexed tree.
    evict_orphans: bool,
}
736
737impl HashtreeStore {
738    /// Create a new store with the configured local storage limit.
739    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
740        let config = hashtree_config::Config::load_or_default();
741        let max_size_bytes = config
742            .storage
743            .max_size_gb
744            .saturating_mul(1024 * 1024 * 1024);
745        Self::with_options_and_backend(
746            path,
747            None,
748            max_size_bytes,
749            config.storage.evict_orphans,
750            &config.storage.backend,
751        )
752    }
753
754    /// Create a new store with an explicit local backend and size limit.
755    pub fn new_with_backend<P: AsRef<Path>>(
756        path: P,
757        backend: hashtree_config::StorageBackend,
758        max_size_bytes: u64,
759    ) -> Result<Self> {
760        Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
761    }
762
763    /// Create a new store with optional S3 backend and the configured local storage limit.
764    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
765        let config = hashtree_config::Config::load_or_default();
766        let max_size_bytes = config
767            .storage
768            .max_size_gb
769            .saturating_mul(1024 * 1024 * 1024);
770        Self::with_options_and_backend(
771            path,
772            s3_config,
773            max_size_bytes,
774            config.storage.evict_orphans,
775            &config.storage.backend,
776        )
777    }
778
779    /// Create a new store with optional S3 backend and custom size limit.
780    ///
781    /// The raw local blob backend remains unbounded. `HashtreeStore` enforces
782    /// `max_size_bytes` at the tree-management layer so eviction can honor pins,
783    /// orphan handling, and local-only eviction when S3 is used as archive.
784    pub fn with_options<P: AsRef<Path>>(
785        path: P,
786        s3_config: Option<&S3Config>,
787        max_size_bytes: u64,
788    ) -> Result<Self> {
789        let config = hashtree_config::Config::load_or_default();
790        Self::with_options_and_backend(
791            path,
792            s3_config,
793            max_size_bytes,
794            config.storage.evict_orphans,
795            &config.storage.backend,
796        )
797    }
798
    /// Open (or create) the store at `path` with fully explicit options.
    ///
    /// - `path`: directory for the metadata LMDB environment (created if missing).
    ///   Blob data lives under `path/blobs`.
    /// - `s3_config`: optional S3 backend. It is only honored when built with the
    ///   `s3` feature; otherwise a warning is logged and storage stays local-only.
    /// - `max_size_bytes`: stored on `Self` for quota enforcement above the blob
    ///   layer. For the LMDB blob backend it also sets the blob map size, raised
    ///   to at least `LMDB_BLOB_MIN_MAP_SIZE_BYTES`.
    /// - `evict_orphans`: stored on `Self`; consulted by eviction logic elsewhere.
    /// - `backend`: local blob backend to use. Requesting `Lmdb` without the `lmdb`
    ///   feature logs a warning and falls back to the filesystem backend.
    ///
    /// # Errors
    /// Fails in any of these cases:
    /// - the directory cannot be created;
    /// - the metadata environment or its named databases cannot be opened or created;
    /// - the blob store cannot be created;
    /// - with the `s3` feature, `StorageRouter::with_s3` fails.
    pub fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        evict_orphans: bool,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;
        // NOTE(review): presumably grows the map size to fit an environment that
        // already exists on disk, with the given floor — see the helper to confirm.
        let metadata_map_size =
            lmdb_map_size_for_existing_env(path, LMDB_METADATA_MIN_MAP_SIZE_BYTES)?;

        // SAFETY: heed marks `EnvOpenOptions::open` unsafe because LMDB memory-maps
        // the environment file. The same environment must not be opened more than
        // once in this process, and the file must not be modified by other means
        // while mapped. NOTE(review): this relies on callers creating at most one
        // store per `path`; nothing here enforces that — confirm at call sites.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(metadata_map_size)
                .max_dbs(10) // pins, pinned_refs, tracked_authors, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Best-effort: reclaim reader-table slots left behind by processes that
        // exited mid-transaction. A failure here is deliberately ignored.
        let _ = env.clear_stale_readers();

        // Create (or open) every named metadata database in one write transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
        let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Intentionally keep the raw blob backend unbounded here. HashtreeStore
        // owns quota policy above this layer, where it can coordinate eviction
        // with tree refs, blob ownership, pins, and S3 archival behavior.
        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                std::fs::create_dir_all(path.join("blobs"))?;
                // NOTE(review): presumably clears filesystem-backend shard
                // directories left from an earlier Fs run in the same `blobs`
                // dir — confirm against `remove_stale_fs_blob_shards`.
                remove_stale_fs_blob_shards(&path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
                // The LMDB map size is a hard ceiling on blob data, so size it
                // from the quota but never below the minimum.
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        // Create storage router with optional S3
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            // Constructors are synchronous, so drive the async S3 setup to
            // completion on the current thread.
            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            base_path: path.to_path_buf(),
            env,
            pins,
            pinned_refs,
            tracked_authors,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
            evict_orphans,
        })
    }
906
907    pub fn base_path(&self) -> &Path {
908        &self.base_path
909    }
910
911    /// Get the storage router
912    pub fn router(&self) -> &StorageRouter {
913        &self.router
914    }
915
916    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
917    /// All writes through this go to both LMDB and S3
918    pub fn store_arc(&self) -> Arc<StorageRouter> {
919        Arc::clone(&self.router)
920    }
921
922    /// Get tree node by hash (raw bytes)
923    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
924        let store = self.store_arc();
925        let tree = HashTree::new(HashTreeConfig::new(store).public());
926
927        sync_block_on(async {
928            tree.get_tree_node(hash)
929                .await
930                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
931        })
932    }
933
934    /// Store a raw blob, returns SHA256 hash as hex.
935    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
936        let hash = sha256(data);
937        self.router
938            .put_sync(hash, data)
939            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
940        Ok(to_hex(&hash))
941    }
942
943    /// Store an owned Blossom blob under the configured durable storage limit.
944    pub fn put_owned_blob(&self, data: &[u8], pubkey: &[u8; 32]) -> Result<String> {
945        let hash = sha256(data);
946        if !self
947            .router
948            .exists(&hash)
949            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))?
950        {
951            self.make_room_for_durable_blob(data.len() as u64)?;
952            self.router
953                .put_sync(hash, data)
954                .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
955        }
956        self.set_blob_owner(&hash, pubkey)?;
957        Ok(to_hex(&hash))
958    }
959
960    /// Store an opportunistically cached blob.
961    ///
962    /// Unlike durable `put_blob` writes, this path may evict disposable orphaned
963    /// blobs to make room under storage pressure. It intentionally avoids touching
964    /// indexed trees, social-graph roots, explicit pins, and owned Blossom blobs.
965    pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
966        let hash = sha256(data);
967        if self
968            .router
969            .exists(&hash)
970            .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
971        {
972            return Ok(to_hex(&hash));
973        }
974
975        let incoming_bytes = data.len() as u64;
976        let _ = self.make_room_for_cached_blob(incoming_bytes);
977
978        let mut retried_after_cleanup = false;
979        loop {
980            match self.router.put_sync(hash, data) {
981                Ok(_) => return Ok(to_hex(&hash)),
982                Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
983                    let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
984                    if freed == 0 {
985                        return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
986                    }
987                    retried_after_cleanup = true;
988                }
989                Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
990            }
991        }
992    }
993
994    /// Get a raw blob by SHA256 hash (raw bytes).
995    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
996        self.router
997            .get_sync(hash)
998            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
999    }
1000
1001    /// Check if a blob exists by SHA256 hash (raw bytes).
1002    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
1003        self.router
1004            .exists(hash)
1005            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
1006    }
1007
    // === Blossom ownership tracking ===
    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
    // This allows efficient multi-owner tracking: a single key lookup answers
    // "does this pubkey own this blob?", and a prefix scan on the sha256 lists all owners.
1011
1012    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
1013    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
1014        let mut key = [0u8; 64];
1015        key[..32].copy_from_slice(sha256);
1016        key[32..].copy_from_slice(pubkey);
1017        key
1018    }
1019
1020    /// Add an owner (pubkey) to a blob for Blossom protocol
1021    /// Multiple users can own the same blob - it's only deleted when all owners remove it
1022    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
1023        let key = Self::blob_owner_key(sha256, pubkey);
1024        let mut wtxn = self.env.write_txn()?;
1025
1026        // Add ownership entry (idempotent - put overwrites)
1027        self.blob_owners.put(&mut wtxn, &key[..], &())?;
1028
1029        // Convert sha256 to hex for BlobMetadata (which stores sha256 as hex string)
1030        let sha256_hex = to_hex(sha256);
1031
1032        // Get existing blobs for this pubkey (for /list endpoint)
1033        let mut blobs: Vec<BlobMetadata> = self
1034            .pubkey_blobs
1035            .get(&wtxn, pubkey)?
1036            .and_then(|b| serde_json::from_slice(b).ok())
1037            .unwrap_or_default();
1038
1039        // Check if blob already exists for this pubkey
1040        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
1041            let now = SystemTime::now()
1042                .duration_since(UNIX_EPOCH)
1043                .unwrap()
1044                .as_secs();
1045
1046            // Get size from raw blob
1047            let size = self
1048                .get_blob(sha256)?
1049                .map(|data| data.len() as u64)
1050                .unwrap_or(0);
1051
1052            blobs.push(BlobMetadata {
1053                sha256: sha256_hex,
1054                size,
1055                mime_type: "application/octet-stream".to_string(),
1056                uploaded: now,
1057            });
1058
1059            let blobs_json = serde_json::to_vec(&blobs)?;
1060            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1061        }
1062
1063        wtxn.commit()?;
1064        Ok(())
1065    }
1066
1067    /// Check if a pubkey owns a blob
1068    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1069        let key = Self::blob_owner_key(sha256, pubkey);
1070        let rtxn = self.env.read_txn()?;
1071        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1072    }
1073
1074    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
1075    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1076        let rtxn = self.env.read_txn()?;
1077
1078        let mut owners = Vec::new();
1079        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1080            let (key, _) = item?;
1081            if key.len() == 64 {
1082                // Extract pubkey from composite key (bytes 32-64)
1083                let mut pubkey = [0u8; 32];
1084                pubkey.copy_from_slice(&key[32..64]);
1085                owners.push(pubkey);
1086            }
1087        }
1088        Ok(owners)
1089    }
1090
1091    /// Check if blob has any owners
1092    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1093        let rtxn = self.env.read_txn()?;
1094
1095        // Just check if any entry exists with this prefix
1096        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1097            if item.is_ok() {
1098                return Ok(true);
1099            }
1100        }
1101        Ok(false)
1102    }
1103
1104    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
1105    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1106        Ok(self.get_blob_owners(sha256)?.into_iter().next())
1107    }
1108
1109    /// Remove a user's ownership of a blossom blob
1110    /// Only deletes the actual blob when no owners remain
1111    /// Returns true if the blob was actually deleted (no owners left)
1112    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1113        let key = Self::blob_owner_key(sha256, pubkey);
1114        let mut wtxn = self.env.write_txn()?;
1115
1116        // Remove this pubkey's ownership entry
1117        self.blob_owners.delete(&mut wtxn, &key[..])?;
1118
1119        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
1120        let sha256_hex = to_hex(sha256);
1121
1122        // Remove from pubkey's blob list
1123        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1124            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1125                blobs.retain(|b| b.sha256 != sha256_hex);
1126                let blobs_json = serde_json::to_vec(&blobs)?;
1127                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1128            }
1129        }
1130
1131        // Check if any other owners remain (prefix scan)
1132        let mut has_other_owners = false;
1133        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1134            if item.is_ok() {
1135                has_other_owners = true;
1136                break;
1137            }
1138        }
1139
1140        if has_other_owners {
1141            wtxn.commit()?;
1142            tracing::debug!(
1143                "Removed {} from blob {} owners, other owners remain",
1144                &to_hex(pubkey)[..8],
1145                &sha256_hex[..8]
1146            );
1147            return Ok(false);
1148        }
1149
1150        // No owners left - delete the blob completely
1151        tracing::info!(
1152            "All owners removed from blob {}, deleting",
1153            &sha256_hex[..8]
1154        );
1155
1156        // Delete raw blob (by content hash) - this deletes from S3 too
1157        let _ = self.router.delete_sync(sha256);
1158
1159        wtxn.commit()?;
1160        Ok(true)
1161    }
1162
1163    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
1164    pub fn list_blobs_by_pubkey(
1165        &self,
1166        pubkey: &[u8; 32],
1167    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1168        let rtxn = self.env.read_txn()?;
1169
1170        let blobs: Vec<BlobMetadata> = self
1171            .pubkey_blobs
1172            .get(&rtxn, pubkey)?
1173            .and_then(|b| serde_json::from_slice(b).ok())
1174            .unwrap_or_default();
1175
1176        Ok(blobs
1177            .into_iter()
1178            .map(|b| crate::server::blossom::BlobDescriptor {
1179                url: format!("/{}", b.sha256),
1180                sha256: b.sha256,
1181                size: b.size,
1182                mime_type: b.mime_type,
1183                uploaded: b.uploaded,
1184            })
1185            .collect())
1186    }
1187
1188    /// Get a single chunk/blob by hash (raw bytes)
1189    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1190        self.router
1191            .get_sync(hash)
1192            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1193    }
1194
1195    /// Get file content by hash (raw bytes)
1196    /// Returns raw bytes (caller handles decryption if needed)
1197    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1198        let store = self.store_arc();
1199        let tree = HashTree::new(HashTreeConfig::new(store).public());
1200
1201        sync_block_on(async {
1202            tree.read_file(hash)
1203                .await
1204                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1205        })
1206    }
1207
1208    /// Get file content by Cid (hash + optional decryption key as raw bytes)
1209    /// Handles decryption automatically if key is present
1210    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1211        let store = self.store_arc();
1212        let tree = HashTree::new(HashTreeConfig::new(store).public());
1213
1214        sync_block_on(async {
1215            tree.get(cid, None)
1216                .await
1217                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1218        })
1219    }
1220
1221    fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1222        let exists = self
1223            .router
1224            .exists(&cid.hash)
1225            .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1226        if !exists {
1227            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1228        }
1229        Ok(())
1230    }
1231
1232    /// Stream file content identified by Cid into a writer without buffering full file in memory.
1233    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1234        self.ensure_cid_exists(cid)?;
1235
1236        let store = self.store_arc();
1237        let tree = HashTree::new(HashTreeConfig::new(store).public());
1238        let mut total_bytes = 0u64;
1239        let mut streamed_any_chunk = false;
1240
1241        sync_block_on(async {
1242            let mut stream = tree.get_stream(cid);
1243            while let Some(chunk) = stream.next().await {
1244                streamed_any_chunk = true;
1245                let chunk =
1246                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1247                writer
1248                    .write_all(&chunk)
1249                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1250                total_bytes += chunk.len() as u64;
1251            }
1252            Ok::<(), anyhow::Error>(())
1253        })?;
1254
1255        if !streamed_any_chunk {
1256            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1257        }
1258
1259        writer
1260            .flush()
1261            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1262        Ok(total_bytes)
1263    }
1264
1265    /// Stream file content identified by Cid directly into a destination path.
1266    pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1267        self.ensure_cid_exists(cid)?;
1268
1269        let output_path = output_path.as_ref();
1270        if let Some(parent) = output_path.parent() {
1271            if !parent.as_os_str().is_empty() {
1272                std::fs::create_dir_all(parent).with_context(|| {
1273                    format!("Failed to create output directory {}", parent.display())
1274                })?;
1275            }
1276        }
1277
1278        let mut file = std::fs::File::create(output_path)
1279            .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1280        self.write_file_by_cid_to_writer(cid, &mut file)
1281    }
1282
1283    /// Stream a public (unencrypted) file by hash directly into a destination path.
1284    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1285        self.write_file_by_cid(&Cid::public(*hash), output_path)
1286    }
1287
1288    /// Resolve a path within a tree (returns Cid with key if encrypted)
1289    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1290        let store = self.store_arc();
1291        let tree = HashTree::new(HashTreeConfig::new(store).public());
1292
1293        sync_block_on(async {
1294            tree.resolve_path(cid, path)
1295                .await
1296                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1297        })
1298    }
1299
    /// Describe how the public file at `hash` is laid out: total size, and for
    /// a chunked file the hash and size of each link of its root node.
    ///
    /// Returns:
    /// - `Ok(None)` if `hash` is not in the store, the tree node cannot be
    ///   loaded, or the node is a directory;
    /// - `Ok(Some(..))` with `is_chunked: false` and empty chunk lists when
    ///   `hash` is a single blob rather than a tree node;
    /// - otherwise `Ok(Some(..))` listing the root node's links in order, with
    ///   `is_chunked` true when there is at least one link.
    ///
    /// NOTE(review): assumes the root node's links are leaf data chunks. If the
    /// tree builder nests intermediate nodes for large files, these hashes would
    /// be node hashes rather than raw chunks — confirm against hashtree_core.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        // `store` is kept alongside the tree for the direct existence check below.
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            // First check if the hash exists in the store at all
            // (either as a blob or tree node)
            let exists = store
                .has(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            // Get total size
            let total_size = tree
                .get_size(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            // Check if it's a tree (chunked) or blob
            let is_tree_node = tree
                .is_tree(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            if !is_tree_node {
                // Single blob, not chunked
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            // Get tree node to extract chunk info
            let node = match tree
                .get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
            {
                Some(n) => n,
                None => return Ok(None),
            };

            // Check if it's a directory (has named links)
            let is_directory = tree
                .is_directory(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None); // Not a file
            }

            // Extract chunk info from links (parallel vectors, same order as the links)
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1371
1372    /// Get byte range from file
1373    pub fn get_file_range(
1374        &self,
1375        hash: &[u8; 32],
1376        start: u64,
1377        end: Option<u64>,
1378    ) -> Result<Option<(Vec<u8>, u64)>> {
1379        let metadata = match self.get_file_chunk_metadata(hash)? {
1380            Some(m) => m,
1381            None => return Ok(None),
1382        };
1383
1384        if metadata.total_size == 0 {
1385            return Ok(Some((Vec::new(), 0)));
1386        }
1387
1388        if start >= metadata.total_size {
1389            return Ok(None);
1390        }
1391
1392        let end = end
1393            .unwrap_or(metadata.total_size - 1)
1394            .min(metadata.total_size - 1);
1395
1396        // For non-chunked files, load entire file
1397        if !metadata.is_chunked {
1398            let content = self.get_file(hash)?.unwrap_or_default();
1399            let range_content = if start < content.len() as u64 {
1400                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1401            } else {
1402                Vec::new()
1403            };
1404            return Ok(Some((range_content, metadata.total_size)));
1405        }
1406
1407        // For chunked files, load only needed chunks
1408        let mut result = Vec::new();
1409        let mut current_offset = 0u64;
1410
1411        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1412            let chunk_size = metadata.chunk_sizes[i];
1413            let chunk_end = current_offset + chunk_size - 1;
1414
1415            // Check if this chunk overlaps with requested range
1416            if chunk_end >= start && current_offset <= end {
1417                let chunk_content = match self.get_chunk(chunk_hash)? {
1418                    Some(content) => content,
1419                    None => {
1420                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1421                    }
1422                };
1423
1424                let chunk_read_start = if current_offset >= start {
1425                    0
1426                } else {
1427                    (start - current_offset) as usize
1428                };
1429
1430                let chunk_read_end = if chunk_end <= end {
1431                    chunk_size as usize - 1
1432                } else {
1433                    (end - current_offset) as usize
1434                };
1435
1436                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1437            }
1438
1439            current_offset += chunk_size;
1440
1441            if current_offset > end {
1442                break;
1443            }
1444        }
1445
1446        Ok(Some((result, metadata.total_size)))
1447    }
1448
1449    /// Stream file range as chunks using Arc for async/Send contexts
1450    pub fn stream_file_range_chunks_owned(
1451        self: Arc<Self>,
1452        hash: &[u8; 32],
1453        start: u64,
1454        end: u64,
1455    ) -> Result<Option<FileRangeChunksOwned>> {
1456        let metadata = match self.get_file_chunk_metadata(hash)? {
1457            Some(m) => m,
1458            None => return Ok(None),
1459        };
1460
1461        if metadata.total_size == 0 || start >= metadata.total_size {
1462            return Ok(None);
1463        }
1464
1465        let end = end.min(metadata.total_size - 1);
1466
1467        Ok(Some(FileRangeChunksOwned {
1468            store: self,
1469            metadata,
1470            start,
1471            end,
1472            current_chunk_idx: 0,
1473            current_offset: 0,
1474        }))
1475    }
1476
1477    /// Get directory structure by hash (raw bytes)
1478    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1479        let store = self.store_arc();
1480        let tree = HashTree::new(HashTreeConfig::new(store).public());
1481
1482        sync_block_on(async {
1483            // Check if it's a directory
1484            let is_dir = tree
1485                .is_directory(hash)
1486                .await
1487                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1488
1489            if !is_dir {
1490                return Ok(None);
1491            }
1492
1493            // Get directory entries (public Cid - no encryption key)
1494            let cid = hashtree_core::Cid::public(*hash);
1495            let tree_entries = tree
1496                .list_directory(&cid)
1497                .await
1498                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1499
1500            let entries: Vec<DirEntry> = tree_entries
1501                .into_iter()
1502                .map(|e| DirEntry {
1503                    name: e.name,
1504                    cid: to_hex(&e.hash),
1505                    is_directory: e.link_type.is_tree(),
1506                    size: e.size,
1507                })
1508                .collect();
1509
1510            Ok(Some(DirectoryListing {
1511                dir_name: String::new(),
1512                entries,
1513            }))
1514        })
1515    }
1516
1517    /// Get directory structure by CID, supporting encrypted directories.
1518    pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1519        let store = self.store_arc();
1520        let tree = HashTree::new(HashTreeConfig::new(store).public());
1521        let cid = cid.clone();
1522
1523        sync_block_on(async {
1524            let is_dir = tree
1525                .is_dir(&cid)
1526                .await
1527                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1528
1529            if !is_dir {
1530                return Ok(None);
1531            }
1532
1533            let tree_entries = tree
1534                .list_directory(&cid)
1535                .await
1536                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1537
1538            let entries: Vec<DirEntry> = tree_entries
1539                .into_iter()
1540                .map(|e| DirEntry {
1541                    name: e.name,
1542                    cid: Cid {
1543                        hash: e.hash,
1544                        key: e.key,
1545                    }
1546                    .to_string(),
1547                    is_directory: e.link_type.is_tree(),
1548                    size: e.size,
1549                })
1550                .collect();
1551
1552            Ok(Some(DirectoryListing {
1553                dir_name: String::new(),
1554                entries,
1555            }))
1556        })
1557    }
1558
    // === Pinned refs, tracked authors, and cached roots ===
1560
1561    /// Persist a mutable published ref that should stay subscribed.
1562    pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
1563        let mut wtxn = self.env.write_txn()?;
1564        self.pinned_refs.put(&mut wtxn, key, &())?;
1565        wtxn.commit()?;
1566        Ok(())
1567    }
1568
1569    /// Remove a mutable published ref from the live pinned set.
1570    pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
1571        let mut wtxn = self.env.write_txn()?;
1572        let removed = self.pinned_refs.delete(&mut wtxn, key)?;
1573        wtxn.commit()?;
1574        Ok(removed)
1575    }
1576
1577    /// List mutable published refs that should stay subscribed.
1578    pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
1579        let rtxn = self.env.read_txn()?;
1580        let mut refs = Vec::new();
1581
1582        for item in self.pinned_refs.iter(&rtxn)? {
1583            let (key, _) = item?;
1584            refs.push(key.to_string());
1585        }
1586
1587        refs.sort();
1588        Ok(refs)
1589    }
1590
1591    /// Persist an author whose published trees should stay mirrored.
1592    pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
1593        let mut wtxn = self.env.write_txn()?;
1594        let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
1595        self.tracked_authors.put(&mut wtxn, npub, &())?;
1596        wtxn.commit()?;
1597        Ok(inserted)
1598    }
1599
1600    /// Remove an author from the continuous mirror set.
1601    pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
1602        let mut wtxn = self.env.write_txn()?;
1603        let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
1604        wtxn.commit()?;
1605        Ok(removed)
1606    }
1607
1608    /// List authors whose published trees should stay mirrored.
1609    pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
1610        let rtxn = self.env.read_txn()?;
1611        let mut authors = Vec::new();
1612
1613        for item in self.tracked_authors.iter(&rtxn)? {
1614            let (npub, _) = item?;
1615            authors.push(npub.to_string());
1616        }
1617
1618        authors.sort();
1619        Ok(authors)
1620    }
1621
1622    /// Get cached root for a pubkey/tree_name pair
1623    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1624        let key = format!("{}/{}", pubkey_hex, tree_name);
1625        let rtxn = self.env.read_txn()?;
1626        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1627            let root: CachedRoot = rmp_serde::from_slice(bytes)
1628                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1629            Ok(Some(root))
1630        } else {
1631            Ok(None)
1632        }
1633    }
1634
1635    /// Set cached root for a pubkey/tree_name pair
1636    pub fn set_cached_root(
1637        &self,
1638        pubkey_hex: &str,
1639        tree_name: &str,
1640        hash: &str,
1641        key: Option<&str>,
1642        visibility: &str,
1643        updated_at: u64,
1644    ) -> Result<()> {
1645        let db_key = format!("{}/{}", pubkey_hex, tree_name);
1646        let root = CachedRoot {
1647            hash: hash.to_string(),
1648            key: key.map(|k| k.to_string()),
1649            updated_at,
1650            visibility: visibility.to_string(),
1651        };
1652        let bytes = rmp_serde::to_vec(&root)
1653            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1654        let mut wtxn = self.env.write_txn()?;
1655        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1656        wtxn.commit()?;
1657        Ok(())
1658    }
1659
1660    /// List all cached roots for a pubkey
1661    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1662        let prefix = format!("{}/", pubkey_hex);
1663        let rtxn = self.env.read_txn()?;
1664        let mut results = Vec::new();
1665
1666        for item in self.cached_roots.iter(&rtxn)? {
1667            let (key, bytes) = item?;
1668            if key.starts_with(&prefix) {
1669                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1670                let root: CachedRoot = rmp_serde::from_slice(bytes)
1671                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1672                results.push((tree_name.to_string(), root));
1673            }
1674        }
1675
1676        Ok(results)
1677    }
1678
1679    /// Delete a cached root
1680    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1681        let key = format!("{}/{}", pubkey_hex, tree_name);
1682        let mut wtxn = self.env.write_txn()?;
1683        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1684        wtxn.commit()?;
1685        Ok(deleted)
1686    }
1687}
1688
1689fn is_map_full_store_error(err: &StoreError) -> bool {
1690    let message = err.to_string();
1691    message.contains("MDB_MAP_FULL") || message.contains("MapFull")
1692}
1693
/// Layout of a stored file as needed to serve byte ranges chunk by chunk.
///
/// `chunk_hashes` and `chunk_sizes` are parallel vectors: the range iterator
/// reads both at the same index to locate and size each chunk.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total size of the file — presumably in bytes; confirm with producer.
    pub total_size: u64,
    /// Hash of each chunk, in file order.
    pub chunk_hashes: Vec<Hash>,
    /// Length of each chunk in bytes, aligned with `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// When `false`, `FileRangeChunksOwned` yields nothing for this file.
    pub is_chunked: bool,
}
1701
/// Owned iterator for async streaming
///
/// Yields the bytes of an inclusive byte range `start..=end` of a chunked
/// file, one (trimmed) chunk at a time. It owns an `Arc` to the store, so it
/// can outlive the borrow that created it.
pub struct FileRangeChunksOwned {
    /// Store used to fetch chunk contents by hash.
    store: Arc<HashtreeStore>,
    /// Chunk layout of the file being streamed.
    metadata: FileChunkMetadata,
    /// First byte offset to emit (inclusive).
    start: u64,
    /// Last byte offset to emit (inclusive).
    end: u64,
    /// Index of the next chunk to examine in `metadata`.
    current_chunk_idx: usize,
    /// File offset of the first byte of the chunk at `current_chunk_idx`.
    current_offset: u64,
}
1711
1712impl Iterator for FileRangeChunksOwned {
1713    type Item = Result<Vec<u8>>;
1714
1715    fn next(&mut self) -> Option<Self::Item> {
1716        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
1717            return None;
1718        }
1719
1720        if self.current_offset > self.end {
1721            return None;
1722        }
1723
1724        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
1725        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
1726        let chunk_end = self.current_offset + chunk_size - 1;
1727
1728        self.current_chunk_idx += 1;
1729
1730        if chunk_end < self.start || self.current_offset > self.end {
1731            self.current_offset += chunk_size;
1732            return self.next();
1733        }
1734
1735        let chunk_content = match self.store.get_chunk(chunk_hash) {
1736            Ok(Some(content)) => content,
1737            Ok(None) => {
1738                return Some(Err(anyhow::anyhow!(
1739                    "Chunk {} not found",
1740                    to_hex(chunk_hash)
1741                )));
1742            }
1743            Err(e) => {
1744                return Some(Err(e));
1745            }
1746        };
1747
1748        let chunk_read_start = if self.current_offset >= self.start {
1749            0
1750        } else {
1751            (self.start - self.current_offset) as usize
1752        };
1753
1754        let chunk_read_end = if chunk_end <= self.end {
1755            chunk_size as usize - 1
1756        } else {
1757            (self.end - self.current_offset) as usize
1758        };
1759
1760        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
1761        self.current_offset += chunk_size;
1762
1763        Some(Ok(result))
1764    }
1765}
1766
/// Summary of a garbage-collection pass.
///
/// NOTE(review): field meanings below are taken from their names; confirm
/// against the GC routine that fills them in.
#[derive(Debug)]
pub struct GcStats {
    /// Number of DAGs removed by the pass.
    pub deleted_dags: usize,
    /// Amount of storage reclaimed — presumably bytes.
    pub freed_bytes: u64,
}
1772
/// One entry of a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within its parent directory.
    pub name: String,
    /// Content identifier of the entry, as a string — presumably the textual
    /// CID form; confirm with the producer.
    pub cid: String,
    /// Whether the entry is itself a directory.
    pub is_directory: bool,
    /// Entry size — presumably bytes; confirm with the producer.
    pub size: u64,
}
1780
/// A directory's name together with its entries.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the listed directory.
    pub dir_name: String,
    /// Entries contained in the directory.
    pub entries: Vec<DirEntry>,
}
1786
/// Blob metadata for Blossom protocol
///
/// Serializable via serde; field declaration order is also the serialized
/// field order, so do not reorder fields.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// SHA-256 of the blob — presumably hex-encoded; confirm with callers.
    pub sha256: String,
    /// Blob size — presumably bytes.
    pub size: u64,
    /// MIME type of the blob content.
    pub mime_type: String,
    /// Upload time — NOTE(review): presumably Unix seconds; confirm.
    pub uploaded: u64,
}
1795
1796// Implement ContentStore trait for WebRTC data exchange
1797impl crate::webrtc::ContentStore for HashtreeStore {
1798    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1799        let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
1800        self.get_chunk(&hash)
1801    }
1802}
1803
#[cfg(test)]
mod tests {
    use super::*;
    // Not feature-gated: the tracked-author test below needs `TempDir` even
    // when the `lmdb` feature is disabled.
    use tempfile::TempDir;

    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("lmdb-blobs");
        std::fs::create_dir_all(path.join("aa"))?;
        std::fs::create_dir_all(path.join("b2"))?;
        std::fs::create_dir_all(path.join("keep-me"))?;
        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;

        let _store = LocalStore::new_with_lmdb_map_size(
            &path,
            &StorageBackend::Lmdb,
            Some(128 * 1024 * 1024),
        )?;

        assert!(!path.join("aa").exists());
        assert!(!path.join("b2").exists());
        assert!(path.join("keep-me").exists());
        assert!(path.join("data.mdb").exists());
        assert!(path.join("lock.mdb").exists());

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let old_bytes = b"old published root";
        let new_bytes = b"new published root";
        let old_root = sha256(old_bytes);
        let new_root = sha256(new_bytes);

        store.put_blob(old_bytes)?;
        store.pin(&old_root)?;
        store.index_tree(
            &old_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(store.is_pinned(&old_root)?);
        assert!(store.get_tree_meta(&old_root)?.is_some());

        store.put_blob(new_bytes)?;
        store.pin(&new_root)?;
        store.index_tree(
            &new_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(
            !store.is_pinned(&old_root)?,
            "superseded root should be unpinned when ref is replaced"
        );
        assert!(
            store.get_tree_meta(&old_root)?.is_none(),
            "superseded root metadata should be removed when ref is replaced"
        );
        assert!(store.is_pinned(&new_root)?);
        assert!(store.get_tree_meta(&new_root)?.is_some());

        Ok(())
    }

    #[test]
    fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options(temp.path(), None, 1024 * 1024)?;

        assert!(
            store.add_tracked_author(
                "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk"
            )?,
            "first insert of an author should report a new entry"
        );
        assert!(
            store.add_tracked_author(
                "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm"
            )?,
            "first insert of an author should report a new entry"
        );
        assert!(
            !store.add_tracked_author(
                "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk"
            )?,
            "re-adding an already tracked author should not report a new entry"
        );

        assert_eq!(
            store.list_tracked_authors()?,
            vec![
                "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm".to_string(),
                "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string(),
            ]
        );
        assert!(store.remove_tracked_author(
            "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm"
        )?);
        assert!(!store.remove_tracked_author(
            "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq"
        )?);
        assert_eq!(
            store.list_tracked_authors()?,
            vec!["npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string()]
        );

        Ok(())
    }

    #[cfg(feature = "s3")]
    #[test]
    fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
        let temp = TempDir::new()?;
        let local = Arc::new(LocalStore::new(
            temp.path().join("blobs"),
            &StorageBackend::Fs,
        )?);

        let outcome = std::panic::catch_unwind(|| {
            sync_block_on(async {
                let aws_config = aws_config::from_env()
                    .region(aws_sdk_s3::config::Region::new("auto"))
                    .load()
                    .await;
                let s3_client = aws_sdk_s3::Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&aws_config)
                        .endpoint_url("http://127.0.0.1:9")
                        .force_path_style(true)
                        .build(),
                );

                let router = StorageRouter {
                    local,
                    s3_client: Some(s3_client),
                    s3_bucket: Some("test-bucket".to_string()),
                    s3_prefix: String::new(),
                    sync_tx: None,
                };
                let hash = [0u8; 32];

                let _ = Store::has(&router, &hash).await;
                let _ = Store::get(&router, &hash).await;
            });
        });

        assert!(
            outcome.is_ok(),
            "S3-backed async store methods should not panic inside futures::block_on"
        );

        Ok(())
    }
}