// hashtree_cli/storage.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8    from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::io::Write;
17use std::path::Path;
18use std::sync::Arc;
19use std::time::{SystemTime, UNIX_EPOCH};
20
21mod upload;
22
23mod maintenance;
24mod retention;
25
26pub use maintenance::VerifyResult;
27pub use retention::{PinnedItem, StorageByPriority, StorageStats, TreeMeta};
28
/// Eviction priority for trees with no special relationship to the user.
pub const PRIORITY_OTHER: u8 = 64;
/// Eviction priority for trees of followed users.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Eviction priority for the user's own trees (highest value).
pub const PRIORITY_OWN: u8 = 255;
/// Maximum concurrent LMDB read transactions for the metadata environment.
const LMDB_MAX_READERS: u32 = 1024;
/// Floor for the LMDB blob map size (10 GiB) so tiny quotas don't produce an
/// unusably small map. Only compiled when the `lmdb` feature is enabled.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 10 * 1024 * 1024 * 1024;
36
/// Cached root info from Nostr events.
///
/// Stored (msgpack-encoded) in the `cached_roots` database, keyed by
/// "pubkey_hex/tree_name" (see `HashtreeStore::cached_roots`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex)
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public", "link-visible", or "private"
    pub visibility: String,
}
49
/// Storage statistics for a [`LocalStore`].
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of blobs currently stored.
    pub count: usize,
    /// Total size of all stored blobs, in bytes.
    pub total_bytes: u64,
}
56
/// Local blob store - wraps either FsBlobStore or LmdbBlobStore
pub enum LocalStore {
    /// Filesystem-backed blob storage.
    Fs(FsBlobStore),
    /// LMDB-backed blob storage (only with the `lmdb` feature).
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
63
impl LocalStore {
    /// Create a new unbounded local store.
    ///
    /// Higher-level stores that need quota enforcement should manage eviction
    /// above this layer so tree metadata, pins, and archival policies stay
    /// coherent.
    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
        Self::new_unbounded(path, backend)
    }

    /// Create a new local store with an explicit LMDB map size when using the LMDB backend.
    ///
    /// `_map_size_bytes` is consulted only for the LMDB backend; the
    /// filesystem backend ignores it entirely.
    pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
        path: P,
        backend: &StorageBackend,
        _map_size_bytes: Option<u64>,
    ) -> Result<Self, StoreError> {
        match backend {
            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
            #[cfg(feature = "lmdb")]
            StorageBackend::Lmdb => match _map_size_bytes {
                Some(map_size_bytes) => {
                    // LMDB takes a usize map size; reject u64 values that
                    // don't fit instead of silently truncating.
                    let map_size = usize::try_from(map_size_bytes).map_err(|_| {
                        StoreError::Other("LMDB map size exceeds usize".to_string())
                    })?;
                    Ok(LocalStore::Lmdb(LmdbBlobStore::with_map_size(
                        path, map_size,
                    )?))
                }
                None => Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?)),
            },
            // Graceful degradation: when the binary was built without the
            // `lmdb` feature, fall back to filesystem storage with a warning
            // instead of failing.
            #[cfg(not(feature = "lmdb"))]
            StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
            }
        }
    }

    /// Create a new unbounded local store for a specific backend.
    /// Delegates with no explicit LMDB map size (backend default applies).
    pub fn new_unbounded<P: AsRef<Path>>(
        path: P,
        backend: &StorageBackend,
    ) -> Result<Self, StoreError> {
        Self::new_with_lmdb_map_size(path, backend, None)
    }

    /// Report which backend variant this store wraps.
    pub fn backend(&self) -> StorageBackend {
        match self {
            LocalStore::Fs(_) => StorageBackend::Fs,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(_) => StorageBackend::Lmdb,
        }
    }

    /// Sync put operation. Returns Ok(true) when the blob was newly written
    /// (per the underlying backend's `put_sync`).
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.put_sync(hash, data),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put_sync(hash, data),
        }
    }

    /// Sync batch put operation. Returns the number of blobs newly inserted.
    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
        match self {
            // The filesystem backend has no native batch write, so loop and
            // count only the blobs that were actually new.
            LocalStore::Fs(store) => {
                let mut inserted = 0usize;
                for (hash, data) in items {
                    if store.put_sync(*hash, data.as_slice())? {
                        inserted += 1;
                    }
                }
                Ok(inserted)
            }
            // LMDB supports a real batch write in a single transaction.
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put_many_sync(items),
        }
    }

    /// Sync get operation. Returns Ok(None) when the blob is absent.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.get_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.get_sync(hash),
        }
    }

    /// Check if hash exists.
    ///
    /// Note the backend asymmetry: `FsBlobStore::exists` is infallible
    /// (returns bool), while the LMDB variant can fail.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => Ok(store.exists(hash)),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.exists(hash),
        }
    }

    /// Sync delete operation. Returns whether a blob was actually removed.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.delete_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.delete_sync(hash),
        }
    }

    /// Get storage statistics (blob count and total bytes), normalized into
    /// the backend-agnostic [`LocalStoreStats`].
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        match self {
            LocalStore::Fs(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
        }
    }

    /// List all hashes in the store.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.list(),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.list(),
        }
    }
}
203
// Async Store trait implementation. Both backends are synchronous, so each
// async method simply delegates to the sync counterpart; no awaiting happens.
#[async_trait]
impl Store for LocalStore {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
        self.put_many_sync(&items)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
226
227#[cfg(feature = "s3")]
228use tokio::sync::mpsc;
229
230use crate::config::S3Config;
231
/// Message for background S3 sync
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Mirror a blob to S3. Fire-and-forget: the background task only logs
    /// failures (see the sync task spawned in `StorageRouter::with_s3`).
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove a blob from S3.
    Delete { hash: Hash },
}
238
/// Storage router - local store primary with optional S3 backup
///
/// Write path: local first (fast), then queue S3 upload (non-blocking)
/// Read path: local first, fall back to S3 if miss
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LocalStore>,
    /// Optional S3 client for backup
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Bucket mirroring blobs; None when S3 is unconfigured.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to every object key (may be empty).
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to send uploads to background task
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
257
impl StorageRouter {
    /// Create router with local storage only (no S3 mirroring).
    pub fn new(local: Arc<LocalStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Create router with local storage + S3 backup.
    ///
    /// Spawns a detached background task that drains an unbounded channel of
    /// upload/delete messages and applies them to S3 with at most 32
    /// concurrent requests. Failures are logged, never retried.
    ///
    /// NOTE(review): the channel is unbounded and every queued upload holds a
    /// full copy of the blob, so sustained writes faster than the S3 link can
    /// grow memory without bound — confirm this is acceptable.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Build AWS config from the environment, overriding the region.
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader =
            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Build S3 client with custom endpoint. Path-style addressing is
        // forced for S3-compatible services (MinIO etc.).
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Create background sync channel
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        // Spawn background sync task with bounded concurrent uploads
        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Limit concurrent uploads to prevent overwhelming the runtime
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            // Loop ends (and the task exits) when all senders are dropped.
            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                // Spawn each upload with semaphore-bounded concurrency
                tokio::spawn(async move {
                    // Acquire permit before uploading
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            // Object key layout: <prefix><hex-hash>.bin
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            match client
                                .put_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .body(ByteStream::from(data))
                                .send()
                                .await
                            {
                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            if let Err(e) = client
                                .delete_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .send()
                                .await
                            {
                                tracing::error!("S3 delete failed {}: {}", &key, e);
                            }
                        }
                    }
                });
            }
        });

        tracing::info!(
            "S3 storage initialized: bucket={}, prefix={}",
            bucket,
            prefix
        );

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Store data - writes to the local store, queues S3 upload in background.
    /// Returns whether the blob was new locally.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        // Always write to local first
        let is_new = self.local.put_sync(hash, data)?;

        // Queue S3 upload if configured (non-blocking)
        // Always upload to S3 (even if not new locally) to ensure S3 has all blobs
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            // NOTE(review): info-level log per blob may be noisy on bulk writes.
            tracing::info!(
                "Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(),
                data.len(),
                is_new
            );
            if let Err(e) = tx.send(S3SyncMessage::Upload {
                hash,
                data: data.to_vec(),
            }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Store multiple blobs with a single local batch write when supported.
    /// Each blob is still queued to S3 individually.
    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
        let inserted = self.local.put_many_sync(items)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            for (hash, data) in items {
                if let Err(e) = tx.send(S3SyncMessage::Upload {
                    hash: *hash,
                    data: data.clone(),
                }) {
                    tracing::error!("Failed to queue S3 upload: {}", e);
                }
            }
        }

        Ok(inserted)
    }

    /// Get data - tries the local store first, falls back to S3 on miss.
    /// A successful S3 read is re-cached locally.
    ///
    /// NOTE(review): this uses futures::executor::block_on on AWS SDK
    /// futures; calling it from inside a tokio runtime worker thread can
    /// deadlock or panic. Confirm call sites are synchronous contexts only.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        // Try local first
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        // Fall back to S3 if configured
        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async { client.get_object().bucket(bucket).key(&key).send().await })
            {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Cache locally for future reads
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // NoSuchKey is an expected miss; only log other failures.
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Check if hash exists locally or (when configured) in S3.
    /// Same block_on caveat as `get_sync`.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        // Check local first
        if self.local.exists(hash)? {
            return Ok(true);
        }

        // Check S3 if configured
        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object().bucket(bucket).key(&key).send().await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    // NotFound is an expected miss; only log other failures.
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Delete data from both local and S3 stores. The returned bool reflects
    /// only the local deletion; the S3 delete is queued asynchronously.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        // Queue S3 delete if configured
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Delete data from local store only (don't propagate to S3)
    /// Used for eviction where we want to keep S3 as archive
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Get stats from local store (S3 is not counted).
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        self.local.stats()
    }

    /// List all hashes from local store (S3-only blobs are not included).
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Get the underlying local store for HashTree operations.
    /// Writes through this handle bypass S3 sync.
    pub fn local_store(&self) -> Arc<LocalStore> {
        Arc::clone(&self.local)
    }
}
517
// Implement async Store trait for StorageRouter so it can be used directly with HashTree.
// This ensures all writes go through S3 sync. Each async method delegates to
// the synchronous router method; no awaiting happens here.
#[async_trait]
impl Store for StorageRouter {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
        self.put_many_sync(&items)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
542
/// Top-level store: an LMDB metadata environment plus the blob storage router.
pub struct HashtreeStore {
    /// LMDB environment holding all metadata databases below.
    env: heed::Env,
    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
    pins: Database<Bytes, Unit>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles local blobs + optional S3 (Arc for sharing with HashTree)
    router: Arc<StorageRouter>,
    /// Maximum storage size in bytes (from config); enforced at this layer,
    /// not by the raw blob backend.
    max_size_bytes: u64,
}
564
565impl HashtreeStore {
566    /// Create a new store with the configured local storage limit.
567    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
568        let config = hashtree_config::Config::load_or_default();
569        let max_size_bytes = config
570            .storage
571            .max_size_gb
572            .saturating_mul(1024 * 1024 * 1024);
573        Self::with_options_and_backend(path, None, max_size_bytes, &config.storage.backend)
574    }
575
    /// Create a new store with an explicit local backend and size limit.
    /// Skips the on-disk configuration; no S3 mirror is configured.
    pub fn new_with_backend<P: AsRef<Path>>(
        path: P,
        backend: hashtree_config::StorageBackend,
        max_size_bytes: u64,
    ) -> Result<Self> {
        Self::with_options_and_backend(path, None, max_size_bytes, &backend)
    }
584
585    /// Create a new store with optional S3 backend and the configured local storage limit.
586    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
587        let config = hashtree_config::Config::load_or_default();
588        let max_size_bytes = config
589            .storage
590            .max_size_gb
591            .saturating_mul(1024 * 1024 * 1024);
592        Self::with_options_and_backend(path, s3_config, max_size_bytes, &config.storage.backend)
593    }
594
595    /// Create a new store with optional S3 backend and custom size limit.
596    ///
597    /// The raw local blob backend remains unbounded. `HashtreeStore` enforces
598    /// `max_size_bytes` at the tree-management layer so eviction can honor pins,
599    /// orphan handling, and local-only eviction when S3 is used as archive.
600    pub fn with_options<P: AsRef<Path>>(
601        path: P,
602        s3_config: Option<&S3Config>,
603        max_size_bytes: u64,
604    ) -> Result<Self> {
605        let config = hashtree_config::Config::load_or_default();
606        Self::with_options_and_backend(path, s3_config, max_size_bytes, &config.storage.backend)
607    }
608
    /// Shared constructor: opens the LMDB metadata environment, creates all
    /// named databases, builds the local blob backend, and wires up the
    /// storage router (with S3 when configured and compiled in).
    fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // NOTE(review): heed marks `open` unsafe because opening the same
        // environment path twice in one process is UB — confirm callers keep a
        // single HashtreeStore per path.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) // 10GB virtual address space
                .max_dbs(8) // pins, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Best-effort cleanup of reader slots left by crashed processes.
        let _ = env.clear_stale_readers();

        // Create (or open) every metadata database in one write transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Intentionally keep the raw blob backend unbounded here. HashtreeStore
        // owns quota policy above this layer, where it can coordinate eviction
        // with tree refs, blob ownership, pins, and S3 archival behavior.
        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                // Map size must at least cover the quota, with a 10 GiB floor.
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            // Graceful fallback to filesystem when lmdb isn't compiled in.
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        // Create storage router with optional S3
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            // NOTE(review): block_on here assumes we're not already inside a
            // tokio runtime worker thread — confirm construction context.
            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            env,
            pins,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
704
    /// Borrow the storage router (local blobs + optional S3).
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }
709
710    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
711    /// All writes through this go to both LMDB and S3
712    pub fn store_arc(&self) -> Arc<StorageRouter> {
713        Arc::clone(&self.router)
714    }
715
716    /// Get tree node by hash (raw bytes)
717    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
718        let store = self.store_arc();
719        let tree = HashTree::new(HashTreeConfig::new(store).public());
720
721        sync_block_on(async {
722            tree.get_tree_node(hash)
723                .await
724                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
725        })
726    }
727
728    /// Store a raw blob, returns SHA256 hash as hex.
729    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
730        let hash = sha256(data);
731        self.router
732            .put_sync(hash, data)
733            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
734        Ok(to_hex(&hash))
735    }
736
737    /// Get a raw blob by SHA256 hash (raw bytes).
738    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
739        self.router
740            .get_sync(hash)
741            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
742    }
743
744    /// Check if a blob exists by SHA256 hash (raw bytes).
745    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
746        self.router
747            .exists(hash)
748            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
749    }
750
751    // === Blossom ownership tracking ===
752    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
753    // This allows efficient multi-owner tracking with O(1) lookups
754
755    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
756    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
757        let mut key = [0u8; 64];
758        key[..32].copy_from_slice(sha256);
759        key[32..].copy_from_slice(pubkey);
760        key
761    }
762
    /// Add an owner (pubkey) to a blob for Blossom protocol
    /// Multiple users can own the same blob - it's only deleted when all owners remove it
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Add ownership entry (idempotent - put overwrites)
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        // Convert sha256 to hex for BlobMetadata (which stores sha256 as hex string)
        let sha256_hex = to_hex(sha256);

        // Get existing blobs for this pubkey (for /list endpoint)
        // NOTE(review): a corrupt JSON entry is silently treated as an empty
        // list here, which would drop prior entries on the next put — confirm
        // this is acceptable.
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        // Check if blob already exists for this pubkey
        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            // unwrap: system clock before UNIX_EPOCH is treated as a bug.
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Get size from raw blob
            let size = self
                .get_blob(sha256)?
                .map(|data| data.len() as u64)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
809
810    /// Check if a pubkey owns a blob
811    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
812        let key = Self::blob_owner_key(sha256, pubkey);
813        let rtxn = self.env.read_txn()?;
814        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
815    }
816
817    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
818    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
819        let rtxn = self.env.read_txn()?;
820
821        let mut owners = Vec::new();
822        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
823            let (key, _) = item?;
824            if key.len() == 64 {
825                // Extract pubkey from composite key (bytes 32-64)
826                let mut pubkey = [0u8; 32];
827                pubkey.copy_from_slice(&key[32..64]);
828                owners.push(pubkey);
829            }
830        }
831        Ok(owners)
832    }
833
834    /// Check if blob has any owners
835    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
836        let rtxn = self.env.read_txn()?;
837
838        // Just check if any entry exists with this prefix
839        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
840            if item.is_ok() {
841                return Ok(true);
842            }
843        }
844        Ok(false)
845    }
846
847    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
848    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
849        Ok(self.get_blob_owners(sha256)?.into_iter().next())
850    }
851
    /// Remove a user's ownership of a blossom blob
    /// Only deletes the actual blob when no owners remain
    /// Returns true if the blob was actually deleted (no owners left)
    ///
    /// All index updates happen inside one LMDB write transaction; the raw
    /// blob bytes live behind `router` and are deleted outside of it.
    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Remove this pubkey's ownership entry
        self.blob_owners.delete(&mut wtxn, &key[..])?;

        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
        let sha256_hex = to_hex(sha256);

        // Remove from pubkey's blob list
        // (an undecodable list is left untouched rather than wiped)
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Check if any other owners remain (prefix scan)
        // The delete above is already visible within this same write txn.
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &to_hex(pubkey)[..8],
                &sha256_hex[..8]
            );
            return Ok(false);
        }

        // No owners left - delete the blob completely
        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8]
        );

        // Delete raw blob (by content hash) - this deletes from S3 too
        // NOTE(review): the blob is deleted before the index commit below; if
        // that commit fails, the data is gone while ownership rows persist —
        // confirm this ordering is intentional.
        let _ = self.router.delete_sync(sha256);

        wtxn.commit()?;
        Ok(true)
    }
905
906    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
907    pub fn list_blobs_by_pubkey(
908        &self,
909        pubkey: &[u8; 32],
910    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
911        let rtxn = self.env.read_txn()?;
912
913        let blobs: Vec<BlobMetadata> = self
914            .pubkey_blobs
915            .get(&rtxn, pubkey)?
916            .and_then(|b| serde_json::from_slice(b).ok())
917            .unwrap_or_default();
918
919        Ok(blobs
920            .into_iter()
921            .map(|b| crate::server::blossom::BlobDescriptor {
922                url: format!("/{}", b.sha256),
923                sha256: b.sha256,
924                size: b.size,
925                mime_type: b.mime_type,
926                uploaded: b.uploaded,
927            })
928            .collect())
929    }
930
931    /// Get a single chunk/blob by hash (raw bytes)
932    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
933        self.router
934            .get_sync(hash)
935            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
936    }
937
938    /// Get file content by hash (raw bytes)
939    /// Returns raw bytes (caller handles decryption if needed)
940    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
941        let store = self.store_arc();
942        let tree = HashTree::new(HashTreeConfig::new(store).public());
943
944        sync_block_on(async {
945            tree.read_file(hash)
946                .await
947                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
948        })
949    }
950
951    /// Get file content by Cid (hash + optional decryption key as raw bytes)
952    /// Handles decryption automatically if key is present
953    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
954        let store = self.store_arc();
955        let tree = HashTree::new(HashTreeConfig::new(store).public());
956
957        sync_block_on(async {
958            tree.get(cid, None)
959                .await
960                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
961        })
962    }
963
964    fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
965        let exists = self
966            .router
967            .exists(&cid.hash)
968            .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
969        if !exists {
970            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
971        }
972        Ok(())
973    }
974
    /// Stream file content identified by Cid into a writer without buffering full file in memory.
    ///
    /// Returns the number of bytes written. Errors when the Cid's hash is not
    /// in the store, when any chunk fails to stream or write, or when the
    /// stream yields nothing at all.
    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
        self.ensure_cid_exists(cid)?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let mut total_bytes = 0u64;
        // Tracks whether the stream produced anything, so a silently empty
        // stream is surfaced as an error below.
        let mut streamed_any_chunk = false;

        sync_block_on(async {
            let mut stream = tree.get_stream(cid);
            while let Some(chunk) = stream.next().await {
                streamed_any_chunk = true;
                let chunk =
                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
                writer
                    .write_all(&chunk)
                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
                total_bytes += chunk.len() as u64;
            }
            Ok::<(), anyhow::Error>(())
        })?;

        // NOTE(review): a zero-chunk stream is reported as "CID not found" even
        // though ensure_cid_exists succeeded above — if get_stream yields no
        // chunks for a legitimately empty file, that file would be rejected
        // here; confirm intended behavior for empty files.
        if !streamed_any_chunk {
            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
        }

        writer
            .flush()
            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
        Ok(total_bytes)
    }
1007
1008    /// Stream file content identified by Cid directly into a destination path.
1009    pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1010        self.ensure_cid_exists(cid)?;
1011
1012        let output_path = output_path.as_ref();
1013        if let Some(parent) = output_path.parent() {
1014            if !parent.as_os_str().is_empty() {
1015                std::fs::create_dir_all(parent).with_context(|| {
1016                    format!("Failed to create output directory {}", parent.display())
1017                })?;
1018            }
1019        }
1020
1021        let mut file = std::fs::File::create(output_path)
1022            .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1023        self.write_file_by_cid_to_writer(cid, &mut file)
1024    }
1025
1026    /// Stream a public (unencrypted) file by hash directly into a destination path.
1027    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1028        self.write_file_by_cid(&Cid::public(*hash), output_path)
1029    }
1030
1031    /// Resolve a path within a tree (returns Cid with key if encrypted)
1032    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1033        let store = self.store_arc();
1034        let tree = HashTree::new(HashTreeConfig::new(store).public());
1035
1036        sync_block_on(async {
1037            tree.resolve_path(cid, path)
1038                .await
1039                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1040        })
1041    }
1042
    /// Get chunk metadata for a file (chunk list, sizes, total size)
    ///
    /// Returns `Ok(None)` when the hash is unknown, the tree node is missing,
    /// or the hash refers to a directory rather than a file.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            // First check if the hash exists in the store at all
            // (either as a blob or tree node)
            let exists = store
                .has(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            // Get total size
            let total_size = tree
                .get_size(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            // Check if it's a tree (chunked) or blob
            let is_tree_node = tree
                .is_tree(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            if !is_tree_node {
                // Single blob, not chunked — no per-chunk vectors to report.
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            // Get tree node to extract chunk info
            let node = match tree
                .get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
            {
                Some(n) => n,
                None => return Ok(None),
            };

            // Check if it's a directory (has named links)
            let is_directory = tree
                .is_directory(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None); // Not a file
            }

            // Extract chunk info from links; the two vectors stay parallel
            // (index i of chunk_sizes is the size of chunk_hashes[i]).
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1114
1115    /// Get byte range from file
1116    pub fn get_file_range(
1117        &self,
1118        hash: &[u8; 32],
1119        start: u64,
1120        end: Option<u64>,
1121    ) -> Result<Option<(Vec<u8>, u64)>> {
1122        let metadata = match self.get_file_chunk_metadata(hash)? {
1123            Some(m) => m,
1124            None => return Ok(None),
1125        };
1126
1127        if metadata.total_size == 0 {
1128            return Ok(Some((Vec::new(), 0)));
1129        }
1130
1131        if start >= metadata.total_size {
1132            return Ok(None);
1133        }
1134
1135        let end = end
1136            .unwrap_or(metadata.total_size - 1)
1137            .min(metadata.total_size - 1);
1138
1139        // For non-chunked files, load entire file
1140        if !metadata.is_chunked {
1141            let content = self.get_file(hash)?.unwrap_or_default();
1142            let range_content = if start < content.len() as u64 {
1143                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1144            } else {
1145                Vec::new()
1146            };
1147            return Ok(Some((range_content, metadata.total_size)));
1148        }
1149
1150        // For chunked files, load only needed chunks
1151        let mut result = Vec::new();
1152        let mut current_offset = 0u64;
1153
1154        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1155            let chunk_size = metadata.chunk_sizes[i];
1156            let chunk_end = current_offset + chunk_size - 1;
1157
1158            // Check if this chunk overlaps with requested range
1159            if chunk_end >= start && current_offset <= end {
1160                let chunk_content = match self.get_chunk(chunk_hash)? {
1161                    Some(content) => content,
1162                    None => {
1163                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1164                    }
1165                };
1166
1167                let chunk_read_start = if current_offset >= start {
1168                    0
1169                } else {
1170                    (start - current_offset) as usize
1171                };
1172
1173                let chunk_read_end = if chunk_end <= end {
1174                    chunk_size as usize - 1
1175                } else {
1176                    (end - current_offset) as usize
1177                };
1178
1179                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1180            }
1181
1182            current_offset += chunk_size;
1183
1184            if current_offset > end {
1185                break;
1186            }
1187        }
1188
1189        Ok(Some((result, metadata.total_size)))
1190    }
1191
1192    /// Stream file range as chunks using Arc for async/Send contexts
1193    pub fn stream_file_range_chunks_owned(
1194        self: Arc<Self>,
1195        hash: &[u8; 32],
1196        start: u64,
1197        end: u64,
1198    ) -> Result<Option<FileRangeChunksOwned>> {
1199        let metadata = match self.get_file_chunk_metadata(hash)? {
1200            Some(m) => m,
1201            None => return Ok(None),
1202        };
1203
1204        if metadata.total_size == 0 || start >= metadata.total_size {
1205            return Ok(None);
1206        }
1207
1208        let end = end.min(metadata.total_size - 1);
1209
1210        Ok(Some(FileRangeChunksOwned {
1211            store: self,
1212            metadata,
1213            start,
1214            end,
1215            current_chunk_idx: 0,
1216            current_offset: 0,
1217        }))
1218    }
1219
1220    /// Get directory structure by hash (raw bytes)
1221    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1222        let store = self.store_arc();
1223        let tree = HashTree::new(HashTreeConfig::new(store).public());
1224
1225        sync_block_on(async {
1226            // Check if it's a directory
1227            let is_dir = tree
1228                .is_directory(hash)
1229                .await
1230                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1231
1232            if !is_dir {
1233                return Ok(None);
1234            }
1235
1236            // Get directory entries (public Cid - no encryption key)
1237            let cid = hashtree_core::Cid::public(*hash);
1238            let tree_entries = tree
1239                .list_directory(&cid)
1240                .await
1241                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1242
1243            let entries: Vec<DirEntry> = tree_entries
1244                .into_iter()
1245                .map(|e| DirEntry {
1246                    name: e.name,
1247                    cid: to_hex(&e.hash),
1248                    is_directory: e.link_type.is_tree(),
1249                    size: e.size,
1250                })
1251                .collect();
1252
1253            Ok(Some(DirectoryListing {
1254                dir_name: String::new(),
1255                entries,
1256            }))
1257        })
1258    }
1259
1260    /// Get directory structure by CID, supporting encrypted directories.
1261    pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1262        let store = self.store_arc();
1263        let tree = HashTree::new(HashTreeConfig::new(store).public());
1264        let cid = cid.clone();
1265
1266        sync_block_on(async {
1267            let is_dir = tree
1268                .is_dir(&cid)
1269                .await
1270                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1271
1272            if !is_dir {
1273                return Ok(None);
1274            }
1275
1276            let tree_entries = tree
1277                .list_directory(&cid)
1278                .await
1279                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1280
1281            let entries: Vec<DirEntry> = tree_entries
1282                .into_iter()
1283                .map(|e| DirEntry {
1284                    name: e.name,
1285                    cid: Cid {
1286                        hash: e.hash,
1287                        key: e.key,
1288                    }
1289                    .to_string(),
1290                    is_directory: e.link_type.is_tree(),
1291                    size: e.size,
1292                })
1293                .collect();
1294
1295            Ok(Some(DirectoryListing {
1296                dir_name: String::new(),
1297                entries,
1298            }))
1299        })
1300    }
1301
1302    // === Cached roots ===
1303
1304    /// Get cached root for a pubkey/tree_name pair
1305    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1306        let key = format!("{}/{}", pubkey_hex, tree_name);
1307        let rtxn = self.env.read_txn()?;
1308        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1309            let root: CachedRoot = rmp_serde::from_slice(bytes)
1310                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1311            Ok(Some(root))
1312        } else {
1313            Ok(None)
1314        }
1315    }
1316
1317    /// Set cached root for a pubkey/tree_name pair
1318    pub fn set_cached_root(
1319        &self,
1320        pubkey_hex: &str,
1321        tree_name: &str,
1322        hash: &str,
1323        key: Option<&str>,
1324        visibility: &str,
1325        updated_at: u64,
1326    ) -> Result<()> {
1327        let db_key = format!("{}/{}", pubkey_hex, tree_name);
1328        let root = CachedRoot {
1329            hash: hash.to_string(),
1330            key: key.map(|k| k.to_string()),
1331            updated_at,
1332            visibility: visibility.to_string(),
1333        };
1334        let bytes = rmp_serde::to_vec(&root)
1335            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1336        let mut wtxn = self.env.write_txn()?;
1337        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1338        wtxn.commit()?;
1339        Ok(())
1340    }
1341
1342    /// List all cached roots for a pubkey
1343    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1344        let prefix = format!("{}/", pubkey_hex);
1345        let rtxn = self.env.read_txn()?;
1346        let mut results = Vec::new();
1347
1348        for item in self.cached_roots.iter(&rtxn)? {
1349            let (key, bytes) = item?;
1350            if key.starts_with(&prefix) {
1351                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1352                let root: CachedRoot = rmp_serde::from_slice(bytes)
1353                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1354                results.push((tree_name.to_string(), root));
1355            }
1356        }
1357
1358        Ok(results)
1359    }
1360
1361    /// Delete a cached root
1362    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1363        let key = format!("{}/{}", pubkey_hex, tree_name);
1364        let mut wtxn = self.env.write_txn()?;
1365        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1366        wtxn.commit()?;
1367        Ok(deleted)
1368    }
1369}
1370
/// Chunk-level metadata for a stored file.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes.
    pub total_size: u64,
    /// Content hashes of each chunk (empty when the file is a single blob).
    pub chunk_hashes: Vec<Hash>,
    /// Size of each chunk, parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// True when the file is stored as a tree of chunks rather than one blob.
    pub is_chunked: bool,
}
1378
/// Owned iterator for async streaming
pub struct FileRangeChunksOwned {
    // Store handle kept alive for chunk fetches (Arc so the iterator owns it).
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being ranged over.
    metadata: FileChunkMetadata,
    // Inclusive absolute byte range requested.
    start: u64,
    end: u64,
    // Cursor: index of the next chunk to consider ...
    current_chunk_idx: usize,
    // ... and the absolute byte offset where that chunk starts.
    current_offset: u64,
}
1388
1389impl Iterator for FileRangeChunksOwned {
1390    type Item = Result<Vec<u8>>;
1391
1392    fn next(&mut self) -> Option<Self::Item> {
1393        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
1394            return None;
1395        }
1396
1397        if self.current_offset > self.end {
1398            return None;
1399        }
1400
1401        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
1402        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
1403        let chunk_end = self.current_offset + chunk_size - 1;
1404
1405        self.current_chunk_idx += 1;
1406
1407        if chunk_end < self.start || self.current_offset > self.end {
1408            self.current_offset += chunk_size;
1409            return self.next();
1410        }
1411
1412        let chunk_content = match self.store.get_chunk(chunk_hash) {
1413            Ok(Some(content)) => content,
1414            Ok(None) => {
1415                return Some(Err(anyhow::anyhow!(
1416                    "Chunk {} not found",
1417                    to_hex(chunk_hash)
1418                )));
1419            }
1420            Err(e) => {
1421                return Some(Err(e));
1422            }
1423        };
1424
1425        let chunk_read_start = if self.current_offset >= self.start {
1426            0
1427        } else {
1428            (self.start - self.current_offset) as usize
1429        };
1430
1431        let chunk_read_end = if chunk_end <= self.end {
1432            chunk_size as usize - 1
1433        } else {
1434            (self.end - self.current_offset) as usize
1435        };
1436
1437        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
1438        self.current_offset += chunk_size;
1439
1440        Some(Ok(result))
1441    }
1442}
1443
/// Result of a garbage-collection pass.
#[derive(Debug)]
pub struct GcStats {
    /// Number of DAGs deleted.
    pub deleted_dags: usize,
    /// Bytes reclaimed by the deletions.
    pub freed_bytes: u64,
}
1449
/// A single entry in a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within the directory.
    pub name: String,
    /// Entry identifier: hex hash for public listings, or a full Cid string
    /// (key included when encrypted) for Cid-based listings.
    pub cid: String,
    /// True when the entry is itself a directory/tree.
    pub is_directory: bool,
    /// Entry size in bytes.
    pub size: u64,
}
1457
/// Directory contents returned by the listing helpers.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Directory name — the listing helpers in this module always leave it empty.
    pub dir_name: String,
    /// Entries contained in the directory.
    pub entries: Vec<DirEntry>,
}
1463
/// Blob metadata for Blossom protocol
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Content hash as a hex string (produced via `to_hex`).
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type; registration defaults to "application/octet-stream".
    pub mime_type: String,
    /// Unix timestamp (seconds) when the blob was registered.
    pub uploaded: u64,
}
1472
1473// Implement ContentStore trait for WebRTC data exchange
1474impl crate::webrtc::ContentStore for HashtreeStore {
1475    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1476        let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
1477        self.get_chunk(&hash)
1478    }
1479}
1480
#[cfg(test)]
mod tests {
    // All tests below exercise the LMDB backend, so imports are feature-gated too.
    #[cfg(feature = "lmdb")]
    use super::*;
    #[cfg(feature = "lmdb")]
    use tempfile::TempDir;

    // A storage budget above the LMDB minimum map size should propagate to
    // the blob store's actual map size.
    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        // Explicit drop closes the LMDB environment before TempDir cleanup.
        drop(store);
        Ok(())
    }

    // An explicit map-size override on LocalStore should be honored.
    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }
}