//! Storage layer for the hashtree CLI: local blob stores (filesystem or
//! LMDB), an optional S3 backup router, and tree/pin metadata kept in an
//! LMDB environment.
use anyhow::{Context, Result};
use async_trait::async_trait;
use futures::executor::block_on as sync_block_on;
use futures::io::AllowStdIo;
use futures::StreamExt;
use hashtree_config::StorageBackend;
use hashtree_core::store::{Store, StoreError};
use hashtree_core::{
    from_hex, sha256, to_hex, types::Hash, Cid, DirEntry as HashTreeDirEntry, HashTree,
    HashTreeConfig, TreeNode,
};
use hashtree_fs::FsBlobStore;
#[cfg(feature = "lmdb")]
use hashtree_lmdb::LmdbBlobStore;
use heed::types::*;
use heed::{Database, EnvOpenOptions};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::io::{Read, Write};
use std::path::Path;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
23
/// Eviction priority for trees whose owner we have no relationship with.
pub const PRIORITY_OTHER: u8 = 64;
/// Eviction priority for trees owned by followed pubkeys.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Eviction priority for our own or pinned trees (highest; evicted last).
pub const PRIORITY_OWN: u8 = 255;
/// Maximum concurrent read transactions on the metadata LMDB environment.
const LMDB_MAX_READERS: u32 = 1024;
/// Floor for the LMDB blob-store map size (10 GiB) so a small storage quota
/// doesn't produce an unusably small memory map.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 10 * 1024 * 1024 * 1024;
31
/// Metadata for a synced tree (for eviction tracking).
///
/// Serialized (msgpack, per the `tree_meta` database comment below) and
/// keyed by the tree's root hash. Field order matters for the compact
/// msgpack encoding — do not reorder fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Pubkey of tree owner
    pub owner: String,
    /// Tree name if known (from nostr key like "npub.../name")
    pub name: Option<String>,
    /// Unix timestamp when this tree was synced
    pub synced_at: u64,
    /// Total size of all blobs in this tree
    pub total_size: u64,
    /// Eviction priority: 255=own/pinned, 128=followed, 64=other
    /// (see the `PRIORITY_*` constants)
    pub priority: u8,
}
46
/// Cached root info from Nostr events.
///
/// Stored in the `cached_roots` database keyed by "pubkey_hex/tree_name".
/// Field order matters for the compact msgpack encoding — do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex)
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public", "link-visible", or "private"
    pub visibility: String,
}
59
/// Storage statistics reported by a local blob store.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of blobs currently stored.
    pub count: usize,
    /// Sum of all blob sizes in bytes.
    pub total_bytes: u64,
}
66
/// Local blob store - wraps either FsBlobStore or LmdbBlobStore.
///
/// The `Lmdb` variant only exists when the `lmdb` feature is compiled in;
/// otherwise every request for LMDB falls back to the filesystem backend.
pub enum LocalStore {
    Fs(FsBlobStore),
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
73
impl LocalStore {
    /// Create a new unbounded local store.
    ///
    /// Higher-level stores that need quota enforcement should manage eviction
    /// above this layer so tree metadata, pins, and archival policies stay
    /// coherent.
    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
        Self::new_unbounded(path, backend)
    }

    /// Create a new local store with an explicit LMDB map size when using the LMDB backend.
    ///
    /// `map_size_bytes` is ignored for the filesystem backend. When the
    /// `lmdb` feature is compiled out, an LMDB request falls back to the
    /// filesystem backend with a warning rather than failing.
    pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
        path: P,
        backend: &StorageBackend,
        map_size_bytes: Option<u64>,
    ) -> Result<Self, StoreError> {
        match backend {
            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
            #[cfg(feature = "lmdb")]
            StorageBackend::Lmdb => match map_size_bytes {
                Some(map_size_bytes) => {
                    // LMDB takes the map size as usize; reject values that
                    // don't fit on this platform instead of truncating.
                    let map_size = usize::try_from(map_size_bytes).map_err(|_| {
                        StoreError::Other("LMDB map size exceeds usize".to_string())
                    })?;
                    Ok(LocalStore::Lmdb(LmdbBlobStore::with_map_size(
                        path, map_size,
                    )?))
                }
                None => Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?)),
            },
            #[cfg(not(feature = "lmdb"))]
            StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
            }
        }
    }

    /// Create a new unbounded local store for a specific backend
    /// (LMDB backend uses its default map size).
    pub fn new_unbounded<P: AsRef<Path>>(
        path: P,
        backend: &StorageBackend,
    ) -> Result<Self, StoreError> {
        Self::new_with_lmdb_map_size(path, backend, None)
    }

    /// Report which backend this store was actually built with
    /// (may differ from the requested one when `lmdb` is compiled out).
    pub fn backend(&self) -> StorageBackend {
        match self {
            LocalStore::Fs(_) => StorageBackend::Fs,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(_) => StorageBackend::Lmdb,
        }
    }

    /// Sync put operation. Returns `true` when the blob was newly inserted.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.put_sync(hash, data),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put_sync(hash, data),
        }
    }

    /// Sync batch put operation. Returns the number of newly inserted blobs.
    ///
    /// The filesystem backend has no native batch write, so it inserts one
    /// by one (short-circuiting on the first error); LMDB uses its batched
    /// write path.
    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
        match self {
            LocalStore::Fs(store) => {
                let mut inserted = 0usize;
                for (hash, data) in items {
                    if store.put_sync(*hash, data.as_slice())? {
                        inserted += 1;
                    }
                }
                Ok(inserted)
            }
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put_many_sync(items),
        }
    }

    /// Sync get operation. Returns `None` when the blob is absent.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.get_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.get_sync(hash),
        }
    }

    /// Check if hash exists.
    // The Fs backend's exists() is infallible (returns bool), so it is
    // wrapped in Ok to match the fallible LMDB signature.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => Ok(store.exists(hash)),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.exists(hash),
        }
    }

    /// Sync delete operation. Returns `true` when a blob was actually removed.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.delete_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.delete_sync(hash),
        }
    }

    /// Get storage statistics (blob count and total bytes) from the backend.
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        match self {
            LocalStore::Fs(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
        }
    }

    /// List all hashes in the store.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.list(),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.list(),
        }
    }
}
213
// Async `Store` facade over the blocking local backends. Each method
// delegates straight to the synchronous implementation above; nothing here
// actually awaits.
#[async_trait]
impl Store for LocalStore {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
        self.put_many_sync(&items)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
236
237#[cfg(feature = "s3")]
238use tokio::sync::mpsc;
239
240use crate::config::S3Config;
241
/// Message for the background S3 sync task.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload a blob's bytes under its content hash.
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove the S3 object stored for `hash`.
    Delete { hash: Hash },
}
248
/// Storage router - local store primary with optional S3 backup
///
/// Write path: local first (fast), then queue S3 upload (non-blocking)
/// Read path: local first, fall back to S3 if miss
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LocalStore>,
    /// Optional S3 client for backup; `None` when running local-only
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target bucket, set together with `s3_client`
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to every object key (empty when unset)
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to send uploads/deletes to the background sync task
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
267
268impl StorageRouter {
269    /// Create router with local storage only
270    pub fn new(local: Arc<LocalStore>) -> Self {
271        Self {
272            local,
273            #[cfg(feature = "s3")]
274            s3_client: None,
275            #[cfg(feature = "s3")]
276            s3_bucket: None,
277            #[cfg(feature = "s3")]
278            s3_prefix: String::new(),
279            #[cfg(feature = "s3")]
280            sync_tx: None,
281        }
282    }
283
284    /// Create router with local storage + S3 backup
285    #[cfg(feature = "s3")]
286    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
287        use aws_sdk_s3::Client as S3Client;
288
289        // Build AWS config
290        let mut aws_config_loader = aws_config::from_env();
291        aws_config_loader =
292            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
293        let aws_config = aws_config_loader.load().await;
294
295        // Build S3 client with custom endpoint
296        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
297        s3_config_builder = s3_config_builder
298            .endpoint_url(&config.endpoint)
299            .force_path_style(true);
300
301        let s3_client = S3Client::from_conf(s3_config_builder.build());
302        let bucket = config.bucket.clone();
303        let prefix = config.prefix.clone().unwrap_or_default();
304
305        // Create background sync channel
306        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
307
308        // Spawn background sync task with bounded concurrent uploads
309        let sync_client = s3_client.clone();
310        let sync_bucket = bucket.clone();
311        let sync_prefix = prefix.clone();
312
313        tokio::spawn(async move {
314            use aws_sdk_s3::primitives::ByteStream;
315
316            tracing::info!("S3 background sync task started");
317
318            // Limit concurrent uploads to prevent overwhelming the runtime
319            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
320            let client = std::sync::Arc::new(sync_client);
321            let bucket = std::sync::Arc::new(sync_bucket);
322            let prefix = std::sync::Arc::new(sync_prefix);
323
324            while let Some(msg) = sync_rx.recv().await {
325                let client = client.clone();
326                let bucket = bucket.clone();
327                let prefix = prefix.clone();
328                let semaphore = semaphore.clone();
329
330                // Spawn each upload with semaphore-bounded concurrency
331                tokio::spawn(async move {
332                    // Acquire permit before uploading
333                    let _permit = semaphore.acquire().await;
334
335                    match msg {
336                        S3SyncMessage::Upload { hash, data } => {
337                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
338                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
339
340                            match client
341                                .put_object()
342                                .bucket(bucket.as_str())
343                                .key(&key)
344                                .body(ByteStream::from(data))
345                                .send()
346                                .await
347                            {
348                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
349                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
350                            }
351                        }
352                        S3SyncMessage::Delete { hash } => {
353                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
354                            tracing::debug!("S3 deleting {}", &key);
355
356                            if let Err(e) = client
357                                .delete_object()
358                                .bucket(bucket.as_str())
359                                .key(&key)
360                                .send()
361                                .await
362                            {
363                                tracing::error!("S3 delete failed {}: {}", &key, e);
364                            }
365                        }
366                    }
367                });
368            }
369        });
370
371        tracing::info!(
372            "S3 storage initialized: bucket={}, prefix={}",
373            bucket,
374            prefix
375        );
376
377        Ok(Self {
378            local,
379            s3_client: Some(s3_client),
380            s3_bucket: Some(bucket),
381            s3_prefix: prefix,
382            sync_tx: Some(sync_tx),
383        })
384    }
385
386    /// Store data - writes to LMDB, queues S3 upload in background
387    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
388        // Always write to local first
389        let is_new = self.local.put_sync(hash, data)?;
390
391        // Queue S3 upload if configured (non-blocking)
392        // Always upload to S3 (even if not new locally) to ensure S3 has all blobs
393        #[cfg(feature = "s3")]
394        if let Some(ref tx) = self.sync_tx {
395            tracing::info!(
396                "Queueing S3 upload for {} ({} bytes, is_new={})",
397                crate::storage::to_hex(&hash)[..16].to_string(),
398                data.len(),
399                is_new
400            );
401            if let Err(e) = tx.send(S3SyncMessage::Upload {
402                hash,
403                data: data.to_vec(),
404            }) {
405                tracing::error!("Failed to queue S3 upload: {}", e);
406            }
407        }
408
409        Ok(is_new)
410    }
411
412    /// Store multiple blobs with a single local batch write when supported.
413    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
414        let inserted = self.local.put_many_sync(items)?;
415
416        #[cfg(feature = "s3")]
417        if let Some(ref tx) = self.sync_tx {
418            for (hash, data) in items {
419                if let Err(e) = tx.send(S3SyncMessage::Upload {
420                    hash: *hash,
421                    data: data.clone(),
422                }) {
423                    tracing::error!("Failed to queue S3 upload: {}", e);
424                }
425            }
426        }
427
428        Ok(inserted)
429    }
430
431    /// Get data - tries LMDB first, falls back to S3
432    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
433        // Try local first
434        if let Some(data) = self.local.get_sync(hash)? {
435            return Ok(Some(data));
436        }
437
438        // Fall back to S3 if configured
439        #[cfg(feature = "s3")]
440        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
441            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
442
443            match sync_block_on(async { client.get_object().bucket(bucket).key(&key).send().await })
444            {
445                Ok(output) => {
446                    if let Ok(body) = sync_block_on(output.body.collect()) {
447                        let data = body.into_bytes().to_vec();
448                        // Cache locally for future reads
449                        let _ = self.local.put_sync(*hash, &data);
450                        return Ok(Some(data));
451                    }
452                }
453                Err(e) => {
454                    let service_err = e.into_service_error();
455                    if !service_err.is_no_such_key() {
456                        tracing::warn!("S3 get failed: {}", service_err);
457                    }
458                }
459            }
460        }
461
462        Ok(None)
463    }
464
465    /// Check if hash exists
466    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
467        // Check local first
468        if self.local.exists(hash)? {
469            return Ok(true);
470        }
471
472        // Check S3 if configured
473        #[cfg(feature = "s3")]
474        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
475            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
476
477            match sync_block_on(async {
478                client.head_object().bucket(bucket).key(&key).send().await
479            }) {
480                Ok(_) => return Ok(true),
481                Err(e) => {
482                    let service_err = e.into_service_error();
483                    if !service_err.is_not_found() {
484                        tracing::warn!("S3 head failed: {}", service_err);
485                    }
486                }
487            }
488        }
489
490        Ok(false)
491    }
492
493    /// Delete data from both local and S3 stores
494    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
495        let deleted = self.local.delete_sync(hash)?;
496
497        // Queue S3 delete if configured
498        #[cfg(feature = "s3")]
499        if let Some(ref tx) = self.sync_tx {
500            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
501        }
502
503        Ok(deleted)
504    }
505
506    /// Delete data from local store only (don't propagate to S3)
507    /// Used for eviction where we want to keep S3 as archive
508    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
509        self.local.delete_sync(hash)
510    }
511
512    /// Get stats from local store
513    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
514        self.local.stats()
515    }
516
517    /// List all hashes from local store
518    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
519        self.local.list()
520    }
521
522    /// Get the underlying local store for HashTree operations
523    pub fn local_store(&self) -> Arc<LocalStore> {
524        Arc::clone(&self.local)
525    }
526}
527
// Implement async Store trait for StorageRouter so it can be used directly with HashTree
// This ensures all writes go through S3 sync.
// Each method delegates to the synchronous router implementation: writes
// queue their S3 mirror in the background, while `get`/`has` may fall back
// to S3 synchronously on a local miss.
#[async_trait]
impl Store for StorageRouter {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
        self.put_many_sync(&items)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
552
/// Top-level store: metadata databases in one heed/LMDB environment plus a
/// blob `StorageRouter`. Owns the quota policy (`max_size_bytes`) above the
/// unbounded raw blob backend.
pub struct HashtreeStore {
    /// Shared LMDB environment holding all metadata databases below.
    env: heed::Env,
    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
    pins: Database<Bytes, Unit>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles local blobs + optional S3 (Arc for sharing with HashTree)
    router: Arc<StorageRouter>,
    /// Maximum storage size in bytes (from config)
    max_size_bytes: u64,
}
574
575impl HashtreeStore {
576    /// Create a new store with the configured local storage limit.
577    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
578        let config = hashtree_config::Config::load_or_default();
579        let max_size_bytes = config
580            .storage
581            .max_size_gb
582            .saturating_mul(1024 * 1024 * 1024);
583        Self::with_options_and_backend(path, None, max_size_bytes, &config.storage.backend)
584    }
585
    /// Create a new store with an explicit local backend and size limit,
    /// skipping the configured defaults. No S3 mirroring.
    pub fn new_with_backend<P: AsRef<Path>>(
        path: P,
        backend: hashtree_config::StorageBackend,
        max_size_bytes: u64,
    ) -> Result<Self> {
        Self::with_options_and_backend(path, None, max_size_bytes, &backend)
    }
594
595    /// Create a new store with optional S3 backend and the configured local storage limit.
596    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
597        let config = hashtree_config::Config::load_or_default();
598        let max_size_bytes = config
599            .storage
600            .max_size_gb
601            .saturating_mul(1024 * 1024 * 1024);
602        Self::with_options_and_backend(path, s3_config, max_size_bytes, &config.storage.backend)
603    }
604
605    /// Create a new store with optional S3 backend and custom size limit.
606    ///
607    /// The raw local blob backend remains unbounded. `HashtreeStore` enforces
608    /// `max_size_bytes` at the tree-management layer so eviction can honor pins,
609    /// orphan handling, and local-only eviction when S3 is used as archive.
610    pub fn with_options<P: AsRef<Path>>(
611        path: P,
612        s3_config: Option<&S3Config>,
613        max_size_bytes: u64,
614    ) -> Result<Self> {
615        let config = hashtree_config::Config::load_or_default();
616        Self::with_options_and_backend(path, s3_config, max_size_bytes, &config.storage.backend)
617    }
618
    /// Shared constructor: opens the metadata LMDB environment, creates the
    /// named databases, builds the local blob backend, and wraps it in a
    /// `StorageRouter` (with S3 mirroring when configured and compiled in).
    fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY: heed marks env opening unsafe; presumably the invariant is
        // that the same path is not opened twice in-process — confirm callers
        // create a single HashtreeStore per path.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) // 10GB virtual address space
                .max_dbs(8) // pins, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Best-effort cleanup of reader slots left behind by dead processes.
        let _ = env.clear_stale_readers();

        // Create all metadata databases up-front in one write transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Intentionally keep the raw blob backend unbounded here. HashtreeStore
        // owns quota policy above this layer, where it can coordinate eviction
        // with tree refs, blob ownership, pins, and S3 archival behavior.
        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                // Size the blob map from the quota, but never below the
                // 10 GiB floor (the map is virtual address space, not disk).
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        // Create storage router with optional S3.
        // NOTE(review): block_on drives StorageRouter::with_s3, which calls
        // tokio::spawn — this assumes an active tokio runtime context;
        // confirm construction happens on such a thread when S3 is enabled.
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            env,
            pins,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
714
    /// Borrow the storage router (local blobs plus optional S3 backup).
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }
719
    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>).
    /// All writes through this go to both the local store and S3.
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }
725
    /// Upload a file as raw plaintext and return its CID (hex root hash),
    /// auto-pinning it so it survives eviction.
    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, true)
    }
730
    /// Upload a file without pinning (for blossom uploads that can be evicted).
    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, false)
    }
735
736    fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
737        let file_path = file_path.as_ref();
738        let file = std::fs::File::open(file_path)
739            .with_context(|| format!("Failed to open file {}", file_path.display()))?;
740
741        // Store raw plaintext blobs without CHK encryption, streaming from disk.
742        let store = self.store_arc();
743        let tree = HashTree::new(HashTreeConfig::new(store).public());
744
745        let (cid, _size) = sync_block_on(async { tree.put_stream(AllowStdIo::new(file)).await })
746            .context("Failed to store file")?;
747
748        // Only pin if requested (htree add = pin, blossom upload = no pin)
749        if pin {
750            let mut wtxn = self.env.write_txn()?;
751            self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
752            wtxn.commit()?;
753        }
754
755        Ok(to_hex(&cid.hash))
756    }
757
758    /// Upload a file from a stream with progress callbacks
759    pub fn upload_file_stream<R: Read, F>(
760        &self,
761        reader: R,
762        _file_name: impl Into<String>,
763        mut callback: F,
764    ) -> Result<String>
765    where
766        F: FnMut(&str),
767    {
768        // Use HashTree.put_stream for streaming upload without CHK encryption.
769        let store = self.store_arc();
770        let tree = HashTree::new(HashTreeConfig::new(store).public());
771
772        let (cid, _size) = sync_block_on(async { tree.put_stream(AllowStdIo::new(reader)).await })
773            .context("Failed to store file")?;
774
775        let root_hex = to_hex(&cid.hash);
776        callback(&root_hex);
777
778        // Auto-pin on upload
779        let mut wtxn = self.env.write_txn()?;
780        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
781        wtxn.commit()?;
782
783        Ok(root_hex)
784    }
785
    /// Upload a directory and return its root hash (hex).
    /// Respects .gitignore by default.
    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_with_options(dir_path, true)
    }
791
792    /// Upload a directory with options as raw plaintext (no CHK encryption)
793    pub fn upload_dir_with_options<P: AsRef<Path>>(
794        &self,
795        dir_path: P,
796        respect_gitignore: bool,
797    ) -> Result<String> {
798        let dir_path = dir_path.as_ref();
799
800        let store = self.store_arc();
801        let tree = HashTree::new(HashTreeConfig::new(store).public());
802
803        let root_cid = sync_block_on(async {
804            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore)
805                .await
806        })
807        .context("Failed to upload directory")?;
808
809        let root_hex = to_hex(&root_cid.hash);
810
811        let mut wtxn = self.env.write_txn()?;
812        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
813        wtxn.commit()?;
814
815        Ok(root_hex)
816    }
817
    /// Walk `current_path`, upload each regular file through `tree`, then
    /// assemble directory nodes bottom-up via `build_directory_tree` and
    /// return the root directory's Cid.
    ///
    /// `respect_gitignore` toggles `.gitignore` / global gitignore /
    /// `.git/info/exclude` filtering on the walker.
    ///
    /// `_root_path` is currently unused; all paths are made relative to
    /// `current_path` instead.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Build directory structure from flat file list - store full Cid with key
        // Maps relative directory path ("" = root) -> (file name, file Cid) pairs.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new()); // Root

        // `.hidden(false)` means dotfiles ARE walked; only ignore-file rules filter
        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false)
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // Skip the root directory itself
            if path == current_path {
                continue;
            }

            // Fall back to the absolute path if strip_prefix somehow fails
            let relative = path.strip_prefix(current_path).unwrap_or(path);

            if path.is_file() {
                let file = std::fs::File::open(path)
                    .with_context(|| format!("Failed to open file {}", path.display()))?;
                // Streams the file from disk; returns the content Cid
                let (cid, _size) = tree.put_stream(AllowStdIo::new(file)).await.map_err(|e| {
                    anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e)
                })?;

                // Get parent directory path and file name
                let parent = relative
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative
                    .file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Ensure directory entry exists (so empty dirs are represented too)
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        // Build directory tree bottom-up
        self.build_directory_tree(tree, &mut dir_contents).await
    }
878
879    async fn build_directory_tree<S: Store>(
880        &self,
881        tree: &HashTree<S>,
882        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
883    ) -> Result<Cid> {
884        // Sort directories by depth (deepest first) to build bottom-up
885        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
886        dirs.sort_by(|a, b| {
887            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
888            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
889            depth_b.cmp(&depth_a) // Deepest first
890        });
891
892        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();
893
894        for dir_path in dirs {
895            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();
896
897            let mut entries: Vec<HashTreeDirEntry> = files
898                .into_iter()
899                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
900                .collect();
901
902            // Add subdirectory entries
903            for (subdir_path, cid) in &dir_cids {
904                let parent = std::path::Path::new(subdir_path)
905                    .parent()
906                    .map(|p| p.to_string_lossy().to_string())
907                    .unwrap_or_default();
908
909                if parent == dir_path {
910                    let name = std::path::Path::new(subdir_path)
911                        .file_name()
912                        .map(|n| n.to_string_lossy().to_string())
913                        .unwrap_or_default();
914                    entries.push(HashTreeDirEntry::from_cid(name, cid));
915                }
916            }
917
918            let cid = tree
919                .put_directory(entries)
920                .await
921                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;
922
923            dir_cids.insert(dir_path, cid);
924        }
925
926        // Return root Cid
927        dir_cids
928            .get("")
929            .cloned()
930            .ok_or_else(|| anyhow::anyhow!("No root directory"))
931    }
932
933    /// Upload a file with CHK encryption, returns CID in format "hash:key"
934    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
935        let file_path = file_path.as_ref();
936        let file = std::fs::File::open(file_path)
937            .with_context(|| format!("Failed to open file {}", file_path.display()))?;
938
939        // Use unified API with encryption enabled (default), streaming from disk.
940        let store = self.store_arc();
941        let tree = HashTree::new(HashTreeConfig::new(store));
942
943        let (cid, _size) = sync_block_on(async { tree.put_stream(AllowStdIo::new(file)).await })
944            .map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;
945
946        let cid_str = cid.to_string();
947
948        let mut wtxn = self.env.write_txn()?;
949        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
950        wtxn.commit()?;
951
952        Ok(cid_str)
953    }
954
955    /// Upload a directory with CHK encryption, returns CID
956    /// Respects .gitignore by default
957    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
958        self.upload_dir_encrypted_with_options(dir_path, true)
959    }
960
961    /// Upload a directory with CHK encryption and options
962    /// Returns CID as "hash:key" format for encrypted directories
963    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(
964        &self,
965        dir_path: P,
966        respect_gitignore: bool,
967    ) -> Result<String> {
968        let dir_path = dir_path.as_ref();
969        let store = self.store_arc();
970
971        // Use unified API with encryption enabled (default)
972        let tree = HashTree::new(HashTreeConfig::new(store));
973
974        let root_cid = sync_block_on(async {
975            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore)
976                .await
977        })
978        .context("Failed to upload encrypted directory")?;
979
980        let cid_str = root_cid.to_string(); // Returns "hash:key" or "hash"
981
982        let mut wtxn = self.env.write_txn()?;
983        // Pin by hash only (the key is for decryption, not identification)
984        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
985        wtxn.commit()?;
986
987        Ok(cid_str)
988    }
989
990    /// Get tree node by hash (raw bytes)
991    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
992        let store = self.store_arc();
993        let tree = HashTree::new(HashTreeConfig::new(store).public());
994
995        sync_block_on(async {
996            tree.get_tree_node(hash)
997                .await
998                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
999        })
1000    }
1001
1002    /// Store a raw blob, returns SHA256 hash as hex.
1003    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
1004        let hash = sha256(data);
1005        self.router
1006            .put_sync(hash, data)
1007            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1008        Ok(to_hex(&hash))
1009    }
1010
1011    /// Get a raw blob by SHA256 hash (raw bytes).
1012    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1013        self.router
1014            .get_sync(hash)
1015            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
1016    }
1017
1018    /// Check if a blob exists by SHA256 hash (raw bytes).
1019    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
1020        self.router
1021            .exists(hash)
1022            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
1023    }
1024
1025    // === Blossom ownership tracking ===
1026    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
1027    // This allows efficient multi-owner tracking with O(1) lookups
1028
1029    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
1030    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
1031        let mut key = [0u8; 64];
1032        key[..32].copy_from_slice(sha256);
1033        key[32..].copy_from_slice(pubkey);
1034        key
1035    }
1036
    /// Add an owner (pubkey) to a blob for Blossom protocol
    /// Multiple users can own the same blob - it's only deleted when all owners remove it
    ///
    /// Updates two databases inside one write transaction:
    /// - `blob_owners`: composite key (sha256 ++ pubkey) -> () for O(1) lookups
    /// - `pubkey_blobs`: pubkey -> JSON Vec<BlobMetadata> for the /list endpoint
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Add ownership entry (idempotent - put overwrites)
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        // Convert sha256 to hex for BlobMetadata (which stores sha256 as hex string)
        let sha256_hex = to_hex(sha256);

        // Get existing blobs for this pubkey (for /list endpoint)
        // Missing or unparseable JSON degrades to an empty list rather than erroring
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        // Check if blob already exists for this pubkey
        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            // unwrap: only fails if the system clock is set before the Unix epoch
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Get size from raw blob
            // NOTE(review): loads the entire blob into memory just to read its
            // length; a size/stat call on the router would avoid this if one exists.
            let size = self
                .get_blob(sha256)?
                .map(|data| data.len() as u64)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
1083
1084    /// Check if a pubkey owns a blob
1085    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1086        let key = Self::blob_owner_key(sha256, pubkey);
1087        let rtxn = self.env.read_txn()?;
1088        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1089    }
1090
1091    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
1092    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1093        let rtxn = self.env.read_txn()?;
1094
1095        let mut owners = Vec::new();
1096        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1097            let (key, _) = item?;
1098            if key.len() == 64 {
1099                // Extract pubkey from composite key (bytes 32-64)
1100                let mut pubkey = [0u8; 32];
1101                pubkey.copy_from_slice(&key[32..64]);
1102                owners.push(pubkey);
1103            }
1104        }
1105        Ok(owners)
1106    }
1107
1108    /// Check if blob has any owners
1109    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1110        let rtxn = self.env.read_txn()?;
1111
1112        // Just check if any entry exists with this prefix
1113        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1114            if item.is_ok() {
1115                return Ok(true);
1116            }
1117        }
1118        Ok(false)
1119    }
1120
1121    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
1122    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1123        Ok(self.get_blob_owners(sha256)?.into_iter().next())
1124    }
1125
    /// Remove a user's ownership of a blossom blob
    /// Only deletes the actual blob when no owners remain
    /// Returns true if the blob was actually deleted (no owners left)
    ///
    /// All index updates happen inside one write transaction; the prefix scan
    /// below therefore already sees this pubkey's entry as removed.
    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Remove this pubkey's ownership entry
        self.blob_owners.delete(&mut wtxn, &key[..])?;

        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
        let sha256_hex = to_hex(sha256);

        // Remove from pubkey's blob list (unparseable JSON is left untouched)
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Check if any other owners remain (prefix scan)
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &to_hex(pubkey)[..8],
                &sha256_hex[..8]
            );
            return Ok(false);
        }

        // No owners left - delete the blob completely
        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8]
        );

        // Delete raw blob (by content hash) - this deletes from S3 too
        // Best-effort: a failed physical delete still commits the index removal
        let _ = self.router.delete_sync(sha256);

        wtxn.commit()?;
        Ok(true)
    }
1179
1180    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
1181    pub fn list_blobs_by_pubkey(
1182        &self,
1183        pubkey: &[u8; 32],
1184    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1185        let rtxn = self.env.read_txn()?;
1186
1187        let blobs: Vec<BlobMetadata> = self
1188            .pubkey_blobs
1189            .get(&rtxn, pubkey)?
1190            .and_then(|b| serde_json::from_slice(b).ok())
1191            .unwrap_or_default();
1192
1193        Ok(blobs
1194            .into_iter()
1195            .map(|b| crate::server::blossom::BlobDescriptor {
1196                url: format!("/{}", b.sha256),
1197                sha256: b.sha256,
1198                size: b.size,
1199                mime_type: b.mime_type,
1200                uploaded: b.uploaded,
1201            })
1202            .collect())
1203    }
1204
1205    /// Get a single chunk/blob by hash (raw bytes)
1206    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1207        self.router
1208            .get_sync(hash)
1209            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1210    }
1211
1212    /// Get file content by hash (raw bytes)
1213    /// Returns raw bytes (caller handles decryption if needed)
1214    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1215        let store = self.store_arc();
1216        let tree = HashTree::new(HashTreeConfig::new(store).public());
1217
1218        sync_block_on(async {
1219            tree.read_file(hash)
1220                .await
1221                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1222        })
1223    }
1224
1225    /// Get file content by Cid (hash + optional decryption key as raw bytes)
1226    /// Handles decryption automatically if key is present
1227    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1228        let store = self.store_arc();
1229        let tree = HashTree::new(HashTreeConfig::new(store).public());
1230
1231        sync_block_on(async {
1232            tree.get(cid, None)
1233                .await
1234                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1235        })
1236    }
1237
1238    fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1239        let exists = self
1240            .router
1241            .exists(&cid.hash)
1242            .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1243        if !exists {
1244            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1245        }
1246        Ok(())
1247    }
1248
    /// Stream file content identified by Cid into a writer without buffering full file in memory.
    ///
    /// Returns the number of bytes written; the writer is flushed before returning.
    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
        // Fail fast if the root hash isn't in the store at all
        self.ensure_cid_exists(cid)?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let mut total_bytes = 0u64;
        let mut streamed_any_chunk = false;

        sync_block_on(async {
            let mut stream = tree.get_stream(cid);
            while let Some(chunk) = stream.next().await {
                streamed_any_chunk = true;
                let chunk =
                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
                writer
                    .write_all(&chunk)
                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
                total_bytes += chunk.len() as u64;
            }
            Ok::<(), anyhow::Error>(())
        })?;

        // An empty stream is treated as not-found.
        // NOTE(review): if get_stream yields zero chunks for a genuinely
        // zero-length file, that file would hit this branch even though
        // ensure_cid_exists passed above — confirm how hashtree streams empty files.
        if !streamed_any_chunk {
            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
        }

        writer
            .flush()
            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
        Ok(total_bytes)
    }
1281
1282    /// Stream file content identified by Cid directly into a destination path.
1283    pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1284        self.ensure_cid_exists(cid)?;
1285
1286        let output_path = output_path.as_ref();
1287        if let Some(parent) = output_path.parent() {
1288            if !parent.as_os_str().is_empty() {
1289                std::fs::create_dir_all(parent).with_context(|| {
1290                    format!("Failed to create output directory {}", parent.display())
1291                })?;
1292            }
1293        }
1294
1295        let mut file = std::fs::File::create(output_path)
1296            .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1297        self.write_file_by_cid_to_writer(cid, &mut file)
1298    }
1299
1300    /// Stream a public (unencrypted) file by hash directly into a destination path.
1301    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1302        self.write_file_by_cid(&Cid::public(*hash), output_path)
1303    }
1304
1305    /// Resolve a path within a tree (returns Cid with key if encrypted)
1306    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1307        let store = self.store_arc();
1308        let tree = HashTree::new(HashTreeConfig::new(store).public());
1309
1310        sync_block_on(async {
1311            tree.resolve_path(cid, path)
1312                .await
1313                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1314        })
1315    }
1316
    /// Get chunk metadata for a file (chunk list, sizes, total size)
    ///
    /// Returns `Ok(None)` when the hash is unknown, the tree node is missing,
    /// or the hash refers to a directory rather than a file.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            // First check if the hash exists in the store at all
            // (either as a blob or tree node)
            let exists = store
                .has(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            // Get total size
            let total_size = tree
                .get_size(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            // Check if it's a tree (chunked) or blob
            let is_tree_node = tree
                .is_tree(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            if !is_tree_node {
                // Single blob, not chunked: no per-chunk breakdown to report
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            // Get tree node to extract chunk info
            let node = match tree
                .get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
            {
                Some(n) => n,
                None => return Ok(None),
            };

            // Check if it's a directory (has named links)
            let is_directory = tree
                .is_directory(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None); // Not a file
            }

            // Extract chunk info from links (chunk order follows the node's link order)
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1388
1389    /// Get byte range from file
1390    pub fn get_file_range(
1391        &self,
1392        hash: &[u8; 32],
1393        start: u64,
1394        end: Option<u64>,
1395    ) -> Result<Option<(Vec<u8>, u64)>> {
1396        let metadata = match self.get_file_chunk_metadata(hash)? {
1397            Some(m) => m,
1398            None => return Ok(None),
1399        };
1400
1401        if metadata.total_size == 0 {
1402            return Ok(Some((Vec::new(), 0)));
1403        }
1404
1405        if start >= metadata.total_size {
1406            return Ok(None);
1407        }
1408
1409        let end = end
1410            .unwrap_or(metadata.total_size - 1)
1411            .min(metadata.total_size - 1);
1412
1413        // For non-chunked files, load entire file
1414        if !metadata.is_chunked {
1415            let content = self.get_file(hash)?.unwrap_or_default();
1416            let range_content = if start < content.len() as u64 {
1417                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1418            } else {
1419                Vec::new()
1420            };
1421            return Ok(Some((range_content, metadata.total_size)));
1422        }
1423
1424        // For chunked files, load only needed chunks
1425        let mut result = Vec::new();
1426        let mut current_offset = 0u64;
1427
1428        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1429            let chunk_size = metadata.chunk_sizes[i];
1430            let chunk_end = current_offset + chunk_size - 1;
1431
1432            // Check if this chunk overlaps with requested range
1433            if chunk_end >= start && current_offset <= end {
1434                let chunk_content = match self.get_chunk(chunk_hash)? {
1435                    Some(content) => content,
1436                    None => {
1437                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1438                    }
1439                };
1440
1441                let chunk_read_start = if current_offset >= start {
1442                    0
1443                } else {
1444                    (start - current_offset) as usize
1445                };
1446
1447                let chunk_read_end = if chunk_end <= end {
1448                    chunk_size as usize - 1
1449                } else {
1450                    (end - current_offset) as usize
1451                };
1452
1453                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1454            }
1455
1456            current_offset += chunk_size;
1457
1458            if current_offset > end {
1459                break;
1460            }
1461        }
1462
1463        Ok(Some((result, metadata.total_size)))
1464    }
1465
1466    /// Stream file range as chunks using Arc for async/Send contexts
1467    pub fn stream_file_range_chunks_owned(
1468        self: Arc<Self>,
1469        hash: &[u8; 32],
1470        start: u64,
1471        end: u64,
1472    ) -> Result<Option<FileRangeChunksOwned>> {
1473        let metadata = match self.get_file_chunk_metadata(hash)? {
1474            Some(m) => m,
1475            None => return Ok(None),
1476        };
1477
1478        if metadata.total_size == 0 || start >= metadata.total_size {
1479            return Ok(None);
1480        }
1481
1482        let end = end.min(metadata.total_size - 1);
1483
1484        Ok(Some(FileRangeChunksOwned {
1485            store: self,
1486            metadata,
1487            start,
1488            end,
1489            current_chunk_idx: 0,
1490            current_offset: 0,
1491        }))
1492    }
1493
1494    /// Get directory structure by hash (raw bytes)
1495    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1496        let store = self.store_arc();
1497        let tree = HashTree::new(HashTreeConfig::new(store).public());
1498
1499        sync_block_on(async {
1500            // Check if it's a directory
1501            let is_dir = tree
1502                .is_directory(hash)
1503                .await
1504                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1505
1506            if !is_dir {
1507                return Ok(None);
1508            }
1509
1510            // Get directory entries (public Cid - no encryption key)
1511            let cid = hashtree_core::Cid::public(*hash);
1512            let tree_entries = tree
1513                .list_directory(&cid)
1514                .await
1515                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1516
1517            let entries: Vec<DirEntry> = tree_entries
1518                .into_iter()
1519                .map(|e| DirEntry {
1520                    name: e.name,
1521                    cid: to_hex(&e.hash),
1522                    is_directory: e.link_type.is_tree(),
1523                    size: e.size,
1524                })
1525                .collect();
1526
1527            Ok(Some(DirectoryListing {
1528                dir_name: String::new(),
1529                entries,
1530            }))
1531        })
1532    }
1533
    /// Get directory structure by CID, supporting encrypted directories.
    ///
    /// Unlike `get_directory_listing`, child entries are rendered through
    /// `Cid::to_string()`, so encrypted children keep their decryption key
    /// ("hash:key" form) in the returned listing.
    pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        // Clone so the async block owns the Cid for its whole lifetime
        let cid = cid.clone();

        sync_block_on(async {
            let is_dir = tree
                .is_dir(&cid)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if !is_dir {
                return Ok(None);
            }

            let tree_entries = tree
                .list_directory(&cid)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;

            let entries: Vec<DirEntry> = tree_entries
                .into_iter()
                .map(|e| DirEntry {
                    name: e.name,
                    // Preserve the child's key (if any) in the serialized Cid
                    cid: Cid {
                        hash: e.hash,
                        key: e.key,
                    }
                    .to_string(),
                    is_directory: e.link_type.is_tree(),
                    size: e.size,
                })
                .collect();

            Ok(Some(DirectoryListing {
                dir_name: String::new(),
                entries,
            }))
        })
    }
1575
1576    /// Pin a hash (prevent garbage collection)
1577    pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
1578        let mut wtxn = self.env.write_txn()?;
1579        self.pins.put(&mut wtxn, hash.as_slice(), &())?;
1580        wtxn.commit()?;
1581        Ok(())
1582    }
1583
1584    /// Unpin a hash (allow garbage collection)
1585    pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
1586        let mut wtxn = self.env.write_txn()?;
1587        self.pins.delete(&mut wtxn, hash.as_slice())?;
1588        wtxn.commit()?;
1589        Ok(())
1590    }
1591
1592    /// Check if hash is pinned
1593    pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
1594        let rtxn = self.env.read_txn()?;
1595        Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
1596    }
1597
1598    /// List all pinned hashes (raw bytes)
1599    pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
1600        let rtxn = self.env.read_txn()?;
1601        let mut pins = Vec::new();
1602
1603        for item in self.pins.iter(&rtxn)? {
1604            let (hash_bytes, _) = item?;
1605            if hash_bytes.len() == 32 {
1606                let mut hash = [0u8; 32];
1607                hash.copy_from_slice(hash_bytes);
1608                pins.push(hash);
1609            }
1610        }
1611
1612        Ok(pins)
1613    }
1614
1615    /// List all pinned hashes with names
1616    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
1617        let rtxn = self.env.read_txn()?;
1618        let store = self.store_arc();
1619        let tree = HashTree::new(HashTreeConfig::new(store).public());
1620        let mut pins = Vec::new();
1621
1622        for item in self.pins.iter(&rtxn)? {
1623            let (hash_bytes, _) = item?;
1624            if hash_bytes.len() != 32 {
1625                continue;
1626            }
1627            let mut hash = [0u8; 32];
1628            hash.copy_from_slice(hash_bytes);
1629
1630            // Try to determine if it's a directory
1631            let is_directory =
1632                sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
1633
1634            pins.push(PinnedItem {
1635                cid: to_hex(&hash),
1636                name: "Unknown".to_string(),
1637                is_directory,
1638            });
1639        }
1640
1641        Ok(pins)
1642    }
1643
1644    // === Tree indexing for eviction ===
1645
    /// Index a tree after sync - tracks all blobs in the tree for eviction
    ///
    /// If `ref_key` is provided (e.g. "npub.../name"), it will replace any existing
    /// tree with that ref, allowing old versions to be evicted.
    ///
    /// The async tree walk is executed synchronously via `block_on`. For each
    /// blob a `blob_hash ++ tree_hash` entry is written to `blob_trees` (the
    /// reverse index used by eviction), and a `TreeMeta` record is stored
    /// under the root hash.
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // If ref_key provided, check for and unindex old version
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes
                        .try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // End the read txn before unindex_tree opens its write txn.
                    drop(rtxn);
                    // Unindex old tree (will delete orphaned blobs).
                    // Errors are deliberately ignored: a failed cleanup of the
                    // old version should not prevent indexing the new one.
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk tree and collect all blob hashes + compute total size
        let (blob_hashes, total_size) =
            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;

        let mut wtxn = self.env.write_txn()?;

        // Store blob-tree relationships (64-byte key: blob_hash ++ tree_hash)
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        // Store tree metadata
        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            // System clock before the Unix epoch would panic here.
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta
            .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        // Store ref -> hash mapping if ref_key provided
        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1726
1727    /// Collect all blob hashes in a tree and compute total size
1728    async fn collect_tree_blobs<S: Store>(
1729        &self,
1730        tree: &HashTree<S>,
1731        root: &Hash,
1732    ) -> Result<(Vec<Hash>, u64)> {
1733        let mut blobs = Vec::new();
1734        let mut total_size = 0u64;
1735        let mut stack = vec![*root];
1736
1737        while let Some(hash) = stack.pop() {
1738            // Check if it's a tree node
1739            let is_tree = tree
1740                .is_tree(&hash)
1741                .await
1742                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1743
1744            if is_tree {
1745                // Get tree node and add children to stack
1746                if let Some(node) = tree
1747                    .get_tree_node(&hash)
1748                    .await
1749                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1750                {
1751                    for link in &node.links {
1752                        stack.push(link.hash);
1753                    }
1754                }
1755            } else {
1756                // It's a blob - get its size
1757                if let Some(data) = self
1758                    .router
1759                    .get_sync(&hash)
1760                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1761                {
1762                    total_size += data.len() as u64;
1763                    blobs.push(hash);
1764                }
1765            }
1766        }
1767
1768        Ok((blobs, total_size))
1769    }
1770
1771    /// Unindex a tree - removes blob-tree mappings and deletes orphaned blobs
1772    /// Returns the number of bytes freed
1773    pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1774        let root_hex = to_hex(root_hash);
1775
1776        let store = self.store_arc();
1777        let tree = HashTree::new(HashTreeConfig::new(store).public());
1778
1779        // Walk tree and collect all blob hashes
1780        let (blob_hashes, _) =
1781            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
1782
1783        let mut wtxn = self.env.write_txn()?;
1784        let mut freed = 0u64;
1785
1786        // For each blob, remove the blob-tree entry and check if orphaned
1787        for blob_hash in &blob_hashes {
1788            // Delete blob-tree entry (64-byte key: blob_hash ++ tree_hash)
1789            let mut key = [0u8; 64];
1790            key[..32].copy_from_slice(blob_hash);
1791            key[32..].copy_from_slice(root_hash);
1792            self.blob_trees.delete(&mut wtxn, &key[..])?;
1793
1794            // Check if blob is in any other tree (prefix scan on first 32 bytes)
1795            let rtxn = self.env.read_txn()?;
1796            let mut has_other_tree = false;
1797
1798            for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1799                if item.is_ok() {
1800                    has_other_tree = true;
1801                    break;
1802                }
1803            }
1804            drop(rtxn);
1805
1806            // If orphaned, delete the blob
1807            if !has_other_tree {
1808                if let Some(data) = self
1809                    .router
1810                    .get_sync(blob_hash)
1811                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1812                {
1813                    freed += data.len() as u64;
1814                    // Delete locally only - keep S3 as archive
1815                    self.router
1816                        .delete_local_only(blob_hash)
1817                        .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1818                }
1819            }
1820        }
1821
1822        // Delete tree node itself if exists
1823        if let Some(data) = self
1824            .router
1825            .get_sync(root_hash)
1826            .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1827        {
1828            freed += data.len() as u64;
1829            // Delete locally only - keep S3 as archive
1830            self.router
1831                .delete_local_only(root_hash)
1832                .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1833        }
1834
1835        // Delete tree metadata
1836        self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1837
1838        wtxn.commit()?;
1839
1840        tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
1841
1842        Ok(freed)
1843    }
1844
1845    /// Get tree metadata
1846    pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1847        let rtxn = self.env.read_txn()?;
1848        if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1849            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1850                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1851            Ok(Some(meta))
1852        } else {
1853            Ok(None)
1854        }
1855    }
1856
1857    /// List all indexed trees
1858    pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1859        let rtxn = self.env.read_txn()?;
1860        let mut trees = Vec::new();
1861
1862        for item in self.tree_meta.iter(&rtxn)? {
1863            let (hash_bytes, meta_bytes) = item?;
1864            let hash: Hash = hash_bytes
1865                .try_into()
1866                .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1867            let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1868                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1869            trees.push((hash, meta));
1870        }
1871
1872        Ok(trees)
1873    }
1874
1875    /// Get total tracked storage size (sum of all tree_meta.total_size)
1876    pub fn tracked_size(&self) -> Result<u64> {
1877        let rtxn = self.env.read_txn()?;
1878        let mut total = 0u64;
1879
1880        for item in self.tree_meta.iter(&rtxn)? {
1881            let (_, bytes) = item?;
1882            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1883                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1884            total += meta.total_size;
1885        }
1886
1887        Ok(total)
1888    }
1889
1890    /// Get evictable trees sorted by (priority ASC, synced_at ASC)
1891    fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1892        let mut trees = self.list_indexed_trees()?;
1893
1894        // Sort by priority (lower first), then by synced_at (older first)
1895        trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
1896            std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1897            other => other,
1898        });
1899
1900        Ok(trees)
1901    }
1902
    /// Run eviction if storage is over quota
    /// Returns bytes freed
    ///
    /// Eviction order:
    /// 1. Orphaned blobs (not in any indexed tree and not pinned)
    /// 2. Trees by priority (lowest first) and age (oldest first)
    ///
    /// Eviction aims for 90% of the quota so that being one byte over does
    /// not trigger a full eviction pass on every call.
    pub fn evict_if_needed(&self) -> Result<u64> {
        // Get actual storage used
        let stats = self
            .router
            .stats()
            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
        let current = stats.total_bytes;

        // Under quota: nothing to do.
        if current <= self.max_size_bytes {
            return Ok(0);
        }

        // Target 90% of max to avoid constant eviction
        let target = self.max_size_bytes * 90 / 100;
        let mut freed = 0u64;
        // Tracked estimate of remaining usage; decremented as we free bytes.
        let mut current_size = current;

        // Phase 1: Evict orphaned blobs (not in any tree and not pinned)
        let orphan_freed = self.evict_orphaned_blobs()?;
        freed += orphan_freed;
        current_size = current_size.saturating_sub(orphan_freed);

        if orphan_freed > 0 {
            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
        }

        // Check if we're now under target
        if current_size <= target {
            if freed > 0 {
                tracing::info!("Eviction complete: {} bytes freed", freed);
            }
            return Ok(freed);
        }

        // Phase 2: Evict trees by priority (lowest first) and age (oldest first)
        // Own trees CAN be evicted (just last), but PINNED trees are never evicted
        let evictable = self.get_evictable_trees()?;

        for (root_hash, meta) in evictable {
            if current_size <= target {
                break;
            }

            let root_hex = to_hex(&root_hash);

            // Never evict pinned trees
            if self.is_pinned(&root_hash)? {
                continue;
            }

            let tree_freed = self.unindex_tree(&root_hash)?;
            freed += tree_freed;
            current_size = current_size.saturating_sub(tree_freed);

            tracing::info!(
                "Evicted tree {} (owner={}, priority={}, {} bytes)",
                &root_hex[..8],
                // Owner may be shorter than 8 chars; clamp to avoid a panic.
                &meta.owner[..8.min(meta.owner.len())],
                meta.priority,
                tree_freed
            );
        }

        if freed > 0 {
            tracing::info!("Eviction complete: {} bytes freed", freed);
        }

        Ok(freed)
    }
1978
1979    /// Evict blobs that are not part of any indexed tree and not pinned
1980    fn evict_orphaned_blobs(&self) -> Result<u64> {
1981        let mut freed = 0u64;
1982
1983        // Get all blob hashes from store
1984        let all_hashes = self
1985            .router
1986            .list()
1987            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1988
1989        // Get pinned hashes as raw bytes
1990        let rtxn = self.env.read_txn()?;
1991        let pinned: HashSet<Hash> = self
1992            .pins
1993            .iter(&rtxn)?
1994            .filter_map(|item| item.ok())
1995            .filter_map(|(hash_bytes, _)| {
1996                if hash_bytes.len() == 32 {
1997                    let mut hash = [0u8; 32];
1998                    hash.copy_from_slice(hash_bytes);
1999                    Some(hash)
2000                } else {
2001                    None
2002                }
2003            })
2004            .collect();
2005
2006        // Collect all blob hashes that are in at least one tree
2007        // Key format is blob_hash (32 bytes) ++ tree_hash (32 bytes)
2008        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
2009        for (key_bytes, _) in self.blob_trees.iter(&rtxn)?.flatten() {
2010            if key_bytes.len() >= 32 {
2011                let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
2012                blobs_in_trees.insert(blob_hash);
2013            }
2014        }
2015        drop(rtxn);
2016
2017        // Find and delete orphaned blobs
2018        for hash in all_hashes {
2019            // Skip if pinned
2020            if pinned.contains(&hash) {
2021                continue;
2022            }
2023
2024            // Skip if part of any tree
2025            if blobs_in_trees.contains(&hash) {
2026                continue;
2027            }
2028
2029            // This blob is orphaned - delete locally (keep S3 as archive)
2030            if let Ok(Some(data)) = self.router.get_sync(&hash) {
2031                freed += data.len() as u64;
2032                let _ = self.router.delete_local_only(&hash);
2033                tracing::debug!(
2034                    "Deleted orphaned blob {} ({} bytes)",
2035                    &to_hex(&hash)[..8],
2036                    data.len()
2037                );
2038            }
2039        }
2040
2041        Ok(freed)
2042    }
2043
    /// Get the maximum storage size in bytes
    ///
    /// This is the quota enforced by `evict_if_needed`.
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
2048
2049    /// Get storage usage by priority tier
2050    pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
2051        let rtxn = self.env.read_txn()?;
2052        let mut own = 0u64;
2053        let mut followed = 0u64;
2054        let mut other = 0u64;
2055
2056        for item in self.tree_meta.iter(&rtxn)? {
2057            let (_, bytes) = item?;
2058            let meta: TreeMeta = rmp_serde::from_slice(bytes)
2059                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
2060
2061            if meta.priority == PRIORITY_OWN {
2062                own += meta.total_size;
2063            } else if meta.priority >= PRIORITY_FOLLOWED {
2064                followed += meta.total_size;
2065            } else {
2066                other += meta.total_size;
2067            }
2068        }
2069
2070        Ok(StorageByPriority {
2071            own,
2072            followed,
2073            other,
2074        })
2075    }
2076
2077    /// Get storage statistics
2078    pub fn get_storage_stats(&self) -> Result<StorageStats> {
2079        let rtxn = self.env.read_txn()?;
2080        let total_pins = self.pins.len(&rtxn)? as usize;
2081
2082        let stats = self
2083            .router
2084            .stats()
2085            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
2086
2087        Ok(StorageStats {
2088            total_dags: stats.count,
2089            pinned_dags: total_pins,
2090            total_bytes: stats.total_bytes,
2091        })
2092    }
2093
2094    // === Cached roots ===
2095
2096    /// Get cached root for a pubkey/tree_name pair
2097    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
2098        let key = format!("{}/{}", pubkey_hex, tree_name);
2099        let rtxn = self.env.read_txn()?;
2100        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
2101            let root: CachedRoot = rmp_serde::from_slice(bytes)
2102                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2103            Ok(Some(root))
2104        } else {
2105            Ok(None)
2106        }
2107    }
2108
2109    /// Set cached root for a pubkey/tree_name pair
2110    pub fn set_cached_root(
2111        &self,
2112        pubkey_hex: &str,
2113        tree_name: &str,
2114        hash: &str,
2115        key: Option<&str>,
2116        visibility: &str,
2117        updated_at: u64,
2118    ) -> Result<()> {
2119        let db_key = format!("{}/{}", pubkey_hex, tree_name);
2120        let root = CachedRoot {
2121            hash: hash.to_string(),
2122            key: key.map(|k| k.to_string()),
2123            updated_at,
2124            visibility: visibility.to_string(),
2125        };
2126        let bytes = rmp_serde::to_vec(&root)
2127            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
2128        let mut wtxn = self.env.write_txn()?;
2129        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
2130        wtxn.commit()?;
2131        Ok(())
2132    }
2133
2134    /// List all cached roots for a pubkey
2135    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
2136        let prefix = format!("{}/", pubkey_hex);
2137        let rtxn = self.env.read_txn()?;
2138        let mut results = Vec::new();
2139
2140        for item in self.cached_roots.iter(&rtxn)? {
2141            let (key, bytes) = item?;
2142            if key.starts_with(&prefix) {
2143                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
2144                let root: CachedRoot = rmp_serde::from_slice(bytes)
2145                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2146                results.push((tree_name.to_string(), root));
2147            }
2148        }
2149
2150        Ok(results)
2151    }
2152
2153    /// Delete a cached root
2154    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
2155        let key = format!("{}/{}", pubkey_hex, tree_name);
2156        let mut wtxn = self.env.write_txn()?;
2157        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
2158        wtxn.commit()?;
2159        Ok(deleted)
2160    }
2161
    /// Garbage collect unpinned content
    ///
    /// Deletes every locally stored hash that is not present in the pins
    /// table (local deletion only - S3 copies are kept).
    ///
    /// NOTE(review): only the exact pinned hashes are protected here; blobs
    /// reachable from a pinned tree root but not pinned themselves would be
    /// deleted. Confirm whether callers pin every blob or only roots.
    pub fn gc(&self) -> Result<GcStats> {
        let rtxn = self.env.read_txn()?;

        // Get all pinned hashes as raw bytes
        let pinned: HashSet<Hash> = self
            .pins
            .iter(&rtxn)?
            .filter_map(|item| item.ok())
            .filter_map(|(hash_bytes, _)| {
                // Only well-formed 32-byte keys are kept.
                if hash_bytes.len() == 32 {
                    let mut hash = [0u8; 32];
                    hash.copy_from_slice(hash_bytes);
                    Some(hash)
                } else {
                    None
                }
            })
            .collect();

        drop(rtxn);

        // Get all stored hashes
        let all_hashes = self
            .router
            .list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        // Delete unpinned hashes
        let mut deleted = 0;
        let mut freed_bytes = 0u64;

        for hash in all_hashes {
            if !pinned.contains(&hash) {
                // Read first so the freed size can be reported; unreadable
                // blobs are skipped.
                if let Ok(Some(data)) = self.router.get_sync(&hash) {
                    freed_bytes += data.len() as u64;
                    // Delete locally only - keep S3 as archive
                    let _ = self.router.delete_local_only(&hash);
                    deleted += 1;
                }
            }
        }

        Ok(GcStats {
            deleted_dags: deleted,
            freed_bytes,
        })
    }
2210
    /// Verify LMDB blob integrity - checks that stored data matches its key hash
    /// Returns verification statistics and optionally deletes corrupted entries
    ///
    /// Every stored hash is re-read and its SHA-256 recomputed; mismatches,
    /// missing data, and read errors all count as corrupted. Findings are
    /// printed via `println!` (this is a CLI-facing routine).
    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
        let all_hashes = self
            .router
            .list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        let total = all_hashes.len();
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_hashes = Vec::new();

        for hash in &all_hashes {
            let hash_hex = to_hex(hash);

            match self.router.get_sync(hash) {
                Ok(Some(data)) => {
                    // Compute actual SHA256 of data
                    let actual_hash = sha256(&data);

                    if actual_hash == *hash {
                        valid += 1;
                    } else {
                        corrupted += 1;
                        let actual_hex = to_hex(&actual_hash);
                        println!(
                            "  CORRUPTED: key={} actual={} size={}",
                            &hash_hex[..16],
                            &actual_hex[..16],
                            data.len()
                        );
                        corrupted_hashes.push(*hash);
                    }
                }
                Ok(None) => {
                    // Hash exists in index but data is missing
                    corrupted += 1;
                    println!("  MISSING: key={}", &hash_hex[..16]);
                    corrupted_hashes.push(*hash);
                }
                Err(e) => {
                    // A read failure is treated the same as corruption.
                    corrupted += 1;
                    println!("  ERROR: key={} err={}", &hash_hex[..16], e);
                    corrupted_hashes.push(*hash);
                }
            }
        }

        // Delete corrupted entries if requested
        if delete {
            for hash in &corrupted_hashes {
                match self.router.delete_sync(hash) {
                    Ok(true) => deleted += 1,
                    Ok(false) => {} // Already deleted
                    Err(e) => {
                        // Report and continue; one failed delete shouldn't
                        // abort the rest of the cleanup.
                        let hash_hex = to_hex(hash);
                        println!("  Failed to delete {}: {}", &hash_hex[..16], e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2282
    /// Verify R2/S3 blob integrity - lists all objects and verifies hash matches filename
    /// Returns verification statistics and optionally deletes corrupted entries
    ///
    /// Objects are expected to be named `<prefix><64-hex-sha256>.bin`; each
    /// one is downloaded and its SHA-256 compared against the name. Findings
    /// and progress are printed via `println!` (CLI-facing routine).
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        // Get S3 client from router (we need to access it directly)
        // For now, we'll create a new client from config
        let config = crate::config::Config::load()?;
        let s3_config = config
            .storage
            .s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        // Build AWS config
        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                // R2 and most S3-compatible stores require path-style addressing.
                .force_path_style(true)
                .build(),
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // List all objects in bucket
        let mut continuation_token: Option<String> = None;

        // Paginated ListObjectsV2 loop; one page per iteration.
        loop {
            let mut list_req = s3_client.list_objects_v2().bucket(bucket).prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Skip non-.bin files
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // Extract expected hash from filename (remove prefix and .bin)
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                // Validate it's a valid hex hash
                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!("  INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!("  INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download and verify content
                match s3_client.get_object().bucket(bucket).key(key).send().await {
                    Ok(resp) => match resp.body.collect().await {
                        Ok(bytes) => {
                            let data = bytes.into_bytes();
                            let actual_hash = sha256(&data);

                            if actual_hash == expected_hash {
                                valid += 1;
                            } else {
                                corrupted += 1;
                                let actual_hex = to_hex(&actual_hash);
                                println!(
                                    "  CORRUPTED: key={} actual={} size={}",
                                    &expected_hash_hex[..16],
                                    &actual_hex[..16],
                                    data.len()
                                );
                                corrupted_keys.push(key.to_string());
                            }
                        }
                        Err(e) => {
                            corrupted += 1;
                            println!("  READ ERROR: {} - {}", key, e);
                            corrupted_keys.push(key.to_string());
                        }
                    },
                    Err(e) => {
                        corrupted += 1;
                        println!("  FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Progress indicator every 100 objects
                if total % 100 == 0 {
                    println!(
                        "  Progress: {} objects checked, {} corrupted so far",
                        total, corrupted
                    );
                }
            }

            // Check if there are more objects
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        // Delete corrupted entries if requested
        if delete {
            for key in &corrupted_keys {
                match s3_client
                    .delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        // Report and continue; partial cleanup is acceptable.
                        println!("  Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2442
2443    /// Fallback for non-S3 builds
2444    #[cfg(not(feature = "s3"))]
2445    pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
2446        Err(anyhow::anyhow!("S3 feature not enabled"))
2447    }
2448}
2449
/// Result of blob integrity verification
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of entries examined.
    pub total: usize,
    /// Entries whose content hashed to the expected value.
    pub valid: usize,
    /// Entries that were missing, unreadable, or hash-mismatched.
    pub corrupted: usize,
    /// Corrupted entries actually deleted (only when deletion was requested).
    pub deleted: usize,
}
2458
/// Snapshot of overall local storage usage.
#[derive(Debug)]
pub struct StorageStats {
    /// Number of stored entries reported by the blob router.
    pub total_dags: usize,
    /// Number of entries in the pins table.
    pub pinned_dags: usize,
    /// Total stored bytes reported by the blob router.
    pub total_bytes: u64,
}
2465
/// Storage usage broken down by priority tier
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Own/pinned trees (priority exactly 255)
    pub own: u64,
    /// Followed users' trees (priority 128 up to, but not including, 255)
    pub followed: u64,
    /// Other trees (priority below 128)
    pub other: u64,
}
2476
/// Chunking layout of a stored file.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total size of the file in bytes.
    pub total_size: u64,
    /// Hash of each chunk, in file order.
    pub chunk_hashes: Vec<Hash>,
    /// Size in bytes of each chunk, parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is stored as chunks; when false, the range
    /// iterator yields nothing.
    pub is_chunked: bool,
}
2484
/// Owned iterator for async streaming
///
/// Yields, per chunk, the portion of its bytes that overlaps the inclusive
/// byte range [start, end].
pub struct FileRangeChunksOwned {
    /// Store used to fetch chunk contents.
    store: Arc<HashtreeStore>,
    /// Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    /// First byte offset to yield (inclusive).
    start: u64,
    /// Last byte offset to yield (inclusive).
    end: u64,
    /// Index of the next chunk to examine.
    current_chunk_idx: usize,
    /// File offset where the chunk at `current_chunk_idx` begins.
    current_offset: u64,
}
2494
2495impl Iterator for FileRangeChunksOwned {
2496    type Item = Result<Vec<u8>>;
2497
2498    fn next(&mut self) -> Option<Self::Item> {
2499        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2500            return None;
2501        }
2502
2503        if self.current_offset > self.end {
2504            return None;
2505        }
2506
2507        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2508        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2509        let chunk_end = self.current_offset + chunk_size - 1;
2510
2511        self.current_chunk_idx += 1;
2512
2513        if chunk_end < self.start || self.current_offset > self.end {
2514            self.current_offset += chunk_size;
2515            return self.next();
2516        }
2517
2518        let chunk_content = match self.store.get_chunk(chunk_hash) {
2519            Ok(Some(content)) => content,
2520            Ok(None) => {
2521                return Some(Err(anyhow::anyhow!(
2522                    "Chunk {} not found",
2523                    to_hex(chunk_hash)
2524                )));
2525            }
2526            Err(e) => {
2527                return Some(Err(e));
2528            }
2529        };
2530
2531        let chunk_read_start = if self.current_offset >= self.start {
2532            0
2533        } else {
2534            (self.start - self.current_offset) as usize
2535        };
2536
2537        let chunk_read_end = if chunk_end <= self.end {
2538            chunk_size as usize - 1
2539        } else {
2540            (self.end - self.current_offset) as usize
2541        };
2542
2543        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2544        self.current_offset += chunk_size;
2545
2546        Some(Ok(result))
2547    }
2548}
2549
/// Result of a garbage-collection pass.
#[derive(Debug)]
pub struct GcStats {
    /// Number of DAGs deleted.
    pub deleted_dags: usize,
    /// Bytes reclaimed by those deletions.
    pub freed_bytes: u64,
}
2555
/// A single entry in a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within the directory.
    pub name: String,
    /// Content identifier of the entry.
    pub cid: String,
    /// True when the entry is itself a directory.
    pub is_directory: bool,
    /// Size in bytes. NOTE(review): semantics for directories (0 vs.
    /// aggregate) not visible here — confirm at call sites.
    pub size: u64,
}
2563
/// Contents of a directory: its name plus one `DirEntry` per child.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the directory being listed.
    pub dir_name: String,
    /// The directory's entries.
    pub entries: Vec<DirEntry>,
}
2569
/// An item pinned against eviction.
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Content identifier of the pinned item.
    pub cid: String,
    /// Human-readable name of the item.
    pub name: String,
    /// True when the pinned item is a directory.
    pub is_directory: bool,
}
2576
/// Blob metadata for Blossom protocol
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// SHA-256 hash of the blob content (hex).
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type of the blob content.
    pub mime_type: String,
    /// Unix timestamp of when the blob was uploaded.
    pub uploaded: u64,
}
2585
2586// Implement ContentStore trait for WebRTC data exchange
2587impl crate::webrtc::ContentStore for HashtreeStore {
2588    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2589        let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2590        self.get_chunk(&hash)
2591    }
2592}
2593
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A storage budget above the LMDB minimum must grow the blob store's
    /// map size to at least the requested amount.
    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            &StorageBackend::Lmdb,
        )?;

        let map_size = if let LocalStore::Lmdb(local) = store.router.local.as_ref() {
            local.map_size_bytes() as u64
        } else {
            panic!("expected LMDB local store")
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    /// An explicit LMDB map-size override passed to `LocalStore` must be
    /// honored as a lower bound.
    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = if let LocalStore::Lmdb(local) = store {
            local.map_size_bytes() as u64
        } else {
            panic!("expected LMDB local store")
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }
}