Skip to main content

hashtree_cli/
storage.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::io::AllowStdIo;
5use futures::StreamExt;
6use hashtree_config::StorageBackend;
7use hashtree_core::store::{Store, StoreError};
8use hashtree_core::{
9    from_hex, sha256, to_hex, types::Hash, Cid, DirEntry as HashTreeDirEntry, HashTree,
10    HashTreeConfig, TreeNode,
11};
12use hashtree_fs::FsBlobStore;
13#[cfg(feature = "lmdb")]
14use hashtree_lmdb::LmdbBlobStore;
15use heed::types::*;
16use heed::{Database, EnvOpenOptions};
17use serde::{Deserialize, Serialize};
18use std::collections::HashSet;
19use std::io::{Read, Write};
20use std::path::Path;
21use std::sync::Arc;
22use std::time::{SystemTime, UNIX_EPOCH};
23
/// Priority levels for tree eviction (higher value = retained longer).
/// Trees belonging to unrelated users — first candidates for eviction.
pub const PRIORITY_OTHER: u8 = 64;
/// Trees belonging to followed users — evicted after "other" trees.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// The user's own or explicitly pinned trees — highest retention priority.
pub const PRIORITY_OWN: u8 = 255;
28
29/// Metadata for a synced tree (for eviction tracking)
/// Metadata for a synced tree (for eviction tracking).
/// Persisted in the `tree_meta` LMDB database keyed by tree root hash.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Pubkey of tree owner
    pub owner: String,
    /// Tree name if known (from nostr key like "npub.../name")
    pub name: Option<String>,
    /// Unix timestamp when this tree was synced
    pub synced_at: u64,
    /// Total size of all blobs in this tree
    pub total_size: u64,
    /// Eviction priority: 255=own/pinned (PRIORITY_OWN), 128=followed
    /// (PRIORITY_FOLLOWED), 64=other (PRIORITY_OTHER)
    pub priority: u8,
}
43
44/// Cached root info from Nostr events.
/// Cached root info from Nostr events.
/// Persisted in the `cached_roots` LMDB database keyed by "pubkey_hex/tree_name".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex); present for encrypted (CHK) trees
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public", "link-visible", or "private"
    pub visibility: String,
}
56
/// Storage statistics reported by the local blob store.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of blobs currently stored
    pub count: usize,
    /// Sum of stored blob sizes, in bytes
    pub total_bytes: u64,
}
63
/// Local blob store - wraps either FsBlobStore or LmdbBlobStore.
/// The variant is chosen at construction time from `StorageBackend` config.
pub enum LocalStore {
    /// Filesystem-backed blobs (always compiled in)
    Fs(FsBlobStore),
    /// LMDB-backed blobs (only with the `lmdb` cargo feature)
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
70
71impl LocalStore {
72    /// Create a new local store based on config
73    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
74        match backend {
75            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
76            #[cfg(feature = "lmdb")]
77            StorageBackend::Lmdb => Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?)),
78            #[cfg(not(feature = "lmdb"))]
79            StorageBackend::Lmdb => {
80                tracing::warn!(
81                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
82                );
83                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
84            }
85        }
86    }
87
88    /// Sync put operation
89    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
90        match self {
91            LocalStore::Fs(store) => store.put_sync(hash, data),
92            #[cfg(feature = "lmdb")]
93            LocalStore::Lmdb(store) => store.put_sync(hash, data),
94        }
95    }
96
97    /// Sync get operation
98    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
99        match self {
100            LocalStore::Fs(store) => store.get_sync(hash),
101            #[cfg(feature = "lmdb")]
102            LocalStore::Lmdb(store) => store.get_sync(hash),
103        }
104    }
105
106    /// Check if hash exists
107    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
108        match self {
109            LocalStore::Fs(store) => Ok(store.exists(hash)),
110            #[cfg(feature = "lmdb")]
111            LocalStore::Lmdb(store) => store.exists(hash),
112        }
113    }
114
115    /// Sync delete operation
116    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
117        match self {
118            LocalStore::Fs(store) => store.delete_sync(hash),
119            #[cfg(feature = "lmdb")]
120            LocalStore::Lmdb(store) => store.delete_sync(hash),
121        }
122    }
123
124    /// Get storage statistics
125    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
126        match self {
127            LocalStore::Fs(store) => {
128                let stats = store.stats()?;
129                Ok(LocalStoreStats {
130                    count: stats.count,
131                    total_bytes: stats.total_bytes,
132                })
133            }
134            #[cfg(feature = "lmdb")]
135            LocalStore::Lmdb(store) => {
136                let stats = store.stats()?;
137                Ok(LocalStoreStats {
138                    count: stats.count,
139                    total_bytes: stats.total_bytes,
140                })
141            }
142        }
143    }
144
145    /// List all hashes in the store
146    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
147        match self {
148            LocalStore::Fs(store) => store.list(),
149            #[cfg(feature = "lmdb")]
150            LocalStore::Lmdb(store) => store.list(),
151        }
152    }
153}
154
// Async `Store` facade over the synchronous backend calls, so a LocalStore
// can be passed directly to HashTree. Each method simply delegates; no real
// async I/O happens here.
#[async_trait]
impl Store for LocalStore {
    /// Store `data` under `hash` (delegates to the blocking `put_sync`).
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    /// Fetch the blob for `hash` (delegates to the blocking `get_sync`).
    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    /// Check local existence of `hash`.
    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    /// Delete the blob for `hash` (delegates to the blocking `delete_sync`).
    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
173
174#[cfg(feature = "s3")]
175use tokio::sync::mpsc;
176
177use crate::config::S3Config;
178
/// Message for background S3 sync, sent over an unbounded channel to the
/// detached upload task spawned in `StorageRouter::with_s3`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Mirror a blob to S3 (`data` is an owned copy of the local write).
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove the blob's object from S3.
    Delete { hash: Hash },
}
185
/// Storage router - local store primary with optional S3 backup
///
/// Write path: local first (fast), then queue S3 upload (non-blocking)
/// Read path: local first, fall back to S3 if miss
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LocalStore>,
    /// Optional S3 client for backup; None when running local-only
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target bucket; Some iff `s3_client` is Some
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to every object key (may be empty)
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to send uploads to background task
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
204
impl StorageRouter {
    /// Create router with local storage only (no S3 mirroring).
    pub fn new(local: Arc<LocalStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Create router with local storage + S3 backup.
    ///
    /// Builds an S3 client from the environment plus `config`, then spawns a
    /// detached background task that drains the sync channel and performs
    /// uploads/deletes with at most 32 concurrent in-flight operations.
    /// The task runs until the channel's senders are dropped.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Build AWS config (credentials come from the environment; only the
        // region is overridden from our config).
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader =
            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Build S3 client with custom endpoint. Path-style addressing is
        // forced for compatibility with S3-compatible services (MinIO etc.).
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Create background sync channel (unbounded: puts never block, but
        // memory grows if uploads fall behind).
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        // Spawn background sync task with bounded concurrent uploads
        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Limit concurrent uploads to prevent overwhelming the runtime
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                // Spawn each upload with semaphore-bounded concurrency
                tokio::spawn(async move {
                    // Acquire permit before uploading.
                    // NOTE(review): acquire() returns a Result; on Err (closed
                    // semaphore — cannot happen here since it is never closed)
                    // the task would proceed without a permit.
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            // Object key layout: "<prefix><hex-hash>.bin"
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            match client
                                .put_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .body(ByteStream::from(data))
                                .send()
                                .await
                            {
                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
                                // Failures are logged only; no retry queue.
                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            if let Err(e) = client
                                .delete_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .send()
                                .await
                            {
                                tracing::error!("S3 delete failed {}: {}", &key, e);
                            }
                        }
                    }
                });
            }
        });

        tracing::info!(
            "S3 storage initialized: bucket={}, prefix={}",
            bucket,
            prefix
        );

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Store data - writes to the local store, queues S3 upload in background.
    /// Returns `true` when the blob was not already present locally.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        // Always write to local first
        let is_new = self.local.put_sync(hash, data)?;

        // Queue S3 upload if configured (non-blocking)
        // Always upload to S3 (even if not new locally) to ensure S3 has all blobs
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            // NOTE(review): this logs at info level for every put — likely
            // noisy in production; debug seems more appropriate. Also,
            // `[..16].to_string()` allocates; `&to_hex(&hash)[..16]` would do.
            tracing::info!(
                "Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(),
                data.len(),
                is_new
            );
            if let Err(e) = tx.send(S3SyncMessage::Upload {
                hash,
                data: data.to_vec(),
            }) {
                // Send only fails if the background task is gone; data is
                // still safe locally, so log and continue.
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Get data - tries the local store first, falls back to S3.
    /// A blob fetched from S3 is written back to the local store as a cache.
    ///
    /// NOTE(review): the S3 path drives the SDK future with
    /// futures::executor::block_on; calling this from inside an async runtime
    /// worker thread would block it (and SDK futures generally need a tokio
    /// reactor) — confirm all callers are synchronous contexts.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        // Try local first
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        // Fall back to S3 if configured
        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async { client.get_object().bucket(bucket).key(&key).send().await })
            {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Cache locally for future reads (best effort)
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // NoSuchKey is an expected miss; only warn on other errors.
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Check if hash exists locally or (when configured) in S3.
    /// Uses a HEAD request against S3; see the block_on caveat on `get_sync`.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        // Check local first
        if self.local.exists(hash)? {
            return Ok(true);
        }

        // Check S3 if configured
        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object().bucket(bucket).key(&key).send().await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    // 404 is an expected miss; only warn on other errors.
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Delete data from both local and S3 stores.
    /// The S3 delete is queued to the background task; the return value only
    /// reflects whether the blob existed locally.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        // Queue S3 delete if configured
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Delete data from local store only (don't propagate to S3)
    /// Used for eviction where we want to keep S3 as archive
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Get stats from local store (S3 contents are not counted).
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        self.local.stats()
    }

    /// List all hashes from local store (S3-only blobs are not listed).
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Get the underlying local store for HashTree operations.
    /// Note: writes through this handle bypass S3 mirroring.
    pub fn local_store(&self) -> Arc<LocalStore> {
        Arc::clone(&self.local)
    }
}
445
// Implement async Store trait for StorageRouter so it can be used directly with HashTree
// This ensures all writes go through S3 sync
#[async_trait]
impl Store for StorageRouter {
    /// Write-through put: local store, plus queued S3 upload when configured.
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    /// Read with S3 fallback (see `get_sync` for the block_on caveat).
    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    /// Existence check across local store and S3.
    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    /// Delete locally and queue the S3 delete.
    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
466
/// Top-level CLI store: an LMDB environment holding metadata databases
/// (pins, ownership, tree/eviction bookkeeping, cached Nostr roots) plus a
/// StorageRouter for blob bytes (local backend with optional S3 mirror).
pub struct HashtreeStore {
    env: heed::Env,
    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
    pins: Database<Bytes, Unit>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles local blobs + optional S3 (Arc for sharing with HashTree)
    router: Arc<StorageRouter>,
    /// Maximum storage size in bytes (from config)
    max_size_bytes: u64,
}
488
489impl HashtreeStore {
490    /// Create a new store with the configured local storage limit.
491    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
492        let config = hashtree_config::Config::load_or_default();
493        let max_size_bytes = config
494            .storage
495            .max_size_gb
496            .saturating_mul(1024 * 1024 * 1024);
497        Self::with_options_and_backend(path, None, max_size_bytes, &config.storage.backend)
498    }
499
    /// Create a new store with an explicit local backend and size limit.
    /// Unlike `new`, nothing is read from the global config.
    pub fn new_with_backend<P: AsRef<Path>>(
        path: P,
        backend: hashtree_config::StorageBackend,
        max_size_bytes: u64,
    ) -> Result<Self> {
        Self::with_options_and_backend(path, None, max_size_bytes, &backend)
    }
508
    /// Create a new store with optional S3 backend and the configured local storage limit.
    /// Backend and size cap come from the global config; only S3 is explicit.
    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
        let config = hashtree_config::Config::load_or_default();
        let max_size_bytes = config
            .storage
            .max_size_gb
            .saturating_mul(1024 * 1024 * 1024);
        Self::with_options_and_backend(path, s3_config, max_size_bytes, &config.storage.backend)
    }
518
    /// Create a new store with optional S3 backend and custom size limit.
    /// The blob backend still comes from the global config.
    pub fn with_options<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
    ) -> Result<Self> {
        let config = hashtree_config::Config::load_or_default();
        Self::with_options_and_backend(path, s3_config, max_size_bytes, &config.storage.backend)
    }
528
    /// Shared constructor: creates the directory, opens the LMDB metadata
    /// environment with all named databases, builds the blob store for
    /// `backend`, and wires up the (optional) S3-backed StorageRouter.
    fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY: heed marks `EnvOpenOptions::open` unsafe because opening the
        // same LMDB environment twice in one process is undefined behavior.
        // NOTE(review): assumed each path gets a single HashtreeStore per
        // process — confirm callers uphold this.
        let env = unsafe {
            EnvOpenOptions::new()
                // NOTE(review): map_size is a fixed 10GB and independent of
                // `max_size_bytes` (which only bounds blob storage) — confirm intended.
                .map_size(10 * 1024 * 1024 * 1024) // 10GB virtual address space
                .max_dbs(8) // pins, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .open(path)?
        };

        // Create (or open) every named database in a single write transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Create local blob store based on configured backend
        let local_store = Arc::new(
            LocalStore::new(path.join("blobs"), backend)
                .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
        );

        // Create storage router with optional S3
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            // NOTE(review): blocks the calling thread to drive async S3 setup;
            // assumed this constructor is never called from an async context.
            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        // Without the s3 feature an S3 config is ignored with a warning.
        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            env,
            pins,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
598
    /// Get the storage router (borrowed; see `store_arc` for a shared handle).
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }
603
    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
    /// All writes through this go to both the local store and S3
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }
609
    /// Upload a file as raw plaintext and return its CID, with auto-pin.
    /// Pinned blobs are protected from eviction/garbage collection.
    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, true)
    }
614
    /// Upload a file without pinning (for blossom uploads that can be evicted).
    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, false)
    }
619
620    fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
621        let file_path = file_path.as_ref();
622        let file = std::fs::File::open(file_path)
623            .with_context(|| format!("Failed to open file {}", file_path.display()))?;
624
625        // Store raw plaintext blobs without CHK encryption, streaming from disk.
626        let store = self.store_arc();
627        let tree = HashTree::new(HashTreeConfig::new(store).public());
628
629        let (cid, _size) = sync_block_on(async { tree.put_stream(AllowStdIo::new(file)).await })
630            .context("Failed to store file")?;
631
632        // Only pin if requested (htree add = pin, blossom upload = no pin)
633        if pin {
634            let mut wtxn = self.env.write_txn()?;
635            self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
636            wtxn.commit()?;
637        }
638
639        Ok(to_hex(&cid.hash))
640    }
641
642    /// Upload a file from a stream with progress callbacks
643    pub fn upload_file_stream<R: Read, F>(
644        &self,
645        reader: R,
646        _file_name: impl Into<String>,
647        mut callback: F,
648    ) -> Result<String>
649    where
650        F: FnMut(&str),
651    {
652        // Use HashTree.put_stream for streaming upload without CHK encryption.
653        let store = self.store_arc();
654        let tree = HashTree::new(HashTreeConfig::new(store).public());
655
656        let (cid, _size) = sync_block_on(async { tree.put_stream(AllowStdIo::new(reader)).await })
657            .context("Failed to store file")?;
658
659        let root_hex = to_hex(&cid.hash);
660        callback(&root_hex);
661
662        // Auto-pin on upload
663        let mut wtxn = self.env.write_txn()?;
664        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
665        wtxn.commit()?;
666
667        Ok(root_hex)
668    }
669
    /// Upload a directory and return its root hash (hex)
    /// Respects .gitignore by default
    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_with_options(dir_path, true)
    }
675
676    /// Upload a directory with options as raw plaintext (no CHK encryption)
677    pub fn upload_dir_with_options<P: AsRef<Path>>(
678        &self,
679        dir_path: P,
680        respect_gitignore: bool,
681    ) -> Result<String> {
682        let dir_path = dir_path.as_ref();
683
684        let store = self.store_arc();
685        let tree = HashTree::new(HashTreeConfig::new(store).public());
686
687        let root_cid = sync_block_on(async {
688            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore)
689                .await
690        })
691        .context("Failed to upload directory")?;
692
693        let root_hex = to_hex(&root_cid.hash);
694
695        let mut wtxn = self.env.write_txn()?;
696        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
697        wtxn.commit()?;
698
699        Ok(root_hex)
700    }
701
    /// Walk `current_path` (honoring gitignore options), upload every file,
    /// and collect a per-directory map of (name, Cid) entries, then delegate
    /// to `build_directory_tree` to assemble directory nodes bottom-up.
    ///
    /// `_root_path` is currently unused; relative paths are computed against
    /// `current_path`. Despite the name, this is not recursive — the walker
    /// flattens the tree in one pass.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Build directory structure from flat file list - store full Cid with key.
        // Map key is the directory path relative to the root ("" = root itself).
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new()); // Root

        // Hidden files are included; ignore-file handling follows the flag.
        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false)
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // Skip the root directory itself
            if path == current_path {
                continue;
            }

            let relative = path.strip_prefix(current_path).unwrap_or(path);

            if path.is_file() {
                // Stream each file into the tree; the returned Cid carries
                // the hash (and key, when encrypting) for the file.
                let file = std::fs::File::open(path)
                    .with_context(|| format!("Failed to open file {}", path.display()))?;
                let (cid, _size) = tree.put_stream(AllowStdIo::new(file)).await.map_err(|e| {
                    anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e)
                })?;

                // Get parent directory path and file name
                let parent = relative
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative
                    .file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Ensure directory entry exists (so empty dirs are preserved)
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        // Build directory tree bottom-up
        self.build_directory_tree(tree, &mut dir_contents).await
    }
762
763    async fn build_directory_tree<S: Store>(
764        &self,
765        tree: &HashTree<S>,
766        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
767    ) -> Result<Cid> {
768        // Sort directories by depth (deepest first) to build bottom-up
769        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
770        dirs.sort_by(|a, b| {
771            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
772            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
773            depth_b.cmp(&depth_a) // Deepest first
774        });
775
776        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();
777
778        for dir_path in dirs {
779            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();
780
781            let mut entries: Vec<HashTreeDirEntry> = files
782                .into_iter()
783                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
784                .collect();
785
786            // Add subdirectory entries
787            for (subdir_path, cid) in &dir_cids {
788                let parent = std::path::Path::new(subdir_path)
789                    .parent()
790                    .map(|p| p.to_string_lossy().to_string())
791                    .unwrap_or_default();
792
793                if parent == dir_path {
794                    let name = std::path::Path::new(subdir_path)
795                        .file_name()
796                        .map(|n| n.to_string_lossy().to_string())
797                        .unwrap_or_default();
798                    entries.push(HashTreeDirEntry::from_cid(name, cid));
799                }
800            }
801
802            let cid = tree
803                .put_directory(entries)
804                .await
805                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;
806
807            dir_cids.insert(dir_path, cid);
808        }
809
810        // Return root Cid
811        dir_cids
812            .get("")
813            .cloned()
814            .ok_or_else(|| anyhow::anyhow!("No root directory"))
815    }
816
817    /// Upload a file with CHK encryption, returns CID in format "hash:key"
818    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
819        let file_path = file_path.as_ref();
820        let file = std::fs::File::open(file_path)
821            .with_context(|| format!("Failed to open file {}", file_path.display()))?;
822
823        // Use unified API with encryption enabled (default), streaming from disk.
824        let store = self.store_arc();
825        let tree = HashTree::new(HashTreeConfig::new(store));
826
827        let (cid, _size) = sync_block_on(async { tree.put_stream(AllowStdIo::new(file)).await })
828            .map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;
829
830        let cid_str = cid.to_string();
831
832        let mut wtxn = self.env.write_txn()?;
833        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
834        wtxn.commit()?;
835
836        Ok(cid_str)
837    }
838
839    /// Upload a directory with CHK encryption, returns CID
840    /// Respects .gitignore by default
841    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
842        self.upload_dir_encrypted_with_options(dir_path, true)
843    }
844
845    /// Upload a directory with CHK encryption and options
846    /// Returns CID as "hash:key" format for encrypted directories
847    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(
848        &self,
849        dir_path: P,
850        respect_gitignore: bool,
851    ) -> Result<String> {
852        let dir_path = dir_path.as_ref();
853        let store = self.store_arc();
854
855        // Use unified API with encryption enabled (default)
856        let tree = HashTree::new(HashTreeConfig::new(store));
857
858        let root_cid = sync_block_on(async {
859            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore)
860                .await
861        })
862        .context("Failed to upload encrypted directory")?;
863
864        let cid_str = root_cid.to_string(); // Returns "hash:key" or "hash"
865
866        let mut wtxn = self.env.write_txn()?;
867        // Pin by hash only (the key is for decryption, not identification)
868        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
869        wtxn.commit()?;
870
871        Ok(cid_str)
872    }
873
874    /// Get tree node by hash (raw bytes)
875    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
876        let store = self.store_arc();
877        let tree = HashTree::new(HashTreeConfig::new(store).public());
878
879        sync_block_on(async {
880            tree.get_tree_node(hash)
881                .await
882                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
883        })
884    }
885
886    /// Store a raw blob, returns SHA256 hash as hex.
887    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
888        let hash = sha256(data);
889        self.router
890            .put_sync(hash, data)
891            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
892        Ok(to_hex(&hash))
893    }
894
895    /// Get a raw blob by SHA256 hash (raw bytes).
896    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
897        self.router
898            .get_sync(hash)
899            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
900    }
901
902    /// Check if a blob exists by SHA256 hash (raw bytes).
903    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
904        self.router
905            .exists(hash)
906            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
907    }
908
909    // === Blossom ownership tracking ===
910    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
911    // This allows efficient multi-owner tracking with O(1) lookups
912
913    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
914    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
915        let mut key = [0u8; 64];
916        key[..32].copy_from_slice(sha256);
917        key[32..].copy_from_slice(pubkey);
918        key
919    }
920
    /// Add an owner (pubkey) to a blob for Blossom protocol
    /// Multiple users can own the same blob - it's only deleted when all owners remove it
    ///
    /// Performs two writes in a single LMDB transaction:
    /// 1. an ownership marker under the composite key `sha256 ++ pubkey`, and
    /// 2. the pubkey's JSON-encoded blob list (served by the /list endpoint),
    ///    updated only if this sha256 was not already listed there.
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Add ownership entry (idempotent - put overwrites)
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        // Convert sha256 to hex for BlobMetadata (which stores sha256 as hex string)
        let sha256_hex = to_hex(sha256);

        // Get existing blobs for this pubkey (for /list endpoint)
        // A missing or unparsable entry falls back to an empty list.
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        // Check if blob already exists for this pubkey
        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            // Panics only if the system clock reads before the UNIX epoch.
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Get size from raw blob
            // NOTE(review): loads the whole blob into memory just to measure
            // it; a size/stat call on the router would be cheaper. Records 0
            // when the blob isn't stored locally.
            let size = self
                .get_blob(sha256)?
                .map(|data| data.len() as u64)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                // No content-type is tracked at this layer; default to a
                // generic binary MIME type.
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
967
968    /// Check if a pubkey owns a blob
969    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
970        let key = Self::blob_owner_key(sha256, pubkey);
971        let rtxn = self.env.read_txn()?;
972        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
973    }
974
975    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
976    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
977        let rtxn = self.env.read_txn()?;
978
979        let mut owners = Vec::new();
980        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
981            let (key, _) = item?;
982            if key.len() == 64 {
983                // Extract pubkey from composite key (bytes 32-64)
984                let mut pubkey = [0u8; 32];
985                pubkey.copy_from_slice(&key[32..64]);
986                owners.push(pubkey);
987            }
988        }
989        Ok(owners)
990    }
991
992    /// Check if blob has any owners
993    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
994        let rtxn = self.env.read_txn()?;
995
996        // Just check if any entry exists with this prefix
997        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
998            if item.is_ok() {
999                return Ok(true);
1000            }
1001        }
1002        Ok(false)
1003    }
1004
1005    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
1006    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1007        Ok(self.get_blob_owners(sha256)?.into_iter().next())
1008    }
1009
    /// Remove a user's ownership of a blossom blob
    /// Only deletes the actual blob when no owners remain
    /// Returns true if the blob was actually deleted (no owners left)
    ///
    /// All bookkeeping (ownership marker + per-pubkey blob list) happens in
    /// one write transaction; the remaining-owners check reads through that
    /// same transaction, so this pubkey's just-deleted entry is not counted.
    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Remove this pubkey's ownership entry
        self.blob_owners.delete(&mut wtxn, &key[..])?;

        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
        let sha256_hex = to_hex(sha256);

        // Remove from pubkey's blob list
        // Best effort: an unparsable list is left untouched.
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Check if any other owners remain (prefix scan)
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            // Keep the blob; just persist the ownership bookkeeping.
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &to_hex(pubkey)[..8],
                &sha256_hex[..8]
            );
            return Ok(false);
        }

        // No owners left - delete the blob completely
        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8]
        );

        // Delete raw blob (by content hash) - this deletes from S3 too
        // Deliberately ignores errors so the bookkeeping commit below still
        // happens even if the blob was already gone.
        let _ = self.router.delete_sync(sha256);

        wtxn.commit()?;
        Ok(true)
    }
1063
1064    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
1065    pub fn list_blobs_by_pubkey(
1066        &self,
1067        pubkey: &[u8; 32],
1068    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1069        let rtxn = self.env.read_txn()?;
1070
1071        let blobs: Vec<BlobMetadata> = self
1072            .pubkey_blobs
1073            .get(&rtxn, pubkey)?
1074            .and_then(|b| serde_json::from_slice(b).ok())
1075            .unwrap_or_default();
1076
1077        Ok(blobs
1078            .into_iter()
1079            .map(|b| crate::server::blossom::BlobDescriptor {
1080                url: format!("/{}", b.sha256),
1081                sha256: b.sha256,
1082                size: b.size,
1083                mime_type: b.mime_type,
1084                uploaded: b.uploaded,
1085            })
1086            .collect())
1087    }
1088
1089    /// Get a single chunk/blob by hash (raw bytes)
1090    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1091        self.router
1092            .get_sync(hash)
1093            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1094    }
1095
1096    /// Get file content by hash (raw bytes)
1097    /// Returns raw bytes (caller handles decryption if needed)
1098    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1099        let store = self.store_arc();
1100        let tree = HashTree::new(HashTreeConfig::new(store).public());
1101
1102        sync_block_on(async {
1103            tree.read_file(hash)
1104                .await
1105                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1106        })
1107    }
1108
1109    /// Get file content by Cid (hash + optional decryption key as raw bytes)
1110    /// Handles decryption automatically if key is present
1111    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1112        let store = self.store_arc();
1113        let tree = HashTree::new(HashTreeConfig::new(store).public());
1114
1115        sync_block_on(async {
1116            tree.get(cid, None)
1117                .await
1118                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1119        })
1120    }
1121
1122    fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1123        let exists = self
1124            .router
1125            .exists(&cid.hash)
1126            .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1127        if !exists {
1128            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1129        }
1130        Ok(())
1131    }
1132
    /// Stream file content identified by Cid into a writer without buffering full file in memory.
    ///
    /// Returns the number of bytes written; the writer is flushed before
    /// returning.
    ///
    /// NOTE(review): a stored zero-length file whose stream yields no chunks
    /// would trip the "CID not found" bail below even though the existence
    /// check passed — confirm `get_stream` emits at least one chunk for
    /// empty files.
    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
        self.ensure_cid_exists(cid)?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let mut total_bytes = 0u64;
        // Distinguishes "stream produced nothing" from a normal transfer.
        let mut streamed_any_chunk = false;

        sync_block_on(async {
            let mut stream = tree.get_stream(cid);
            while let Some(chunk) = stream.next().await {
                streamed_any_chunk = true;
                let chunk =
                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
                writer
                    .write_all(&chunk)
                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
                total_bytes += chunk.len() as u64;
            }
            Ok::<(), anyhow::Error>(())
        })?;

        if !streamed_any_chunk {
            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
        }

        writer
            .flush()
            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
        Ok(total_bytes)
    }
1165
1166    /// Stream file content identified by Cid directly into a destination path.
1167    pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1168        self.ensure_cid_exists(cid)?;
1169
1170        let output_path = output_path.as_ref();
1171        if let Some(parent) = output_path.parent() {
1172            if !parent.as_os_str().is_empty() {
1173                std::fs::create_dir_all(parent).with_context(|| {
1174                    format!("Failed to create output directory {}", parent.display())
1175                })?;
1176            }
1177        }
1178
1179        let mut file = std::fs::File::create(output_path)
1180            .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1181        self.write_file_by_cid_to_writer(cid, &mut file)
1182    }
1183
1184    /// Stream a public (unencrypted) file by hash directly into a destination path.
1185    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1186        self.write_file_by_cid(&Cid::public(*hash), output_path)
1187    }
1188
1189    /// Resolve a path within a tree (returns Cid with key if encrypted)
1190    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1191        let store = self.store_arc();
1192        let tree = HashTree::new(HashTreeConfig::new(store).public());
1193
1194        sync_block_on(async {
1195            tree.resolve_path(cid, path)
1196                .await
1197                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1198        })
1199    }
1200
    /// Get chunk metadata for a file (chunk list, sizes, total size)
    ///
    /// Returns:
    /// - `None` when the hash is unknown or refers to a directory,
    /// - `Some` with `is_chunked == false` and empty chunk lists for a
    ///   single-blob file,
    /// - `Some` with per-chunk hashes/sizes for a chunked file.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            // First check if the hash exists in the store at all
            // (either as a blob or tree node)
            let exists = store
                .has(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            // Get total size
            let total_size = tree
                .get_size(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            // Check if it's a tree (chunked) or blob
            let is_tree_node = tree
                .is_tree(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            if !is_tree_node {
                // Single blob, not chunked
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            // Get tree node to extract chunk info
            // (node missing despite the is_tree check -> treat as not found)
            let node = match tree
                .get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
            {
                Some(n) => n,
                None => return Ok(None),
            };

            // Check if it's a directory (has named links)
            let is_directory = tree
                .is_directory(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None); // Not a file
            }

            // Extract chunk info from links
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                // A tree node with no links is reported as not chunked.
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1272
1273    /// Get byte range from file
1274    pub fn get_file_range(
1275        &self,
1276        hash: &[u8; 32],
1277        start: u64,
1278        end: Option<u64>,
1279    ) -> Result<Option<(Vec<u8>, u64)>> {
1280        let metadata = match self.get_file_chunk_metadata(hash)? {
1281            Some(m) => m,
1282            None => return Ok(None),
1283        };
1284
1285        if metadata.total_size == 0 {
1286            return Ok(Some((Vec::new(), 0)));
1287        }
1288
1289        if start >= metadata.total_size {
1290            return Ok(None);
1291        }
1292
1293        let end = end
1294            .unwrap_or(metadata.total_size - 1)
1295            .min(metadata.total_size - 1);
1296
1297        // For non-chunked files, load entire file
1298        if !metadata.is_chunked {
1299            let content = self.get_file(hash)?.unwrap_or_default();
1300            let range_content = if start < content.len() as u64 {
1301                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1302            } else {
1303                Vec::new()
1304            };
1305            return Ok(Some((range_content, metadata.total_size)));
1306        }
1307
1308        // For chunked files, load only needed chunks
1309        let mut result = Vec::new();
1310        let mut current_offset = 0u64;
1311
1312        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1313            let chunk_size = metadata.chunk_sizes[i];
1314            let chunk_end = current_offset + chunk_size - 1;
1315
1316            // Check if this chunk overlaps with requested range
1317            if chunk_end >= start && current_offset <= end {
1318                let chunk_content = match self.get_chunk(chunk_hash)? {
1319                    Some(content) => content,
1320                    None => {
1321                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1322                    }
1323                };
1324
1325                let chunk_read_start = if current_offset >= start {
1326                    0
1327                } else {
1328                    (start - current_offset) as usize
1329                };
1330
1331                let chunk_read_end = if chunk_end <= end {
1332                    chunk_size as usize - 1
1333                } else {
1334                    (end - current_offset) as usize
1335                };
1336
1337                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1338            }
1339
1340            current_offset += chunk_size;
1341
1342            if current_offset > end {
1343                break;
1344            }
1345        }
1346
1347        Ok(Some((result, metadata.total_size)))
1348    }
1349
1350    /// Stream file range as chunks using Arc for async/Send contexts
1351    pub fn stream_file_range_chunks_owned(
1352        self: Arc<Self>,
1353        hash: &[u8; 32],
1354        start: u64,
1355        end: u64,
1356    ) -> Result<Option<FileRangeChunksOwned>> {
1357        let metadata = match self.get_file_chunk_metadata(hash)? {
1358            Some(m) => m,
1359            None => return Ok(None),
1360        };
1361
1362        if metadata.total_size == 0 || start >= metadata.total_size {
1363            return Ok(None);
1364        }
1365
1366        let end = end.min(metadata.total_size - 1);
1367
1368        Ok(Some(FileRangeChunksOwned {
1369            store: self,
1370            metadata,
1371            start,
1372            end,
1373            current_chunk_idx: 0,
1374            current_offset: 0,
1375        }))
1376    }
1377
1378    /// Get directory structure by hash (raw bytes)
1379    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1380        let store = self.store_arc();
1381        let tree = HashTree::new(HashTreeConfig::new(store).public());
1382
1383        sync_block_on(async {
1384            // Check if it's a directory
1385            let is_dir = tree
1386                .is_directory(hash)
1387                .await
1388                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1389
1390            if !is_dir {
1391                return Ok(None);
1392            }
1393
1394            // Get directory entries (public Cid - no encryption key)
1395            let cid = hashtree_core::Cid::public(*hash);
1396            let tree_entries = tree
1397                .list_directory(&cid)
1398                .await
1399                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1400
1401            let entries: Vec<DirEntry> = tree_entries
1402                .into_iter()
1403                .map(|e| DirEntry {
1404                    name: e.name,
1405                    cid: to_hex(&e.hash),
1406                    is_directory: e.link_type.is_tree(),
1407                    size: e.size,
1408                })
1409                .collect();
1410
1411            Ok(Some(DirectoryListing {
1412                dir_name: String::new(),
1413                entries,
1414            }))
1415        })
1416    }
1417
1418    /// Get directory structure by CID, supporting encrypted directories.
1419    pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1420        let store = self.store_arc();
1421        let tree = HashTree::new(HashTreeConfig::new(store).public());
1422        let cid = cid.clone();
1423
1424        sync_block_on(async {
1425            let is_dir = tree
1426                .is_dir(&cid)
1427                .await
1428                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1429
1430            if !is_dir {
1431                return Ok(None);
1432            }
1433
1434            let tree_entries = tree
1435                .list_directory(&cid)
1436                .await
1437                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1438
1439            let entries: Vec<DirEntry> = tree_entries
1440                .into_iter()
1441                .map(|e| DirEntry {
1442                    name: e.name,
1443                    cid: Cid {
1444                        hash: e.hash,
1445                        key: e.key,
1446                    }
1447                    .to_string(),
1448                    is_directory: e.link_type.is_tree(),
1449                    size: e.size,
1450                })
1451                .collect();
1452
1453            Ok(Some(DirectoryListing {
1454                dir_name: String::new(),
1455                entries,
1456            }))
1457        })
1458    }
1459
1460    /// Pin a hash (prevent garbage collection)
1461    pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
1462        let mut wtxn = self.env.write_txn()?;
1463        self.pins.put(&mut wtxn, hash.as_slice(), &())?;
1464        wtxn.commit()?;
1465        Ok(())
1466    }
1467
1468    /// Unpin a hash (allow garbage collection)
1469    pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
1470        let mut wtxn = self.env.write_txn()?;
1471        self.pins.delete(&mut wtxn, hash.as_slice())?;
1472        wtxn.commit()?;
1473        Ok(())
1474    }
1475
1476    /// Check if hash is pinned
1477    pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
1478        let rtxn = self.env.read_txn()?;
1479        Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
1480    }
1481
1482    /// List all pinned hashes (raw bytes)
1483    pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
1484        let rtxn = self.env.read_txn()?;
1485        let mut pins = Vec::new();
1486
1487        for item in self.pins.iter(&rtxn)? {
1488            let (hash_bytes, _) = item?;
1489            if hash_bytes.len() == 32 {
1490                let mut hash = [0u8; 32];
1491                hash.copy_from_slice(hash_bytes);
1492                pins.push(hash);
1493            }
1494        }
1495
1496        Ok(pins)
1497    }
1498
1499    /// List all pinned hashes with names
1500    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
1501        let rtxn = self.env.read_txn()?;
1502        let store = self.store_arc();
1503        let tree = HashTree::new(HashTreeConfig::new(store).public());
1504        let mut pins = Vec::new();
1505
1506        for item in self.pins.iter(&rtxn)? {
1507            let (hash_bytes, _) = item?;
1508            if hash_bytes.len() != 32 {
1509                continue;
1510            }
1511            let mut hash = [0u8; 32];
1512            hash.copy_from_slice(hash_bytes);
1513
1514            // Try to determine if it's a directory
1515            let is_directory =
1516                sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
1517
1518            pins.push(PinnedItem {
1519                cid: to_hex(&hash),
1520                name: "Unknown".to_string(),
1521                is_directory,
1522            });
1523        }
1524
1525        Ok(pins)
1526    }
1527
1528    // === Tree indexing for eviction ===
1529
    /// Index a tree after sync - tracks all blobs in the tree for eviction
    ///
    /// If `ref_key` is provided (e.g. "npub.../name"), it will replace any existing
    /// tree with that ref, allowing old versions to be evicted.
    ///
    /// Writes, in a single transaction: one `blob_hash ++ tree_hash` entry
    /// per blob (used for orphan detection on unindex), the serialized
    /// `TreeMeta`, and optionally the `ref_key -> root_hash` mapping.
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // If ref_key provided, check for and unindex old version
        // NOTE(review): the read txn is dropped before unindexing, so this
        // lookup-then-replace is not atomic with the writes below — confirm
        // callers serialize index_tree calls per ref.
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes
                        .try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    drop(rtxn);
                    // Unindex old tree (will delete orphaned blobs)
                    // Failure here is non-fatal: indexing proceeds anyway.
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk tree and collect all blob hashes + compute total size
        let (blob_hashes, total_size) =
            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;

        let mut wtxn = self.env.write_txn()?;

        // Store blob-tree relationships (64-byte key: blob_hash ++ tree_hash)
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        // Store tree metadata
        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            // Panics only if the system clock reads before the UNIX epoch.
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta
            .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        // Store ref -> hash mapping if ref_key provided
        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1610
1611    /// Collect all blob hashes in a tree and compute total size
1612    async fn collect_tree_blobs<S: Store>(
1613        &self,
1614        tree: &HashTree<S>,
1615        root: &Hash,
1616    ) -> Result<(Vec<Hash>, u64)> {
1617        let mut blobs = Vec::new();
1618        let mut total_size = 0u64;
1619        let mut stack = vec![*root];
1620
1621        while let Some(hash) = stack.pop() {
1622            // Check if it's a tree node
1623            let is_tree = tree
1624                .is_tree(&hash)
1625                .await
1626                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1627
1628            if is_tree {
1629                // Get tree node and add children to stack
1630                if let Some(node) = tree
1631                    .get_tree_node(&hash)
1632                    .await
1633                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1634                {
1635                    for link in &node.links {
1636                        stack.push(link.hash);
1637                    }
1638                }
1639            } else {
1640                // It's a blob - get its size
1641                if let Some(data) = self
1642                    .router
1643                    .get_sync(&hash)
1644                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1645                {
1646                    total_size += data.len() as u64;
1647                    blobs.push(hash);
1648                }
1649            }
1650        }
1651
1652        Ok((blobs, total_size))
1653    }
1654
1655    /// Unindex a tree - removes blob-tree mappings and deletes orphaned blobs
1656    /// Returns the number of bytes freed
1657    pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1658        let root_hex = to_hex(root_hash);
1659
1660        let store = self.store_arc();
1661        let tree = HashTree::new(HashTreeConfig::new(store).public());
1662
1663        // Walk tree and collect all blob hashes
1664        let (blob_hashes, _) =
1665            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
1666
1667        let mut wtxn = self.env.write_txn()?;
1668        let mut freed = 0u64;
1669
1670        // For each blob, remove the blob-tree entry and check if orphaned
1671        for blob_hash in &blob_hashes {
1672            // Delete blob-tree entry (64-byte key: blob_hash ++ tree_hash)
1673            let mut key = [0u8; 64];
1674            key[..32].copy_from_slice(blob_hash);
1675            key[32..].copy_from_slice(root_hash);
1676            self.blob_trees.delete(&mut wtxn, &key[..])?;
1677
1678            // Check if blob is in any other tree (prefix scan on first 32 bytes)
1679            let rtxn = self.env.read_txn()?;
1680            let mut has_other_tree = false;
1681
1682            for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1683                if item.is_ok() {
1684                    has_other_tree = true;
1685                    break;
1686                }
1687            }
1688            drop(rtxn);
1689
1690            // If orphaned, delete the blob
1691            if !has_other_tree {
1692                if let Some(data) = self
1693                    .router
1694                    .get_sync(blob_hash)
1695                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1696                {
1697                    freed += data.len() as u64;
1698                    // Delete locally only - keep S3 as archive
1699                    self.router
1700                        .delete_local_only(blob_hash)
1701                        .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1702                }
1703            }
1704        }
1705
1706        // Delete tree node itself if exists
1707        if let Some(data) = self
1708            .router
1709            .get_sync(root_hash)
1710            .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1711        {
1712            freed += data.len() as u64;
1713            // Delete locally only - keep S3 as archive
1714            self.router
1715                .delete_local_only(root_hash)
1716                .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1717        }
1718
1719        // Delete tree metadata
1720        self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1721
1722        wtxn.commit()?;
1723
1724        tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
1725
1726        Ok(freed)
1727    }
1728
1729    /// Get tree metadata
1730    pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1731        let rtxn = self.env.read_txn()?;
1732        if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1733            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1734                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1735            Ok(Some(meta))
1736        } else {
1737            Ok(None)
1738        }
1739    }
1740
1741    /// List all indexed trees
1742    pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1743        let rtxn = self.env.read_txn()?;
1744        let mut trees = Vec::new();
1745
1746        for item in self.tree_meta.iter(&rtxn)? {
1747            let (hash_bytes, meta_bytes) = item?;
1748            let hash: Hash = hash_bytes
1749                .try_into()
1750                .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1751            let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1752                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1753            trees.push((hash, meta));
1754        }
1755
1756        Ok(trees)
1757    }
1758
1759    /// Get total tracked storage size (sum of all tree_meta.total_size)
1760    pub fn tracked_size(&self) -> Result<u64> {
1761        let rtxn = self.env.read_txn()?;
1762        let mut total = 0u64;
1763
1764        for item in self.tree_meta.iter(&rtxn)? {
1765            let (_, bytes) = item?;
1766            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1767                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1768            total += meta.total_size;
1769        }
1770
1771        Ok(total)
1772    }
1773
1774    /// Get evictable trees sorted by (priority ASC, synced_at ASC)
1775    fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1776        let mut trees = self.list_indexed_trees()?;
1777
1778        // Sort by priority (lower first), then by synced_at (older first)
1779        trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
1780            std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1781            other => other,
1782        });
1783
1784        Ok(trees)
1785    }
1786
1787    /// Run eviction if storage is over quota
1788    /// Returns bytes freed
1789    ///
1790    /// Eviction order:
1791    /// 1. Orphaned blobs (not in any indexed tree and not pinned)
1792    /// 2. Trees by priority (lowest first) and age (oldest first)
1793    pub fn evict_if_needed(&self) -> Result<u64> {
1794        // Get actual storage used
1795        let stats = self
1796            .router
1797            .stats()
1798            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1799        let current = stats.total_bytes;
1800
1801        if current <= self.max_size_bytes {
1802            return Ok(0);
1803        }
1804
1805        // Target 90% of max to avoid constant eviction
1806        let target = self.max_size_bytes * 90 / 100;
1807        let mut freed = 0u64;
1808        let mut current_size = current;
1809
1810        // Phase 1: Evict orphaned blobs (not in any tree and not pinned)
1811        let orphan_freed = self.evict_orphaned_blobs()?;
1812        freed += orphan_freed;
1813        current_size = current_size.saturating_sub(orphan_freed);
1814
1815        if orphan_freed > 0 {
1816            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
1817        }
1818
1819        // Check if we're now under target
1820        if current_size <= target {
1821            if freed > 0 {
1822                tracing::info!("Eviction complete: {} bytes freed", freed);
1823            }
1824            return Ok(freed);
1825        }
1826
1827        // Phase 2: Evict trees by priority (lowest first) and age (oldest first)
1828        // Own trees CAN be evicted (just last), but PINNED trees are never evicted
1829        let evictable = self.get_evictable_trees()?;
1830
1831        for (root_hash, meta) in evictable {
1832            if current_size <= target {
1833                break;
1834            }
1835
1836            let root_hex = to_hex(&root_hash);
1837
1838            // Never evict pinned trees
1839            if self.is_pinned(&root_hash)? {
1840                continue;
1841            }
1842
1843            let tree_freed = self.unindex_tree(&root_hash)?;
1844            freed += tree_freed;
1845            current_size = current_size.saturating_sub(tree_freed);
1846
1847            tracing::info!(
1848                "Evicted tree {} (owner={}, priority={}, {} bytes)",
1849                &root_hex[..8],
1850                &meta.owner[..8.min(meta.owner.len())],
1851                meta.priority,
1852                tree_freed
1853            );
1854        }
1855
1856        if freed > 0 {
1857            tracing::info!("Eviction complete: {} bytes freed", freed);
1858        }
1859
1860        Ok(freed)
1861    }
1862
1863    /// Evict blobs that are not part of any indexed tree and not pinned
1864    fn evict_orphaned_blobs(&self) -> Result<u64> {
1865        let mut freed = 0u64;
1866
1867        // Get all blob hashes from store
1868        let all_hashes = self
1869            .router
1870            .list()
1871            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1872
1873        // Get pinned hashes as raw bytes
1874        let rtxn = self.env.read_txn()?;
1875        let pinned: HashSet<Hash> = self
1876            .pins
1877            .iter(&rtxn)?
1878            .filter_map(|item| item.ok())
1879            .filter_map(|(hash_bytes, _)| {
1880                if hash_bytes.len() == 32 {
1881                    let mut hash = [0u8; 32];
1882                    hash.copy_from_slice(hash_bytes);
1883                    Some(hash)
1884                } else {
1885                    None
1886                }
1887            })
1888            .collect();
1889
1890        // Collect all blob hashes that are in at least one tree
1891        // Key format is blob_hash (32 bytes) ++ tree_hash (32 bytes)
1892        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
1893        for (key_bytes, _) in self.blob_trees.iter(&rtxn)?.flatten() {
1894            if key_bytes.len() >= 32 {
1895                let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
1896                blobs_in_trees.insert(blob_hash);
1897            }
1898        }
1899        drop(rtxn);
1900
1901        // Find and delete orphaned blobs
1902        for hash in all_hashes {
1903            // Skip if pinned
1904            if pinned.contains(&hash) {
1905                continue;
1906            }
1907
1908            // Skip if part of any tree
1909            if blobs_in_trees.contains(&hash) {
1910                continue;
1911            }
1912
1913            // This blob is orphaned - delete locally (keep S3 as archive)
1914            if let Ok(Some(data)) = self.router.get_sync(&hash) {
1915                freed += data.len() as u64;
1916                let _ = self.router.delete_local_only(&hash);
1917                tracing::debug!(
1918                    "Deleted orphaned blob {} ({} bytes)",
1919                    &to_hex(&hash)[..8],
1920                    data.len()
1921                );
1922            }
1923        }
1924
1925        Ok(freed)
1926    }
1927
    /// Get the maximum storage size in bytes
    /// (the local quota that `evict_if_needed` enforces)
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1932
1933    /// Get storage usage by priority tier
1934    pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1935        let rtxn = self.env.read_txn()?;
1936        let mut own = 0u64;
1937        let mut followed = 0u64;
1938        let mut other = 0u64;
1939
1940        for item in self.tree_meta.iter(&rtxn)? {
1941            let (_, bytes) = item?;
1942            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1943                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1944
1945            if meta.priority == PRIORITY_OWN {
1946                own += meta.total_size;
1947            } else if meta.priority >= PRIORITY_FOLLOWED {
1948                followed += meta.total_size;
1949            } else {
1950                other += meta.total_size;
1951            }
1952        }
1953
1954        Ok(StorageByPriority {
1955            own,
1956            followed,
1957            other,
1958        })
1959    }
1960
1961    /// Get storage statistics
1962    pub fn get_storage_stats(&self) -> Result<StorageStats> {
1963        let rtxn = self.env.read_txn()?;
1964        let total_pins = self.pins.len(&rtxn)? as usize;
1965
1966        let stats = self
1967            .router
1968            .stats()
1969            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1970
1971        Ok(StorageStats {
1972            total_dags: stats.count,
1973            pinned_dags: total_pins,
1974            total_bytes: stats.total_bytes,
1975        })
1976    }
1977
1978    // === Cached roots ===
1979
1980    /// Get cached root for a pubkey/tree_name pair
1981    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1982        let key = format!("{}/{}", pubkey_hex, tree_name);
1983        let rtxn = self.env.read_txn()?;
1984        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1985            let root: CachedRoot = rmp_serde::from_slice(bytes)
1986                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1987            Ok(Some(root))
1988        } else {
1989            Ok(None)
1990        }
1991    }
1992
1993    /// Set cached root for a pubkey/tree_name pair
1994    pub fn set_cached_root(
1995        &self,
1996        pubkey_hex: &str,
1997        tree_name: &str,
1998        hash: &str,
1999        key: Option<&str>,
2000        visibility: &str,
2001        updated_at: u64,
2002    ) -> Result<()> {
2003        let db_key = format!("{}/{}", pubkey_hex, tree_name);
2004        let root = CachedRoot {
2005            hash: hash.to_string(),
2006            key: key.map(|k| k.to_string()),
2007            updated_at,
2008            visibility: visibility.to_string(),
2009        };
2010        let bytes = rmp_serde::to_vec(&root)
2011            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
2012        let mut wtxn = self.env.write_txn()?;
2013        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
2014        wtxn.commit()?;
2015        Ok(())
2016    }
2017
2018    /// List all cached roots for a pubkey
2019    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
2020        let prefix = format!("{}/", pubkey_hex);
2021        let rtxn = self.env.read_txn()?;
2022        let mut results = Vec::new();
2023
2024        for item in self.cached_roots.iter(&rtxn)? {
2025            let (key, bytes) = item?;
2026            if key.starts_with(&prefix) {
2027                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
2028                let root: CachedRoot = rmp_serde::from_slice(bytes)
2029                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2030                results.push((tree_name.to_string(), root));
2031            }
2032        }
2033
2034        Ok(results)
2035    }
2036
2037    /// Delete a cached root
2038    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
2039        let key = format!("{}/{}", pubkey_hex, tree_name);
2040        let mut wtxn = self.env.write_txn()?;
2041        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
2042        wtxn.commit()?;
2043        Ok(deleted)
2044    }
2045
2046    /// Garbage collect unpinned content
2047    pub fn gc(&self) -> Result<GcStats> {
2048        let rtxn = self.env.read_txn()?;
2049
2050        // Get all pinned hashes as raw bytes
2051        let pinned: HashSet<Hash> = self
2052            .pins
2053            .iter(&rtxn)?
2054            .filter_map(|item| item.ok())
2055            .filter_map(|(hash_bytes, _)| {
2056                if hash_bytes.len() == 32 {
2057                    let mut hash = [0u8; 32];
2058                    hash.copy_from_slice(hash_bytes);
2059                    Some(hash)
2060                } else {
2061                    None
2062                }
2063            })
2064            .collect();
2065
2066        drop(rtxn);
2067
2068        // Get all stored hashes
2069        let all_hashes = self
2070            .router
2071            .list()
2072            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
2073
2074        // Delete unpinned hashes
2075        let mut deleted = 0;
2076        let mut freed_bytes = 0u64;
2077
2078        for hash in all_hashes {
2079            if !pinned.contains(&hash) {
2080                if let Ok(Some(data)) = self.router.get_sync(&hash) {
2081                    freed_bytes += data.len() as u64;
2082                    // Delete locally only - keep S3 as archive
2083                    let _ = self.router.delete_local_only(&hash);
2084                    deleted += 1;
2085                }
2086            }
2087        }
2088
2089        Ok(GcStats {
2090            deleted_dags: deleted,
2091            freed_bytes,
2092        })
2093    }
2094
2095    /// Verify LMDB blob integrity - checks that stored data matches its key hash
2096    /// Returns verification statistics and optionally deletes corrupted entries
2097    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
2098        let all_hashes = self
2099            .router
2100            .list()
2101            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
2102
2103        let total = all_hashes.len();
2104        let mut valid = 0;
2105        let mut corrupted = 0;
2106        let mut deleted = 0;
2107        let mut corrupted_hashes = Vec::new();
2108
2109        for hash in &all_hashes {
2110            let hash_hex = to_hex(hash);
2111
2112            match self.router.get_sync(hash) {
2113                Ok(Some(data)) => {
2114                    // Compute actual SHA256 of data
2115                    let actual_hash = sha256(&data);
2116
2117                    if actual_hash == *hash {
2118                        valid += 1;
2119                    } else {
2120                        corrupted += 1;
2121                        let actual_hex = to_hex(&actual_hash);
2122                        println!(
2123                            "  CORRUPTED: key={} actual={} size={}",
2124                            &hash_hex[..16],
2125                            &actual_hex[..16],
2126                            data.len()
2127                        );
2128                        corrupted_hashes.push(*hash);
2129                    }
2130                }
2131                Ok(None) => {
2132                    // Hash exists in index but data is missing
2133                    corrupted += 1;
2134                    println!("  MISSING: key={}", &hash_hex[..16]);
2135                    corrupted_hashes.push(*hash);
2136                }
2137                Err(e) => {
2138                    corrupted += 1;
2139                    println!("  ERROR: key={} err={}", &hash_hex[..16], e);
2140                    corrupted_hashes.push(*hash);
2141                }
2142            }
2143        }
2144
2145        // Delete corrupted entries if requested
2146        if delete {
2147            for hash in &corrupted_hashes {
2148                match self.router.delete_sync(hash) {
2149                    Ok(true) => deleted += 1,
2150                    Ok(false) => {} // Already deleted
2151                    Err(e) => {
2152                        let hash_hex = to_hex(hash);
2153                        println!("  Failed to delete {}: {}", &hash_hex[..16], e);
2154                    }
2155                }
2156            }
2157        }
2158
2159        Ok(VerifyResult {
2160            total,
2161            valid,
2162            corrupted,
2163            deleted,
2164        })
2165    }
2166
    /// Verify R2/S3 blob integrity - lists all objects and verifies hash matches filename
    /// Returns verification statistics and optionally deletes corrupted entries
    ///
    /// Objects are stored as "{prefix}{hex_sha256}.bin". Every listed object
    /// is downloaded in full and its SHA-256 compared against the hash
    /// encoded in its key; keys not ending in ".bin" are skipped entirely.
    /// Malformed key names count as corrupted too.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        // Get S3 client from router (we need to access it directly)
        // For now, we'll create a new client from config
        // NOTE(review): this re-reads on-disk config instead of reusing the
        // router's client - confirm the two cannot diverge.
        let config = crate::config::Config::load()?;
        let s3_config = config
            .storage
            .s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        // Build AWS config
        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                .force_path_style(true)
                .build(),
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // List all objects in bucket (paginated via continuation tokens)
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2().bucket(bucket).prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Skip non-.bin files
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // Extract expected hash from filename (remove prefix and .bin)
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                // Validate it's a valid hex hash (64 hex chars = 32 bytes)
                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!("  INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!("  INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download and verify content
                match s3_client.get_object().bucket(bucket).key(key).send().await {
                    Ok(resp) => match resp.body.collect().await {
                        Ok(bytes) => {
                            let data = bytes.into_bytes();
                            let actual_hash = sha256(&data);

                            if actual_hash == expected_hash {
                                valid += 1;
                            } else {
                                corrupted += 1;
                                let actual_hex = to_hex(&actual_hash);
                                println!(
                                    "  CORRUPTED: key={} actual={} size={}",
                                    &expected_hash_hex[..16],
                                    &actual_hex[..16],
                                    data.len()
                                );
                                corrupted_keys.push(key.to_string());
                            }
                        }
                        Err(e) => {
                            corrupted += 1;
                            println!("  READ ERROR: {} - {}", key, e);
                            corrupted_keys.push(key.to_string());
                        }
                    },
                    Err(e) => {
                        corrupted += 1;
                        println!("  FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Progress indicator every 100 objects
                if total % 100 == 0 {
                    println!(
                        "  Progress: {} objects checked, {} corrupted so far",
                        total, corrupted
                    );
                }
            }

            // Check if there are more objects
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        // Delete corrupted entries if requested
        // (covers hash mismatches, unreadable objects, and invalid key names)
        if delete {
            for key in &corrupted_keys {
                match s3_client
                    .delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        println!("  Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2326
2327    /// Fallback for non-S3 builds
2328    #[cfg(not(feature = "s3"))]
2329    pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
2330        Err(anyhow::anyhow!("S3 feature not enabled"))
2331    }
2332}
2333
/// Result of blob integrity verification
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of blobs/objects examined
    pub total: usize,
    /// Entries whose content hashed to their key
    pub valid: usize,
    /// Entries that were missing, unreadable, or hash-mismatched
    pub corrupted: usize,
    /// Corrupted entries actually deleted (0 unless deletion was requested)
    pub deleted: usize,
}
2342
/// Aggregate storage statistics (see `get_storage_stats`)
#[derive(Debug)]
pub struct StorageStats {
    /// Total number of stored items reported by the backend router
    pub total_dags: usize,
    /// Number of entries in the pins database
    pub pinned_dags: usize,
    /// Total stored bytes reported by the backend router
    pub total_bytes: u64,
}
2349
/// Storage usage broken down by priority tier
/// (tiers correspond to `PRIORITY_OWN` / `PRIORITY_FOLLOWED` / `PRIORITY_OTHER`)
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Own/pinned trees (priority 255)
    pub own: u64,
    /// Followed users' trees (priority 128)
    pub followed: u64,
    /// Other trees (priority 64)
    pub other: u64,
}
2360
/// Chunk layout of a stored file
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes
    pub total_size: u64,
    /// Hashes of the file's chunks, in file order
    pub chunk_hashes: Vec<Hash>,
    /// Size of each chunk in bytes, parallel to `chunk_hashes`
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is stored as chunks (range iteration yields
    /// nothing when false)
    pub is_chunked: bool,
}
2368
/// Owned iterator for async streaming
/// Yields slices of a chunked file that overlap a requested byte range.
pub struct FileRangeChunksOwned {
    // Store used to fetch each chunk's content
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read
    metadata: FileChunkMetadata,
    // Inclusive start of the requested byte range
    start: u64,
    // Inclusive end of the requested byte range
    end: u64,
    // Index of the next chunk to consider
    current_chunk_idx: usize,
    // File offset where the current chunk begins
    current_offset: u64,
}
2378
2379impl Iterator for FileRangeChunksOwned {
2380    type Item = Result<Vec<u8>>;
2381
2382    fn next(&mut self) -> Option<Self::Item> {
2383        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2384            return None;
2385        }
2386
2387        if self.current_offset > self.end {
2388            return None;
2389        }
2390
2391        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2392        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2393        let chunk_end = self.current_offset + chunk_size - 1;
2394
2395        self.current_chunk_idx += 1;
2396
2397        if chunk_end < self.start || self.current_offset > self.end {
2398            self.current_offset += chunk_size;
2399            return self.next();
2400        }
2401
2402        let chunk_content = match self.store.get_chunk(chunk_hash) {
2403            Ok(Some(content)) => content,
2404            Ok(None) => {
2405                return Some(Err(anyhow::anyhow!(
2406                    "Chunk {} not found",
2407                    to_hex(chunk_hash)
2408                )));
2409            }
2410            Err(e) => {
2411                return Some(Err(e));
2412            }
2413        };
2414
2415        let chunk_read_start = if self.current_offset >= self.start {
2416            0
2417        } else {
2418            (self.start - self.current_offset) as usize
2419        };
2420
2421        let chunk_read_end = if chunk_end <= self.end {
2422            chunk_size as usize - 1
2423        } else {
2424            (self.end - self.current_offset) as usize
2425        };
2426
2427        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2428        self.current_offset += chunk_size;
2429
2430        Some(Ok(result))
2431    }
2432}
2433
/// Statistics from a garbage-collection pass
#[derive(Debug)]
pub struct GcStats {
    /// Number of items deleted locally
    pub deleted_dags: usize,
    /// Total bytes freed locally
    pub freed_bytes: u64,
}
2439
/// A single entry in a directory listing
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within the directory
    pub name: String,
    /// Content identifier (string form)
    pub cid: String,
    /// True if the entry is itself a directory
    pub is_directory: bool,
    /// Entry size in bytes
    pub size: u64,
}
2447
/// A directory together with its resolved entries
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the directory itself
    pub dir_name: String,
    /// Entries contained in the directory
    pub entries: Vec<DirEntry>,
}
2453
/// A pinned item as reported in pin listings
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Content identifier (string form)
    pub cid: String,
    /// Display name of the pinned item
    pub name: String,
    /// True if the pinned item is a directory
    pub is_directory: bool,
}
2460
/// Blob metadata for Blossom protocol
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 of the blob's content
    pub sha256: String,
    /// Blob size in bytes
    pub size: u64,
    /// MIME type of the blob
    pub mime_type: String,
    /// Upload time as a Unix timestamp
    pub uploaded: u64,
}
2469
2470// Implement ContentStore trait for WebRTC data exchange
2471impl crate::webrtc::ContentStore for HashtreeStore {
2472    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2473        let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2474        self.get_chunk(&hash)
2475    }
2476}