Skip to main content

hashtree_cli/
storage.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8    from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::io::Write;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::time::{SystemTime, UNIX_EPOCH};
20
21mod upload;
22
23mod maintenance;
24mod retention;
25
26pub use maintenance::VerifyResult;
27pub use retention::{PinnedItem, StorageByPriority, StorageStats, TreeMeta};
28
/// Priority levels for tree eviction.
///
/// Priority value for the "other" category.
pub const PRIORITY_OTHER: u8 = 64;
/// Priority value for the "followed" category.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Priority value for the "own" category (the maximum `u8`).
pub const PRIORITY_OWN: u8 = 255;
/// Reader-slot limit passed to `max_readers` when the metadata LMDB environment is opened.
const LMDB_MAX_READERS: u32 = 1024;
/// Lower bound (10 GiB) applied to the map size of the LMDB blob store.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 10 * 1024 * 1024 * 1024;
36
37/// Cached root info from Nostr events.
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct CachedRoot {
40    /// Root hash (hex)
41    pub hash: String,
42    /// Optional decryption key (hex)
43    pub key: Option<String>,
44    /// Unix timestamp when this was cached (from event created_at)
45    pub updated_at: u64,
46    /// Visibility: "public", "link-visible", or "private"
47    pub visibility: String,
48}
49
/// Aggregate usage figures reported by a local blob store.
///
/// This is a plain value type, so it is `Copy` and comparable; the default
/// value describes an empty store (zero entries, zero bytes).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct LocalStoreStats {
    /// Number of entries reported by the backend.
    pub count: usize,
    /// Total size in bytes reported by the backend.
    pub total_bytes: u64,
}
56
57/// Local blob store - wraps either FsBlobStore or LmdbBlobStore
58pub enum LocalStore {
59    Fs(FsBlobStore),
60    #[cfg(feature = "lmdb")]
61    Lmdb(LmdbBlobStore),
62}
63
64#[cfg(feature = "lmdb")]
/// Returns `true` when the final component of `path` is exactly two ASCII
/// hex digits (either case).
///
/// Paths without a final component, or whose final component is not valid
/// UTF-8, yield `false`. Only the name is inspected; the filesystem is not
/// touched.
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    match path.file_name().and_then(|raw| raw.to_str()) {
        Some(name) => matches!(
            name.as_bytes(),
            [hi, lo] if hi.is_ascii_hexdigit() && lo.is_ascii_hexdigit()
        ),
        None => false,
    }
}
71
/// Recursively deletes every direct child directory of `path` whose name
/// looks like a filesystem blob shard (see `is_fs_blob_shard_dir`).
///
/// Called before opening the LMDB blob store at the same location. Each
/// removal is logged at `info` level.
///
/// # Errors
///
/// Any I/O failure while listing or deleting is returned as
/// `StoreError::Io`; directories removed before the failure stay removed.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for dir_entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let candidate = dir_entry.map_err(StoreError::Io)?.path();
        if !candidate.is_dir() || !is_fs_blob_shard_dir(&candidate) {
            continue;
        }
        std::fs::remove_dir_all(&candidate).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            candidate.display()
        );
    }
    Ok(())
}
88
89impl LocalStore {
90    /// Create a new unbounded local store.
91    ///
92    /// Higher-level stores that need quota enforcement should manage eviction
93    /// above this layer so tree metadata, pins, and archival policies stay
94    /// coherent.
95    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
96        Self::new_unbounded(path, backend)
97    }
98
99    /// Create a new local store with an explicit LMDB map size when using the LMDB backend.
100    pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
101        path: P,
102        backend: &StorageBackend,
103        _map_size_bytes: Option<u64>,
104    ) -> Result<Self, StoreError> {
105        match backend {
106            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
107            #[cfg(feature = "lmdb")]
108            StorageBackend::Lmdb => match _map_size_bytes {
109                Some(map_size_bytes) => {
110                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
111                    remove_stale_fs_blob_shards(path.as_ref())?;
112                    let map_size = usize::try_from(map_size_bytes).map_err(|_| {
113                        StoreError::Other("LMDB map size exceeds usize".to_string())
114                    })?;
115                    Ok(LocalStore::Lmdb(LmdbBlobStore::with_map_size(
116                        path, map_size,
117                    )?))
118                }
119                None => {
120                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
121                    remove_stale_fs_blob_shards(path.as_ref())?;
122                    Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
123                }
124            },
125            #[cfg(not(feature = "lmdb"))]
126            StorageBackend::Lmdb => {
127                tracing::warn!(
128                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
129                );
130                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
131            }
132        }
133    }
134
135    /// Create a new unbounded local store for a specific backend.
136    pub fn new_unbounded<P: AsRef<Path>>(
137        path: P,
138        backend: &StorageBackend,
139    ) -> Result<Self, StoreError> {
140        Self::new_with_lmdb_map_size(path, backend, None)
141    }
142
143    pub fn backend(&self) -> StorageBackend {
144        match self {
145            LocalStore::Fs(_) => StorageBackend::Fs,
146            #[cfg(feature = "lmdb")]
147            LocalStore::Lmdb(_) => StorageBackend::Lmdb,
148        }
149    }
150
151    /// Sync put operation
152    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
153        match self {
154            LocalStore::Fs(store) => store.put_sync(hash, data),
155            #[cfg(feature = "lmdb")]
156            LocalStore::Lmdb(store) => store.put_sync(hash, data),
157        }
158    }
159
160    /// Sync batch put operation.
161    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
162        match self {
163            LocalStore::Fs(store) => {
164                let mut inserted = 0usize;
165                for (hash, data) in items {
166                    if store.put_sync(*hash, data.as_slice())? {
167                        inserted += 1;
168                    }
169                }
170                Ok(inserted)
171            }
172            #[cfg(feature = "lmdb")]
173            LocalStore::Lmdb(store) => store.put_many_sync(items),
174        }
175    }
176
177    /// Sync get operation
178    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
179        match self {
180            LocalStore::Fs(store) => store.get_sync(hash),
181            #[cfg(feature = "lmdb")]
182            LocalStore::Lmdb(store) => store.get_sync(hash),
183        }
184    }
185
186    /// Check if hash exists
187    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
188        match self {
189            LocalStore::Fs(store) => Ok(store.exists(hash)),
190            #[cfg(feature = "lmdb")]
191            LocalStore::Lmdb(store) => store.exists(hash),
192        }
193    }
194
195    /// Sync delete operation
196    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
197        match self {
198            LocalStore::Fs(store) => store.delete_sync(hash),
199            #[cfg(feature = "lmdb")]
200            LocalStore::Lmdb(store) => store.delete_sync(hash),
201        }
202    }
203
204    /// Get storage statistics
205    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
206        match self {
207            LocalStore::Fs(store) => {
208                let stats = store.stats()?;
209                Ok(LocalStoreStats {
210                    count: stats.count,
211                    total_bytes: stats.total_bytes,
212                })
213            }
214            #[cfg(feature = "lmdb")]
215            LocalStore::Lmdb(store) => {
216                let stats = store.stats()?;
217                Ok(LocalStoreStats {
218                    count: stats.count,
219                    total_bytes: stats.total_bytes,
220                })
221            }
222        }
223    }
224
225    /// List all hashes in the store
226    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
227        match self {
228            LocalStore::Fs(store) => store.list(),
229            #[cfg(feature = "lmdb")]
230            LocalStore::Lmdb(store) => store.list(),
231        }
232    }
233}
234
235#[async_trait]
236impl Store for LocalStore {
237    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
238        self.put_sync(hash, &data)
239    }
240
241    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
242        self.put_many_sync(&items)
243    }
244
245    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
246        self.get_sync(hash)
247    }
248
249    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
250        self.exists(hash)
251    }
252
253    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
254        self.delete_sync(hash)
255    }
256}
257
258#[cfg(feature = "s3")]
259use tokio::sync::mpsc;
260
261use crate::config::S3Config;
262
/// Work item sent to the background S3 sync task.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` as the object for `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object for `hash`.
    Delete { hash: Hash },
}
269
270/// Storage router - local store primary with optional S3 backup
271///
272/// Write path: local first (fast), then queue S3 upload (non-blocking)
273/// Read path: local first, fall back to S3 if miss
274pub struct StorageRouter {
275    /// Primary local store (always used)
276    local: Arc<LocalStore>,
277    /// Optional S3 client for backup
278    #[cfg(feature = "s3")]
279    s3_client: Option<aws_sdk_s3::Client>,
280    #[cfg(feature = "s3")]
281    s3_bucket: Option<String>,
282    #[cfg(feature = "s3")]
283    s3_prefix: String,
284    /// Channel to send uploads to background task
285    #[cfg(feature = "s3")]
286    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
287}
288
289impl StorageRouter {
290    /// Create router with local storage only
291    pub fn new(local: Arc<LocalStore>) -> Self {
292        Self {
293            local,
294            #[cfg(feature = "s3")]
295            s3_client: None,
296            #[cfg(feature = "s3")]
297            s3_bucket: None,
298            #[cfg(feature = "s3")]
299            s3_prefix: String::new(),
300            #[cfg(feature = "s3")]
301            sync_tx: None,
302        }
303    }
304
305    /// Create router with local storage + S3 backup
306    #[cfg(feature = "s3")]
307    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
308        use aws_sdk_s3::Client as S3Client;
309
310        // Build AWS config
311        let mut aws_config_loader = aws_config::from_env();
312        aws_config_loader =
313            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
314        let aws_config = aws_config_loader.load().await;
315
316        // Build S3 client with custom endpoint
317        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
318        s3_config_builder = s3_config_builder
319            .endpoint_url(&config.endpoint)
320            .force_path_style(true);
321
322        let s3_client = S3Client::from_conf(s3_config_builder.build());
323        let bucket = config.bucket.clone();
324        let prefix = config.prefix.clone().unwrap_or_default();
325
326        // Create background sync channel
327        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
328
329        // Spawn background sync task with bounded concurrent uploads
330        let sync_client = s3_client.clone();
331        let sync_bucket = bucket.clone();
332        let sync_prefix = prefix.clone();
333
334        tokio::spawn(async move {
335            use aws_sdk_s3::primitives::ByteStream;
336
337            tracing::info!("S3 background sync task started");
338
339            // Limit concurrent uploads to prevent overwhelming the runtime
340            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
341            let client = std::sync::Arc::new(sync_client);
342            let bucket = std::sync::Arc::new(sync_bucket);
343            let prefix = std::sync::Arc::new(sync_prefix);
344
345            while let Some(msg) = sync_rx.recv().await {
346                let client = client.clone();
347                let bucket = bucket.clone();
348                let prefix = prefix.clone();
349                let semaphore = semaphore.clone();
350
351                // Spawn each upload with semaphore-bounded concurrency
352                tokio::spawn(async move {
353                    // Acquire permit before uploading
354                    let _permit = semaphore.acquire().await;
355
356                    match msg {
357                        S3SyncMessage::Upload { hash, data } => {
358                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
359                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
360
361                            match client
362                                .put_object()
363                                .bucket(bucket.as_str())
364                                .key(&key)
365                                .body(ByteStream::from(data))
366                                .send()
367                                .await
368                            {
369                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
370                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
371                            }
372                        }
373                        S3SyncMessage::Delete { hash } => {
374                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
375                            tracing::debug!("S3 deleting {}", &key);
376
377                            if let Err(e) = client
378                                .delete_object()
379                                .bucket(bucket.as_str())
380                                .key(&key)
381                                .send()
382                                .await
383                            {
384                                tracing::error!("S3 delete failed {}: {}", &key, e);
385                            }
386                        }
387                    }
388                });
389            }
390        });
391
392        tracing::info!(
393            "S3 storage initialized: bucket={}, prefix={}",
394            bucket,
395            prefix
396        );
397
398        Ok(Self {
399            local,
400            s3_client: Some(s3_client),
401            s3_bucket: Some(bucket),
402            s3_prefix: prefix,
403            sync_tx: Some(sync_tx),
404        })
405    }
406
407    /// Store data - writes to LMDB, queues S3 upload in background
408    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
409        // Always write to local first
410        let is_new = self.local.put_sync(hash, data)?;
411
412        // Queue S3 upload if configured (non-blocking)
413        // Always upload to S3 (even if not new locally) to ensure S3 has all blobs
414        #[cfg(feature = "s3")]
415        if let Some(ref tx) = self.sync_tx {
416            tracing::info!(
417                "Queueing S3 upload for {} ({} bytes, is_new={})",
418                crate::storage::to_hex(&hash)[..16].to_string(),
419                data.len(),
420                is_new
421            );
422            if let Err(e) = tx.send(S3SyncMessage::Upload {
423                hash,
424                data: data.to_vec(),
425            }) {
426                tracing::error!("Failed to queue S3 upload: {}", e);
427            }
428        }
429
430        Ok(is_new)
431    }
432
433    /// Store multiple blobs with a single local batch write when supported.
434    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
435        let inserted = self.local.put_many_sync(items)?;
436
437        #[cfg(feature = "s3")]
438        if let Some(ref tx) = self.sync_tx {
439            for (hash, data) in items {
440                if let Err(e) = tx.send(S3SyncMessage::Upload {
441                    hash: *hash,
442                    data: data.clone(),
443                }) {
444                    tracing::error!("Failed to queue S3 upload: {}", e);
445                }
446            }
447        }
448
449        Ok(inserted)
450    }
451
452    /// Get data - tries LMDB first, falls back to S3
453    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
454        // Try local first
455        if let Some(data) = self.local.get_sync(hash)? {
456            return Ok(Some(data));
457        }
458
459        // Fall back to S3 if configured
460        #[cfg(feature = "s3")]
461        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
462            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
463
464            match sync_block_on(async { client.get_object().bucket(bucket).key(&key).send().await })
465            {
466                Ok(output) => {
467                    if let Ok(body) = sync_block_on(output.body.collect()) {
468                        let data = body.into_bytes().to_vec();
469                        // Cache locally for future reads
470                        let _ = self.local.put_sync(*hash, &data);
471                        return Ok(Some(data));
472                    }
473                }
474                Err(e) => {
475                    let service_err = e.into_service_error();
476                    if !service_err.is_no_such_key() {
477                        tracing::warn!("S3 get failed: {}", service_err);
478                    }
479                }
480            }
481        }
482
483        Ok(None)
484    }
485
486    /// Check if hash exists
487    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
488        // Check local first
489        if self.local.exists(hash)? {
490            return Ok(true);
491        }
492
493        // Check S3 if configured
494        #[cfg(feature = "s3")]
495        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
496            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
497
498            match sync_block_on(async {
499                client.head_object().bucket(bucket).key(&key).send().await
500            }) {
501                Ok(_) => return Ok(true),
502                Err(e) => {
503                    let service_err = e.into_service_error();
504                    if !service_err.is_not_found() {
505                        tracing::warn!("S3 head failed: {}", service_err);
506                    }
507                }
508            }
509        }
510
511        Ok(false)
512    }
513
514    /// Delete data from both local and S3 stores
515    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
516        let deleted = self.local.delete_sync(hash)?;
517
518        // Queue S3 delete if configured
519        #[cfg(feature = "s3")]
520        if let Some(ref tx) = self.sync_tx {
521            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
522        }
523
524        Ok(deleted)
525    }
526
527    /// Delete data from local store only (don't propagate to S3)
528    /// Used for eviction where we want to keep S3 as archive
529    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
530        self.local.delete_sync(hash)
531    }
532
533    /// Get stats from local store
534    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
535        self.local.stats()
536    }
537
538    /// List all hashes from local store
539    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
540        self.local.list()
541    }
542
543    /// Get the underlying local store for HashTree operations
544    pub fn local_store(&self) -> Arc<LocalStore> {
545        Arc::clone(&self.local)
546    }
547}
548
549// Implement async Store trait for StorageRouter so it can be used directly with HashTree
550// This ensures all writes go through S3 sync
551#[async_trait]
552impl Store for StorageRouter {
553    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
554        self.put_sync(hash, &data)
555    }
556
557    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
558        self.put_many_sync(&items)
559    }
560
561    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
562        self.get_sync(hash)
563    }
564
565    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
566        self.exists(hash)
567    }
568
569    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
570        self.delete_sync(hash)
571    }
572}
573
574pub struct HashtreeStore {
575    base_path: PathBuf,
576    env: heed::Env,
577    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
578    pins: Database<Bytes, Unit>,
579    /// Mutable published refs that should stay subscribed and keep following updates
580    pinned_refs: Database<Str, Unit>,
581    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
582    blob_owners: Database<Bytes, Unit>,
583    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
584    pubkey_blobs: Database<Bytes, Bytes>,
585    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
586    tree_meta: Database<Bytes, Bytes>,
587    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
588    blob_trees: Database<Bytes, Unit>,
589    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
590    tree_refs: Database<Str, Bytes>,
591    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
592    cached_roots: Database<Str, Bytes>,
593    /// Storage router - handles LMDB + optional S3 (Arc for sharing with HashTree)
594    router: Arc<StorageRouter>,
595    /// Maximum storage size in bytes (from config)
596    max_size_bytes: u64,
597    /// Whether quota enforcement may delete local blobs not tracked by any indexed tree.
598    evict_orphans: bool,
599}
600
601impl HashtreeStore {
602    /// Create a new store with the configured local storage limit.
603    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
604        let config = hashtree_config::Config::load_or_default();
605        let max_size_bytes = config
606            .storage
607            .max_size_gb
608            .saturating_mul(1024 * 1024 * 1024);
609        Self::with_options_and_backend(
610            path,
611            None,
612            max_size_bytes,
613            config.storage.evict_orphans,
614            &config.storage.backend,
615        )
616    }
617
618    /// Create a new store with an explicit local backend and size limit.
619    pub fn new_with_backend<P: AsRef<Path>>(
620        path: P,
621        backend: hashtree_config::StorageBackend,
622        max_size_bytes: u64,
623    ) -> Result<Self> {
624        Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
625    }
626
627    /// Create a new store with optional S3 backend and the configured local storage limit.
628    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
629        let config = hashtree_config::Config::load_or_default();
630        let max_size_bytes = config
631            .storage
632            .max_size_gb
633            .saturating_mul(1024 * 1024 * 1024);
634        Self::with_options_and_backend(
635            path,
636            s3_config,
637            max_size_bytes,
638            config.storage.evict_orphans,
639            &config.storage.backend,
640        )
641    }
642
643    /// Create a new store with optional S3 backend and custom size limit.
644    ///
645    /// The raw local blob backend remains unbounded. `HashtreeStore` enforces
646    /// `max_size_bytes` at the tree-management layer so eviction can honor pins,
647    /// orphan handling, and local-only eviction when S3 is used as archive.
648    pub fn with_options<P: AsRef<Path>>(
649        path: P,
650        s3_config: Option<&S3Config>,
651        max_size_bytes: u64,
652    ) -> Result<Self> {
653        let config = hashtree_config::Config::load_or_default();
654        Self::with_options_and_backend(
655            path,
656            s3_config,
657            max_size_bytes,
658            config.storage.evict_orphans,
659            &config.storage.backend,
660        )
661    }
662
663    fn with_options_and_backend<P: AsRef<Path>>(
664        path: P,
665        s3_config: Option<&S3Config>,
666        max_size_bytes: u64,
667        evict_orphans: bool,
668        backend: &hashtree_config::StorageBackend,
669    ) -> Result<Self> {
670        let path = path.as_ref();
671        std::fs::create_dir_all(path)?;
672
673        let env = unsafe {
674            EnvOpenOptions::new()
675                .map_size(10 * 1024 * 1024 * 1024) // 10GB virtual address space
676                .max_dbs(9) // pins, pinned_refs, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
677                .max_readers(LMDB_MAX_READERS)
678                .open(path)?
679        };
680        let _ = env.clear_stale_readers();
681
682        let mut wtxn = env.write_txn()?;
683        let pins = env.create_database(&mut wtxn, Some("pins"))?;
684        let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
685        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
686        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
687        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
688        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
689        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
690        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
691        wtxn.commit()?;
692
693        // Intentionally keep the raw blob backend unbounded here. HashtreeStore
694        // owns quota policy above this layer, where it can coordinate eviction
695        // with tree refs, blob ownership, pins, and S3 archival behavior.
696        let local_store = Arc::new(match backend {
697            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
698                FsBlobStore::new(path.join("blobs"))
699                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
700            ),
701            #[cfg(feature = "lmdb")]
702            hashtree_config::StorageBackend::Lmdb => {
703                std::fs::create_dir_all(path.join("blobs"))?;
704                remove_stale_fs_blob_shards(&path.join("blobs"))
705                    .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
706                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
707                let map_size = usize::try_from(requested_map_size)
708                    .context("LMDB blob map size exceeds usize")?;
709                LocalStore::Lmdb(
710                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
711                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
712                )
713            }
714            #[cfg(not(feature = "lmdb"))]
715            hashtree_config::StorageBackend::Lmdb => {
716                tracing::warn!(
717                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
718                );
719                LocalStore::Fs(
720                    FsBlobStore::new(path.join("blobs"))
721                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
722                )
723            }
724        });
725
726        // Create storage router with optional S3
727        #[cfg(feature = "s3")]
728        let router = Arc::new(if let Some(s3_cfg) = s3_config {
729            tracing::info!(
730                "Initializing S3 storage backend: bucket={}, endpoint={}",
731                s3_cfg.bucket,
732                s3_cfg.endpoint
733            );
734
735            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
736        } else {
737            StorageRouter::new(local_store)
738        });
739
740        #[cfg(not(feature = "s3"))]
741        let router = Arc::new({
742            if s3_config.is_some() {
743                tracing::warn!(
744                    "S3 config provided but S3 feature not enabled. Using local storage only."
745                );
746            }
747            StorageRouter::new(local_store)
748        });
749
750        Ok(Self {
751            base_path: path.to_path_buf(),
752            env,
753            pins,
754            pinned_refs,
755            blob_owners,
756            pubkey_blobs,
757            tree_meta,
758            blob_trees,
759            tree_refs,
760            cached_roots,
761            router,
762            max_size_bytes,
763            evict_orphans,
764        })
765    }
766
767    pub fn base_path(&self) -> &Path {
768        &self.base_path
769    }
770
771    /// Get the storage router
772    pub fn router(&self) -> &StorageRouter {
773        &self.router
774    }
775
776    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
777    /// All writes through this go to both LMDB and S3
778    pub fn store_arc(&self) -> Arc<StorageRouter> {
779        Arc::clone(&self.router)
780    }
781
782    /// Get tree node by hash (raw bytes)
783    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
784        let store = self.store_arc();
785        let tree = HashTree::new(HashTreeConfig::new(store).public());
786
787        sync_block_on(async {
788            tree.get_tree_node(hash)
789                .await
790                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
791        })
792    }
793
794    /// Store a raw blob, returns SHA256 hash as hex.
795    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
796        let hash = sha256(data);
797        self.router
798            .put_sync(hash, data)
799            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
800        Ok(to_hex(&hash))
801    }
802
803    /// Store an opportunistically cached blob.
804    ///
805    /// Unlike durable `put_blob` writes, this path may evict disposable orphaned
806    /// blobs to make room under storage pressure. It intentionally avoids touching
807    /// indexed trees, social-graph roots, explicit pins, and owned Blossom blobs.
808    pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
809        let hash = sha256(data);
810        if self
811            .router
812            .exists(&hash)
813            .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
814        {
815            return Ok(to_hex(&hash));
816        }
817
818        let incoming_bytes = data.len() as u64;
819        let _ = self.make_room_for_cached_blob(incoming_bytes);
820
821        let mut retried_after_cleanup = false;
822        loop {
823            match self.router.put_sync(hash, data) {
824                Ok(_) => return Ok(to_hex(&hash)),
825                Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
826                    let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
827                    if freed == 0 {
828                        return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
829                    }
830                    retried_after_cleanup = true;
831                }
832                Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
833            }
834        }
835    }
836
837    /// Get a raw blob by SHA256 hash (raw bytes).
838    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
839        self.router
840            .get_sync(hash)
841            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
842    }
843
844    /// Check if a blob exists by SHA256 hash (raw bytes).
845    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
846        self.router
847            .exists(hash)
848            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
849    }
850
851    // === Blossom ownership tracking ===
852    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
853    // This allows efficient multi-owner tracking with O(1) lookups
854
855    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
856    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
857        let mut key = [0u8; 64];
858        key[..32].copy_from_slice(sha256);
859        key[32..].copy_from_slice(pubkey);
860        key
861    }
862
863    /// Add an owner (pubkey) to a blob for Blossom protocol
864    /// Multiple users can own the same blob - it's only deleted when all owners remove it
865    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
866        let key = Self::blob_owner_key(sha256, pubkey);
867        let mut wtxn = self.env.write_txn()?;
868
869        // Add ownership entry (idempotent - put overwrites)
870        self.blob_owners.put(&mut wtxn, &key[..], &())?;
871
872        // Convert sha256 to hex for BlobMetadata (which stores sha256 as hex string)
873        let sha256_hex = to_hex(sha256);
874
875        // Get existing blobs for this pubkey (for /list endpoint)
876        let mut blobs: Vec<BlobMetadata> = self
877            .pubkey_blobs
878            .get(&wtxn, pubkey)?
879            .and_then(|b| serde_json::from_slice(b).ok())
880            .unwrap_or_default();
881
882        // Check if blob already exists for this pubkey
883        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
884            let now = SystemTime::now()
885                .duration_since(UNIX_EPOCH)
886                .unwrap()
887                .as_secs();
888
889            // Get size from raw blob
890            let size = self
891                .get_blob(sha256)?
892                .map(|data| data.len() as u64)
893                .unwrap_or(0);
894
895            blobs.push(BlobMetadata {
896                sha256: sha256_hex,
897                size,
898                mime_type: "application/octet-stream".to_string(),
899                uploaded: now,
900            });
901
902            let blobs_json = serde_json::to_vec(&blobs)?;
903            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
904        }
905
906        wtxn.commit()?;
907        Ok(())
908    }
909
910    /// Check if a pubkey owns a blob
911    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
912        let key = Self::blob_owner_key(sha256, pubkey);
913        let rtxn = self.env.read_txn()?;
914        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
915    }
916
917    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
918    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
919        let rtxn = self.env.read_txn()?;
920
921        let mut owners = Vec::new();
922        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
923            let (key, _) = item?;
924            if key.len() == 64 {
925                // Extract pubkey from composite key (bytes 32-64)
926                let mut pubkey = [0u8; 32];
927                pubkey.copy_from_slice(&key[32..64]);
928                owners.push(pubkey);
929            }
930        }
931        Ok(owners)
932    }
933
934    /// Check if blob has any owners
935    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
936        let rtxn = self.env.read_txn()?;
937
938        // Just check if any entry exists with this prefix
939        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
940            if item.is_ok() {
941                return Ok(true);
942            }
943        }
944        Ok(false)
945    }
946
947    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
948    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
949        Ok(self.get_blob_owners(sha256)?.into_iter().next())
950    }
951
952    /// Remove a user's ownership of a blossom blob
953    /// Only deletes the actual blob when no owners remain
954    /// Returns true if the blob was actually deleted (no owners left)
955    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
956        let key = Self::blob_owner_key(sha256, pubkey);
957        let mut wtxn = self.env.write_txn()?;
958
959        // Remove this pubkey's ownership entry
960        self.blob_owners.delete(&mut wtxn, &key[..])?;
961
962        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
963        let sha256_hex = to_hex(sha256);
964
965        // Remove from pubkey's blob list
966        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
967            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
968                blobs.retain(|b| b.sha256 != sha256_hex);
969                let blobs_json = serde_json::to_vec(&blobs)?;
970                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
971            }
972        }
973
974        // Check if any other owners remain (prefix scan)
975        let mut has_other_owners = false;
976        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
977            if item.is_ok() {
978                has_other_owners = true;
979                break;
980            }
981        }
982
983        if has_other_owners {
984            wtxn.commit()?;
985            tracing::debug!(
986                "Removed {} from blob {} owners, other owners remain",
987                &to_hex(pubkey)[..8],
988                &sha256_hex[..8]
989            );
990            return Ok(false);
991        }
992
993        // No owners left - delete the blob completely
994        tracing::info!(
995            "All owners removed from blob {}, deleting",
996            &sha256_hex[..8]
997        );
998
999        // Delete raw blob (by content hash) - this deletes from S3 too
1000        let _ = self.router.delete_sync(sha256);
1001
1002        wtxn.commit()?;
1003        Ok(true)
1004    }
1005
1006    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
1007    pub fn list_blobs_by_pubkey(
1008        &self,
1009        pubkey: &[u8; 32],
1010    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1011        let rtxn = self.env.read_txn()?;
1012
1013        let blobs: Vec<BlobMetadata> = self
1014            .pubkey_blobs
1015            .get(&rtxn, pubkey)?
1016            .and_then(|b| serde_json::from_slice(b).ok())
1017            .unwrap_or_default();
1018
1019        Ok(blobs
1020            .into_iter()
1021            .map(|b| crate::server::blossom::BlobDescriptor {
1022                url: format!("/{}", b.sha256),
1023                sha256: b.sha256,
1024                size: b.size,
1025                mime_type: b.mime_type,
1026                uploaded: b.uploaded,
1027            })
1028            .collect())
1029    }
1030
1031    /// Get a single chunk/blob by hash (raw bytes)
1032    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1033        self.router
1034            .get_sync(hash)
1035            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1036    }
1037
1038    /// Get file content by hash (raw bytes)
1039    /// Returns raw bytes (caller handles decryption if needed)
1040    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1041        let store = self.store_arc();
1042        let tree = HashTree::new(HashTreeConfig::new(store).public());
1043
1044        sync_block_on(async {
1045            tree.read_file(hash)
1046                .await
1047                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1048        })
1049    }
1050
1051    /// Get file content by Cid (hash + optional decryption key as raw bytes)
1052    /// Handles decryption automatically if key is present
1053    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1054        let store = self.store_arc();
1055        let tree = HashTree::new(HashTreeConfig::new(store).public());
1056
1057        sync_block_on(async {
1058            tree.get(cid, None)
1059                .await
1060                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1061        })
1062    }
1063
1064    fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1065        let exists = self
1066            .router
1067            .exists(&cid.hash)
1068            .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1069        if !exists {
1070            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1071        }
1072        Ok(())
1073    }
1074
1075    /// Stream file content identified by Cid into a writer without buffering full file in memory.
1076    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1077        self.ensure_cid_exists(cid)?;
1078
1079        let store = self.store_arc();
1080        let tree = HashTree::new(HashTreeConfig::new(store).public());
1081        let mut total_bytes = 0u64;
1082        let mut streamed_any_chunk = false;
1083
1084        sync_block_on(async {
1085            let mut stream = tree.get_stream(cid);
1086            while let Some(chunk) = stream.next().await {
1087                streamed_any_chunk = true;
1088                let chunk =
1089                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1090                writer
1091                    .write_all(&chunk)
1092                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1093                total_bytes += chunk.len() as u64;
1094            }
1095            Ok::<(), anyhow::Error>(())
1096        })?;
1097
1098        if !streamed_any_chunk {
1099            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1100        }
1101
1102        writer
1103            .flush()
1104            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1105        Ok(total_bytes)
1106    }
1107
1108    /// Stream file content identified by Cid directly into a destination path.
1109    pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1110        self.ensure_cid_exists(cid)?;
1111
1112        let output_path = output_path.as_ref();
1113        if let Some(parent) = output_path.parent() {
1114            if !parent.as_os_str().is_empty() {
1115                std::fs::create_dir_all(parent).with_context(|| {
1116                    format!("Failed to create output directory {}", parent.display())
1117                })?;
1118            }
1119        }
1120
1121        let mut file = std::fs::File::create(output_path)
1122            .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1123        self.write_file_by_cid_to_writer(cid, &mut file)
1124    }
1125
1126    /// Stream a public (unencrypted) file by hash directly into a destination path.
1127    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1128        self.write_file_by_cid(&Cid::public(*hash), output_path)
1129    }
1130
1131    /// Resolve a path within a tree (returns Cid with key if encrypted)
1132    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1133        let store = self.store_arc();
1134        let tree = HashTree::new(HashTreeConfig::new(store).public());
1135
1136        sync_block_on(async {
1137            tree.resolve_path(cid, path)
1138                .await
1139                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1140        })
1141    }
1142
1143    /// Get chunk metadata for a file (chunk list, sizes, total size)
1144    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
1145        let store = self.store_arc();
1146        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
1147
1148        sync_block_on(async {
1149            // First check if the hash exists in the store at all
1150            // (either as a blob or tree node)
1151            let exists = store
1152                .has(hash)
1153                .await
1154                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;
1155
1156            if !exists {
1157                return Ok(None);
1158            }
1159
1160            // Get total size
1161            let total_size = tree
1162                .get_size(hash)
1163                .await
1164                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;
1165
1166            // Check if it's a tree (chunked) or blob
1167            let is_tree_node = tree
1168                .is_tree(hash)
1169                .await
1170                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1171
1172            if !is_tree_node {
1173                // Single blob, not chunked
1174                return Ok(Some(FileChunkMetadata {
1175                    total_size,
1176                    chunk_hashes: vec![],
1177                    chunk_sizes: vec![],
1178                    is_chunked: false,
1179                }));
1180            }
1181
1182            // Get tree node to extract chunk info
1183            let node = match tree
1184                .get_tree_node(hash)
1185                .await
1186                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1187            {
1188                Some(n) => n,
1189                None => return Ok(None),
1190            };
1191
1192            // Check if it's a directory (has named links)
1193            let is_directory = tree
1194                .is_directory(hash)
1195                .await
1196                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1197
1198            if is_directory {
1199                return Ok(None); // Not a file
1200            }
1201
1202            // Extract chunk info from links
1203            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
1204            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();
1205
1206            Ok(Some(FileChunkMetadata {
1207                total_size,
1208                chunk_hashes,
1209                chunk_sizes,
1210                is_chunked: !node.links.is_empty(),
1211            }))
1212        })
1213    }
1214
1215    /// Get byte range from file
1216    pub fn get_file_range(
1217        &self,
1218        hash: &[u8; 32],
1219        start: u64,
1220        end: Option<u64>,
1221    ) -> Result<Option<(Vec<u8>, u64)>> {
1222        let metadata = match self.get_file_chunk_metadata(hash)? {
1223            Some(m) => m,
1224            None => return Ok(None),
1225        };
1226
1227        if metadata.total_size == 0 {
1228            return Ok(Some((Vec::new(), 0)));
1229        }
1230
1231        if start >= metadata.total_size {
1232            return Ok(None);
1233        }
1234
1235        let end = end
1236            .unwrap_or(metadata.total_size - 1)
1237            .min(metadata.total_size - 1);
1238
1239        // For non-chunked files, load entire file
1240        if !metadata.is_chunked {
1241            let content = self.get_file(hash)?.unwrap_or_default();
1242            let range_content = if start < content.len() as u64 {
1243                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1244            } else {
1245                Vec::new()
1246            };
1247            return Ok(Some((range_content, metadata.total_size)));
1248        }
1249
1250        // For chunked files, load only needed chunks
1251        let mut result = Vec::new();
1252        let mut current_offset = 0u64;
1253
1254        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1255            let chunk_size = metadata.chunk_sizes[i];
1256            let chunk_end = current_offset + chunk_size - 1;
1257
1258            // Check if this chunk overlaps with requested range
1259            if chunk_end >= start && current_offset <= end {
1260                let chunk_content = match self.get_chunk(chunk_hash)? {
1261                    Some(content) => content,
1262                    None => {
1263                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1264                    }
1265                };
1266
1267                let chunk_read_start = if current_offset >= start {
1268                    0
1269                } else {
1270                    (start - current_offset) as usize
1271                };
1272
1273                let chunk_read_end = if chunk_end <= end {
1274                    chunk_size as usize - 1
1275                } else {
1276                    (end - current_offset) as usize
1277                };
1278
1279                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1280            }
1281
1282            current_offset += chunk_size;
1283
1284            if current_offset > end {
1285                break;
1286            }
1287        }
1288
1289        Ok(Some((result, metadata.total_size)))
1290    }
1291
1292    /// Stream file range as chunks using Arc for async/Send contexts
1293    pub fn stream_file_range_chunks_owned(
1294        self: Arc<Self>,
1295        hash: &[u8; 32],
1296        start: u64,
1297        end: u64,
1298    ) -> Result<Option<FileRangeChunksOwned>> {
1299        let metadata = match self.get_file_chunk_metadata(hash)? {
1300            Some(m) => m,
1301            None => return Ok(None),
1302        };
1303
1304        if metadata.total_size == 0 || start >= metadata.total_size {
1305            return Ok(None);
1306        }
1307
1308        let end = end.min(metadata.total_size - 1);
1309
1310        Ok(Some(FileRangeChunksOwned {
1311            store: self,
1312            metadata,
1313            start,
1314            end,
1315            current_chunk_idx: 0,
1316            current_offset: 0,
1317        }))
1318    }
1319
1320    /// Get directory structure by hash (raw bytes)
1321    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1322        let store = self.store_arc();
1323        let tree = HashTree::new(HashTreeConfig::new(store).public());
1324
1325        sync_block_on(async {
1326            // Check if it's a directory
1327            let is_dir = tree
1328                .is_directory(hash)
1329                .await
1330                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1331
1332            if !is_dir {
1333                return Ok(None);
1334            }
1335
1336            // Get directory entries (public Cid - no encryption key)
1337            let cid = hashtree_core::Cid::public(*hash);
1338            let tree_entries = tree
1339                .list_directory(&cid)
1340                .await
1341                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1342
1343            let entries: Vec<DirEntry> = tree_entries
1344                .into_iter()
1345                .map(|e| DirEntry {
1346                    name: e.name,
1347                    cid: to_hex(&e.hash),
1348                    is_directory: e.link_type.is_tree(),
1349                    size: e.size,
1350                })
1351                .collect();
1352
1353            Ok(Some(DirectoryListing {
1354                dir_name: String::new(),
1355                entries,
1356            }))
1357        })
1358    }
1359
1360    /// Get directory structure by CID, supporting encrypted directories.
1361    pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1362        let store = self.store_arc();
1363        let tree = HashTree::new(HashTreeConfig::new(store).public());
1364        let cid = cid.clone();
1365
1366        sync_block_on(async {
1367            let is_dir = tree
1368                .is_dir(&cid)
1369                .await
1370                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1371
1372            if !is_dir {
1373                return Ok(None);
1374            }
1375
1376            let tree_entries = tree
1377                .list_directory(&cid)
1378                .await
1379                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1380
1381            let entries: Vec<DirEntry> = tree_entries
1382                .into_iter()
1383                .map(|e| DirEntry {
1384                    name: e.name,
1385                    cid: Cid {
1386                        hash: e.hash,
1387                        key: e.key,
1388                    }
1389                    .to_string(),
1390                    is_directory: e.link_type.is_tree(),
1391                    size: e.size,
1392                })
1393                .collect();
1394
1395            Ok(Some(DirectoryListing {
1396                dir_name: String::new(),
1397                entries,
1398            }))
1399        })
1400    }
1401
    // === Pinned refs and cached roots ===
1403
1404    /// Persist a mutable published ref that should stay subscribed.
1405    pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
1406        let mut wtxn = self.env.write_txn()?;
1407        self.pinned_refs.put(&mut wtxn, key, &())?;
1408        wtxn.commit()?;
1409        Ok(())
1410    }
1411
1412    /// Remove a mutable published ref from the live pinned set.
1413    pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
1414        let mut wtxn = self.env.write_txn()?;
1415        let removed = self.pinned_refs.delete(&mut wtxn, key)?;
1416        wtxn.commit()?;
1417        Ok(removed)
1418    }
1419
1420    /// List mutable published refs that should stay subscribed.
1421    pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
1422        let rtxn = self.env.read_txn()?;
1423        let mut refs = Vec::new();
1424
1425        for item in self.pinned_refs.iter(&rtxn)? {
1426            let (key, _) = item?;
1427            refs.push(key.to_string());
1428        }
1429
1430        refs.sort();
1431        Ok(refs)
1432    }
1433
1434    /// Get cached root for a pubkey/tree_name pair
1435    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1436        let key = format!("{}/{}", pubkey_hex, tree_name);
1437        let rtxn = self.env.read_txn()?;
1438        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1439            let root: CachedRoot = rmp_serde::from_slice(bytes)
1440                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1441            Ok(Some(root))
1442        } else {
1443            Ok(None)
1444        }
1445    }
1446
1447    /// Set cached root for a pubkey/tree_name pair
1448    pub fn set_cached_root(
1449        &self,
1450        pubkey_hex: &str,
1451        tree_name: &str,
1452        hash: &str,
1453        key: Option<&str>,
1454        visibility: &str,
1455        updated_at: u64,
1456    ) -> Result<()> {
1457        let db_key = format!("{}/{}", pubkey_hex, tree_name);
1458        let root = CachedRoot {
1459            hash: hash.to_string(),
1460            key: key.map(|k| k.to_string()),
1461            updated_at,
1462            visibility: visibility.to_string(),
1463        };
1464        let bytes = rmp_serde::to_vec(&root)
1465            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1466        let mut wtxn = self.env.write_txn()?;
1467        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1468        wtxn.commit()?;
1469        Ok(())
1470    }
1471
1472    /// List all cached roots for a pubkey
1473    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1474        let prefix = format!("{}/", pubkey_hex);
1475        let rtxn = self.env.read_txn()?;
1476        let mut results = Vec::new();
1477
1478        for item in self.cached_roots.iter(&rtxn)? {
1479            let (key, bytes) = item?;
1480            if key.starts_with(&prefix) {
1481                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1482                let root: CachedRoot = rmp_serde::from_slice(bytes)
1483                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1484                results.push((tree_name.to_string(), root));
1485            }
1486        }
1487
1488        Ok(results)
1489    }
1490
1491    /// Delete a cached root
1492    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1493        let key = format!("{}/{}", pubkey_hex, tree_name);
1494        let mut wtxn = self.env.write_txn()?;
1495        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1496        wtxn.commit()?;
1497        Ok(deleted)
1498    }
1499}
1500
1501fn is_map_full_store_error(err: &StoreError) -> bool {
1502    let message = err.to_string();
1503    message.contains("MDB_MAP_FULL") || message.contains("MapFull")
1504}
1505
/// Chunk layout of a stored file, as produced by `get_file_chunk_metadata`.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total size of the file in bytes, as reported by the tree.
    pub total_size: u64,
    /// Hash of each link of the file's root node, in link order (empty for a plain blob).
    pub chunk_hashes: Vec<Hash>,
    /// Declared size of each link, parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// `true` when the root node has at least one link; `false` for a plain blob.
    pub is_chunked: bool,
}
1513
/// Owned iterator for async streaming
///
/// Yields the bytes of an inclusive byte range one chunk at a time. It owns
/// an `Arc` to the store, so it borrows nothing from the caller. Created by
/// `HashtreeStore::stream_file_range_chunks_owned`.
pub struct FileRangeChunksOwned {
    /// Store the chunks are fetched from.
    store: Arc<HashtreeStore>,
    /// Chunk layout of the file being streamed.
    metadata: FileChunkMetadata,
    /// First requested byte offset (inclusive).
    start: u64,
    /// Last requested byte offset (inclusive).
    end: u64,
    /// Index of the next chunk to examine.
    current_chunk_idx: usize,
    /// Byte offset within the file at which the next chunk begins.
    current_offset: u64,
}
1523
1524impl Iterator for FileRangeChunksOwned {
1525    type Item = Result<Vec<u8>>;
1526
1527    fn next(&mut self) -> Option<Self::Item> {
1528        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
1529            return None;
1530        }
1531
1532        if self.current_offset > self.end {
1533            return None;
1534        }
1535
1536        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
1537        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
1538        let chunk_end = self.current_offset + chunk_size - 1;
1539
1540        self.current_chunk_idx += 1;
1541
1542        if chunk_end < self.start || self.current_offset > self.end {
1543            self.current_offset += chunk_size;
1544            return self.next();
1545        }
1546
1547        let chunk_content = match self.store.get_chunk(chunk_hash) {
1548            Ok(Some(content)) => content,
1549            Ok(None) => {
1550                return Some(Err(anyhow::anyhow!(
1551                    "Chunk {} not found",
1552                    to_hex(chunk_hash)
1553                )));
1554            }
1555            Err(e) => {
1556                return Some(Err(e));
1557            }
1558        };
1559
1560        let chunk_read_start = if self.current_offset >= self.start {
1561            0
1562        } else {
1563            (self.start - self.current_offset) as usize
1564        };
1565
1566        let chunk_read_end = if chunk_end <= self.end {
1567            chunk_size as usize - 1
1568        } else {
1569            (self.end - self.current_offset) as usize
1570        };
1571
1572        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
1573        self.current_offset += chunk_size;
1574
1575        Some(Ok(result))
1576    }
1577}
1578
/// Counters describing the outcome of a garbage-collection run.
// NOTE(review): the code that fills these in is outside this section; the
// field meanings below are taken from the field names — confirm.
#[derive(Debug)]
pub struct GcStats {
    /// Number of DAGs deleted (presumably whole trees).
    pub deleted_dags: usize,
    /// Number of bytes freed.
    pub freed_bytes: u64,
}
1584
/// One entry of a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name as stored in the directory.
    pub name: String,
    /// Identifier of the entry: the hex hash when listed by raw hash, or the
    /// string form of the entry's `Cid` (hash plus key) when listed by `Cid`.
    pub cid: String,
    /// `true` when the entry's link type is a tree.
    pub is_directory: bool,
    /// Size recorded for the entry.
    pub size: u64,
}
1592
/// Result of listing a directory.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the directory; the listing functions in this file leave it empty.
    pub dir_name: String,
    /// Entries of the directory, in the order the tree returned them.
    pub entries: Vec<DirEntry>,
}
1598
/// Blob metadata for Blossom protocol
///
/// Stored per pubkey as a JSON list and returned by the `/list` endpoint.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// SHA-256 of the blob as a hex string.
    pub sha256: String,
    /// Blob size in bytes when ownership was recorded (0 if the blob was not found).
    pub size: u64,
    /// MIME type; `set_blob_owner` always records `application/octet-stream`.
    pub mime_type: String,
    /// Unix timestamp (seconds) at which ownership was recorded.
    pub uploaded: u64,
}
1607
1608// Implement ContentStore trait for WebRTC data exchange
1609impl crate::webrtc::ContentStore for HashtreeStore {
1610    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1611        let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
1612        self.get_chunk(&hash)
1613    }
1614}
1615
1616#[cfg(test)]
1617mod tests {
1618    #[cfg(feature = "lmdb")]
1619    use super::*;
1620    #[cfg(feature = "lmdb")]
1621    use tempfile::TempDir;
1622
    /// A store opened with a requested size above the blob LMDB minimum must
    /// end up with a blob map at least that large.
    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        // 64 MiB above the minimum, so the minimum alone cannot satisfy the check.
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        // NOTE(review): the third argument is presumably the storage budget in
        // bytes — confirm against `with_options_and_backend`.
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            true,
            &StorageBackend::Lmdb,
        )?;

        // The LMDB backend was requested, so any other local store is a failure.
        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }
1649
    /// An explicit map size passed to `LocalStore::new_with_lmdb_map_size`
    /// must be honoured: the resulting map is at least that large.
    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        // 512 MiB.
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        // The LMDB backend was requested, so any other local store is a failure.
        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }
1673
    /// Opening an LMDB local store removes the `aa` and `b2` directories left
    /// in its path and keeps the `keep-me` directory.
    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("lmdb-blobs");
        // Two directories with shard-like names, plus one unrelated directory
        // that must survive.
        std::fs::create_dir_all(path.join("aa"))?;
        std::fs::create_dir_all(path.join("b2"))?;
        std::fs::create_dir_all(path.join("keep-me"))?;
        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;

        // Opening the store is what triggers the cleanup; 128 MiB map size.
        let _store = LocalStore::new_with_lmdb_map_size(
            &path,
            &StorageBackend::Lmdb,
            Some(128 * 1024 * 1024),
        )?;

        // Shard-named directories are gone, the unrelated one is kept.
        assert!(!path.join("aa").exists());
        assert!(!path.join("b2").exists());
        assert!(path.join("keep-me").exists());
        // The LMDB data and lock files were created in the same directory.
        assert!(path.join("data.mdb").exists());
        assert!(path.join("lock.mdb").exists());

        Ok(())
    }
1700
/// Indexes two different roots under the same owner, name and ref key, one
/// after the other, and checks that once the second root is indexed the first
/// is no longer pinned and has no tree metadata, while the second root is
/// pinned and has metadata.
#[cfg(feature = "lmdb")]
#[test]
fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
    let temp = TempDir::new()?;
    let store = HashtreeStore::with_options_and_backend(
        temp.path(),
        None,
        LMDB_BLOB_MIN_MAP_SIZE_BYTES,
        true,
        &StorageBackend::Lmdb,
    )?;

    // Both publications share the same owner, name and ref key on purpose.
    let owner = "owner";
    let tree_name = Some("playlist");
    let ref_key = Some("npub1owner/playlist");

    let first_payload = b"old published root";
    let second_payload = b"new published root";
    let superseded = sha256(first_payload);
    let replacement = sha256(second_payload);

    // First publication: store, pin and index the original root.
    store.put_blob(first_payload)?;
    store.pin(&superseded)?;
    store.index_tree(&superseded, owner, tree_name, PRIORITY_OWN, ref_key)?;

    assert!(store.is_pinned(&superseded)?);
    assert!(store.get_tree_meta(&superseded)?.is_some());

    // Second publication under the same ref key replaces the first.
    store.put_blob(second_payload)?;
    store.pin(&replacement)?;
    store.index_tree(&replacement, owner, tree_name, PRIORITY_OWN, ref_key)?;

    assert!(
        !store.is_pinned(&superseded)?,
        "superseded root should be unpinned when ref is replaced"
    );
    assert!(
        store.get_tree_meta(&superseded)?.is_none(),
        "superseded root metadata should be removed when ref is replaced"
    );
    assert!(store.is_pinned(&replacement)?);
    assert!(store.get_tree_meta(&replacement)?.is_some());

    Ok(())
}
1754}