//! hashtree_cli/storage.rs — local blob storage, optional S3-backed
//! routing, and LMDB-backed metadata (pins, ownership, tree eviction
//! tracking) for the hashtree CLI.
1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use heed::{Database, EnvOpenOptions};
4use heed::types::*;
5use hashtree_fs::FsBlobStore;
6#[cfg(feature = "lmdb")]
7use hashtree_lmdb::LmdbBlobStore;
8use hashtree_core::{
9    HashTree, HashTreeConfig, Cid,
10    sha256, to_hex, from_hex, TreeNode, DirEntry as HashTreeDirEntry,
11    types::Hash,
12};
13use hashtree_core::store::{Store, StoreError};
14use hashtree_config::StorageBackend;
15use serde::{Deserialize, Serialize};
16use std::path::Path;
17use std::collections::HashSet;
18use std::io::Read;
19use std::sync::Arc;
20use std::time::{SystemTime, UNIX_EPOCH};
21use futures::executor::block_on as sync_block_on;
22
/// Priority levels for tree eviction (higher = kept longer).
// Trees synced from unrelated users — first candidates for eviction.
pub const PRIORITY_OTHER: u8 = 64;
// Trees belonging to followed users.
pub const PRIORITY_FOLLOWED: u8 = 128;
// The user's own (or explicitly pinned) trees — evicted last.
pub const PRIORITY_OWN: u8 = 255;
27
/// Metadata for a synced tree (for eviction tracking).
///
/// Stored msgpack-encoded in the `tree_meta` database, keyed by the
/// tree's 32-byte root hash.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Pubkey of tree owner
    pub owner: String,
    /// Tree name if known (from nostr key like "npub.../name")
    pub name: Option<String>,
    /// Unix timestamp when this tree was synced
    pub synced_at: u64,
    /// Total size of all blobs in this tree
    pub total_size: u64,
    /// Eviction priority: 255=own/pinned, 128=followed, 64=other
    /// (see the PRIORITY_* constants above)
    pub priority: u8,
}
42
/// Cached root info from Nostr events (replaces nostrdb caching).
///
/// Stored msgpack-encoded in the `cached_roots` database, keyed by
/// "pubkey_hex/tree_name".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex)
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public", "link-visible", or "private"
    pub visibility: String,
}
55
/// Storage statistics reported by the local blob store.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of blobs currently stored.
    pub count: usize,
    /// Sum of all blob sizes in bytes.
    pub total_bytes: u64,
}
62
/// Local blob store - wraps either FsBlobStore or LmdbBlobStore.
///
/// The LMDB variant only exists when compiled with the `lmdb` feature;
/// see `LocalStore::new` for the fallback behavior.
pub enum LocalStore {
    Fs(FsBlobStore),
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
69
impl LocalStore {
    /// Create a new local store based on config.
    ///
    /// When LMDB is requested but the `lmdb` feature was compiled out,
    /// falls back to filesystem storage with a warning rather than failing.
    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
        match backend {
            StorageBackend::Fs => {
                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
            }
            #[cfg(feature = "lmdb")]
            StorageBackend::Lmdb => {
                Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
            }
            #[cfg(not(feature = "lmdb"))]
            StorageBackend::Lmdb => {
                tracing::warn!("LMDB backend requested but lmdb feature not enabled, using filesystem storage");
                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
            }
        }
    }

    /// Sync put operation. Callers treat the returned bool as "is new"
    /// (see StorageRouter::put_sync).
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.put_sync(hash, data),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put_sync(hash, data),
        }
    }

    /// Sync get operation; Ok(None) means the blob is not stored locally.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.get_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.get_sync(hash),
        }
    }

    /// Check if hash exists.
    // FsBlobStore::exists returns a plain bool (hence the Ok wrap), while
    // the LMDB backend returns a Result directly.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => Ok(store.exists(hash)),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.exists(hash),
        }
    }

    /// Sync delete operation; the bool reports whether anything was removed.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.delete_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.delete_sync(hash),
        }
    }

    /// Get storage statistics.
    // Each backend reports its own stats type; both are converted into the
    // shared LocalStoreStats shape here.
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        match self {
            LocalStore::Fs(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
        }
    }

    /// List all hashes in the store.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.list(),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.list(),
        }
    }
}
155
#[async_trait]
impl Store for LocalStore {
    // Async `Store` facade: every method delegates to the blocking
    // implementation above (both backends are synchronous under the hood,
    // so no actual awaiting happens here).
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
174
175#[cfg(feature = "s3")]
176use tokio::sync::mpsc;
177
178use crate::config::S3Config;
179
/// Message for background S3 sync.
///
/// Sent from StorageRouter's sync methods to the detached tokio task
/// spawned in `StorageRouter::with_s3`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` under the key derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove the object for `hash` from the bucket.
    Delete { hash: Hash },
}
186
/// Storage router - local store primary with optional S3 backup
///
/// Write path: local first (fast), then queue S3 upload (non-blocking)
/// Read path: local first, fall back to S3 if miss
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LocalStore>,
    /// Optional S3 client for backup
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target bucket; None when S3 is not configured.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to every object key (may be empty).
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to send uploads to background task
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
205
206impl StorageRouter {
207    /// Create router with local storage only
208    pub fn new(local: Arc<LocalStore>) -> Self {
209        Self {
210            local,
211            #[cfg(feature = "s3")]
212            s3_client: None,
213            #[cfg(feature = "s3")]
214            s3_bucket: None,
215            #[cfg(feature = "s3")]
216            s3_prefix: String::new(),
217            #[cfg(feature = "s3")]
218            sync_tx: None,
219        }
220    }
221
222    /// Create router with local storage + S3 backup
223    #[cfg(feature = "s3")]
224    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
225        use aws_sdk_s3::Client as S3Client;
226
227        // Build AWS config
228        let mut aws_config_loader = aws_config::from_env();
229        aws_config_loader = aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
230        let aws_config = aws_config_loader.load().await;
231
232        // Build S3 client with custom endpoint
233        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
234        s3_config_builder = s3_config_builder
235            .endpoint_url(&config.endpoint)
236            .force_path_style(true);
237
238        let s3_client = S3Client::from_conf(s3_config_builder.build());
239        let bucket = config.bucket.clone();
240        let prefix = config.prefix.clone().unwrap_or_default();
241
242        // Create background sync channel
243        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
244
245        // Spawn background sync task with bounded concurrent uploads
246        let sync_client = s3_client.clone();
247        let sync_bucket = bucket.clone();
248        let sync_prefix = prefix.clone();
249
250        tokio::spawn(async move {
251            use aws_sdk_s3::primitives::ByteStream;
252
253            tracing::info!("S3 background sync task started");
254
255            // Limit concurrent uploads to prevent overwhelming the runtime
256            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
257            let client = std::sync::Arc::new(sync_client);
258            let bucket = std::sync::Arc::new(sync_bucket);
259            let prefix = std::sync::Arc::new(sync_prefix);
260
261            while let Some(msg) = sync_rx.recv().await {
262                let client = client.clone();
263                let bucket = bucket.clone();
264                let prefix = prefix.clone();
265                let semaphore = semaphore.clone();
266
267                // Spawn each upload with semaphore-bounded concurrency
268                tokio::spawn(async move {
269                    // Acquire permit before uploading
270                    let _permit = semaphore.acquire().await;
271
272                    match msg {
273                        S3SyncMessage::Upload { hash, data } => {
274                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
275                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
276
277                            match client
278                                .put_object()
279                                .bucket(bucket.as_str())
280                                .key(&key)
281                                .body(ByteStream::from(data))
282                                .send()
283                                .await
284                            {
285                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
286                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
287                            }
288                        }
289                        S3SyncMessage::Delete { hash } => {
290                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
291                            tracing::debug!("S3 deleting {}", &key);
292
293                            if let Err(e) = client
294                                .delete_object()
295                                .bucket(bucket.as_str())
296                                .key(&key)
297                                .send()
298                                .await
299                            {
300                                tracing::error!("S3 delete failed {}: {}", &key, e);
301                            }
302                        }
303                    }
304                });
305            }
306        });
307
308        tracing::info!("S3 storage initialized: bucket={}, prefix={}", bucket, prefix);
309
310        Ok(Self {
311            local,
312            s3_client: Some(s3_client),
313            s3_bucket: Some(bucket),
314            s3_prefix: prefix,
315            sync_tx: Some(sync_tx),
316        })
317    }
318
319    /// Store data - writes to LMDB, queues S3 upload in background
320    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
321        // Always write to local first
322        let is_new = self.local.put_sync(hash, data)?;
323
324        // Queue S3 upload if configured (non-blocking)
325        // Always upload to S3 (even if not new locally) to ensure S3 has all blobs
326        #[cfg(feature = "s3")]
327        if let Some(ref tx) = self.sync_tx {
328            tracing::info!("Queueing S3 upload for {} ({} bytes, is_new={})",
329                crate::storage::to_hex(&hash)[..16].to_string(), data.len(), is_new);
330            if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data: data.to_vec() }) {
331                tracing::error!("Failed to queue S3 upload: {}", e);
332            }
333        }
334
335        Ok(is_new)
336    }
337
338    /// Get data - tries LMDB first, falls back to S3
339    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
340        // Try local first
341        if let Some(data) = self.local.get_sync(hash)? {
342            return Ok(Some(data));
343        }
344
345        // Fall back to S3 if configured
346        #[cfg(feature = "s3")]
347        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
348            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
349
350            match sync_block_on(async {
351                client.get_object()
352                    .bucket(bucket)
353                    .key(&key)
354                    .send()
355                    .await
356            }) {
357                Ok(output) => {
358                    if let Ok(body) = sync_block_on(output.body.collect()) {
359                        let data = body.into_bytes().to_vec();
360                        // Cache locally for future reads
361                        let _ = self.local.put_sync(*hash, &data);
362                        return Ok(Some(data));
363                    }
364                }
365                Err(e) => {
366                    let service_err = e.into_service_error();
367                    if !service_err.is_no_such_key() {
368                        tracing::warn!("S3 get failed: {}", service_err);
369                    }
370                }
371            }
372        }
373
374        Ok(None)
375    }
376
377    /// Check if hash exists
378    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
379        // Check local first
380        if self.local.exists(hash)? {
381            return Ok(true);
382        }
383
384        // Check S3 if configured
385        #[cfg(feature = "s3")]
386        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
387            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
388
389            match sync_block_on(async {
390                client.head_object()
391                    .bucket(bucket)
392                    .key(&key)
393                    .send()
394                    .await
395            }) {
396                Ok(_) => return Ok(true),
397                Err(e) => {
398                    let service_err = e.into_service_error();
399                    if !service_err.is_not_found() {
400                        tracing::warn!("S3 head failed: {}", service_err);
401                    }
402                }
403            }
404        }
405
406        Ok(false)
407    }
408
409    /// Delete data from both local and S3 stores
410    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
411        let deleted = self.local.delete_sync(hash)?;
412
413        // Queue S3 delete if configured
414        #[cfg(feature = "s3")]
415        if let Some(ref tx) = self.sync_tx {
416            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
417        }
418
419        Ok(deleted)
420    }
421
422    /// Delete data from local store only (don't propagate to S3)
423    /// Used for eviction where we want to keep S3 as archive
424    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
425        self.local.delete_sync(hash)
426    }
427
428    /// Get stats from local store
429    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
430        self.local.stats()
431    }
432
433    /// List all hashes from local store
434    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
435        self.local.list()
436    }
437
438    /// Get the underlying local store for HashTree operations
439    pub fn local_store(&self) -> Arc<LocalStore> {
440        Arc::clone(&self.local)
441    }
442}
443
// Implement async Store trait for StorageRouter so it can be used directly with HashTree
// This ensures all writes go through S3 sync
#[async_trait]
impl Store for StorageRouter {
    // Each method delegates to the synchronous router implementation above;
    // the S3 fallback/queueing behavior documented there applies.
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
464
/// Top-level store: LMDB metadata databases plus a StorageRouter for blob
/// bytes. All hashtree/blossom operations in the CLI go through this type.
pub struct HashtreeStore {
    env: heed::Env,
    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
    pins: Database<Bytes, Unit>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles local store + optional S3 (Arc for sharing with HashTree)
    router: Arc<StorageRouter>,
    /// Maximum storage size in bytes (from config)
    max_size_bytes: u64,
}
486
487impl HashtreeStore {
    /// Create a new store with local LMDB storage only (10GB default limit).
    /// Convenience wrapper over `with_options` with no S3 config.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::with_options(path, None, 10 * 1024 * 1024 * 1024)
    }
492
    /// Create a new store with optional S3 backend (10GB default limit).
    /// Convenience wrapper over `with_options`.
    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
        Self::with_options(path, s3_config, 10 * 1024 * 1024 * 1024)
    }
497
    /// Create a new store with optional S3 backend and custom size limit.
    ///
    /// Opens (creating if needed) the LMDB environment at `path` with all
    /// metadata databases, builds the blob store named in the global config,
    /// and wraps it in a `StorageRouter` (S3 sync added when configured and
    /// the `s3` feature is compiled in).
    ///
    /// NOTE(review): the LMDB `map_size` is hard-coded to 10GB and does not
    /// use `max_size_bytes`; the configured limit is only stored here —
    /// confirm that eviction logic elsewhere enforces it.
    pub fn with_options<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>, max_size_bytes: u64) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // heed marks `open` unsafe; the caller must uphold heed's environment
        // invariants (see heed's EnvOpenOptions documentation).
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) // 10GB virtual address space
                .max_dbs(8)  // pins, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .open(path)?
        };

        // Create (or open) every named metadata database in one transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Get storage backend from config
        let config = hashtree_config::Config::load_or_default();
        let backend = &config.storage.backend;

        // Create local blob store based on configured backend
        let local_store = Arc::new(LocalStore::new(path.join("blobs"), backend)
            .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?);

        // Create storage router with optional S3
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!("Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket, s3_cfg.endpoint);

            // with_s3 is async (AWS config loading); block here since this
            // constructor is synchronous.
            sync_block_on(async {
                StorageRouter::with_s3(local_store, s3_cfg).await
            })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!("S3 config provided but S3 feature not enabled. Using local storage only.");
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            env,
            pins,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
562
    /// Get the storage router (borrowed).
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }
567
    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
    /// All writes through this go to both the local store and S3.
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }
573
    /// Upload a file and return its CID (public/unencrypted), with auto-pin.
    /// Used by `htree add`.
    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, true)
    }
578
    /// Upload a file without pinning (for blossom uploads that can be evicted).
    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, false)
    }
583
584    fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
585        let file_path = file_path.as_ref();
586        let file_content = std::fs::read(file_path)?;
587
588        // Use hashtree to store the file (public mode - no encryption)
589        let store = self.store_arc();
590        let tree = HashTree::new(HashTreeConfig::new(store).public());
591
592        let (cid, _size) = sync_block_on(async {
593            tree.put(&file_content).await
594        }).context("Failed to store file")?;
595
596        // Only pin if requested (htree add = pin, blossom upload = no pin)
597        if pin {
598            let mut wtxn = self.env.write_txn()?;
599            self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
600            wtxn.commit()?;
601        }
602
603        Ok(to_hex(&cid.hash))
604    }
605
606    /// Upload a file from a stream with progress callbacks
607    pub fn upload_file_stream<R: Read, F>(
608        &self,
609        mut reader: R,
610        _file_name: impl Into<String>,
611        mut callback: F,
612    ) -> Result<String>
613    where
614        F: FnMut(&str),
615    {
616        let mut data = Vec::new();
617        reader.read_to_end(&mut data)?;
618
619        // Use HashTree.put for upload (public mode)
620        let store = self.store_arc();
621        let tree = HashTree::new(HashTreeConfig::new(store).public());
622
623        let (cid, _size) = sync_block_on(async {
624            tree.put(&data).await
625        }).context("Failed to store file")?;
626
627        let root_hex = to_hex(&cid.hash);
628        callback(&root_hex);
629
630        // Auto-pin on upload
631        let mut wtxn = self.env.write_txn()?;
632        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
633        wtxn.commit()?;
634
635        Ok(root_hex)
636    }
637
    /// Upload a directory and return its root hash (hex)
    /// Respects .gitignore by default
    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_with_options(dir_path, true)
    }
643
644    /// Upload a directory with options (public mode - no encryption)
645    pub fn upload_dir_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
646        let dir_path = dir_path.as_ref();
647
648        let store = self.store_arc();
649        let tree = HashTree::new(HashTreeConfig::new(store).public());
650
651        let root_cid = sync_block_on(async {
652            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
653        }).context("Failed to upload directory")?;
654
655        let root_hex = to_hex(&root_cid.hash);
656
657        let mut wtxn = self.env.write_txn()?;
658        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
659        wtxn.commit()?;
660
661        Ok(root_hex)
662    }
663
    /// Walk `current_path` (honoring gitignore rules when requested),
    /// upload every file through `tree`, and hand the collected layout to
    /// `build_directory_tree`, returning the root Cid.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Build directory structure from flat file list - store full Cid with key.
        // Key = directory path relative to the upload root ("" = root);
        // value = (file name, Cid) pairs directly inside that directory.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new()); // Root

        // hidden(false): dot-files ARE included; only ignore-file rules filter.
        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false)
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // Skip the root directory itself
            if path == current_path {
                continue;
            }

            let relative = path.strip_prefix(current_path)
                .unwrap_or(path);

            if path.is_file() {
                let content = std::fs::read(path)?;
                let (cid, _size) = tree.put(&content).await
                    .map_err(|e| anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e))?;

                // Get parent directory path and file name
                let parent = relative.parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative.file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Ensure directory entry exists (this also covers empty dirs)
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        // Build directory tree bottom-up
        self.build_directory_tree(tree, &mut dir_contents).await
    }
721
722    async fn build_directory_tree<S: Store>(
723        &self,
724        tree: &HashTree<S>,
725        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
726    ) -> Result<Cid> {
727        // Sort directories by depth (deepest first) to build bottom-up
728        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
729        dirs.sort_by(|a, b| {
730            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
731            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
732            depth_b.cmp(&depth_a) // Deepest first
733        });
734
735        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();
736
737        for dir_path in dirs {
738            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();
739
740            let mut entries: Vec<HashTreeDirEntry> = files.into_iter()
741                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
742                .collect();
743
744            // Add subdirectory entries
745            for (subdir_path, cid) in &dir_cids {
746                let parent = std::path::Path::new(subdir_path)
747                    .parent()
748                    .map(|p| p.to_string_lossy().to_string())
749                    .unwrap_or_default();
750
751                if parent == dir_path {
752                    let name = std::path::Path::new(subdir_path)
753                        .file_name()
754                        .map(|n| n.to_string_lossy().to_string())
755                        .unwrap_or_default();
756                    entries.push(HashTreeDirEntry::from_cid(name, cid));
757                }
758            }
759
760            let cid = tree.put_directory(entries).await
761                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;
762
763            dir_cids.insert(dir_path, cid);
764        }
765
766        // Return root Cid
767        dir_cids.get("")
768            .cloned()
769            .ok_or_else(|| anyhow::anyhow!("No root directory"))
770    }
771
772    /// Upload a file with CHK encryption, returns CID in format "hash:key"
773    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
774        let file_path = file_path.as_ref();
775        let file_content = std::fs::read(file_path)?;
776
777        // Use unified API with encryption enabled (default)
778        let store = self.store_arc();
779        let tree = HashTree::new(HashTreeConfig::new(store));
780
781        let (cid, _size) = sync_block_on(async {
782            tree.put(&file_content).await
783        }).map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;
784
785        let cid_str = cid.to_string();
786
787        let mut wtxn = self.env.write_txn()?;
788        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
789        wtxn.commit()?;
790
791        Ok(cid_str)
792    }
793
    /// Upload a directory with CHK encryption, returns CID
    /// Respects .gitignore by default
    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_encrypted_with_options(dir_path, true)
    }
799
800    /// Upload a directory with CHK encryption and options
801    /// Returns CID as "hash:key" format for encrypted directories
802    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
803        let dir_path = dir_path.as_ref();
804        let store = self.store_arc();
805
806        // Use unified API with encryption enabled (default)
807        let tree = HashTree::new(HashTreeConfig::new(store));
808
809        let root_cid = sync_block_on(async {
810            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
811        }).context("Failed to upload encrypted directory")?;
812
813        let cid_str = root_cid.to_string(); // Returns "hash:key" or "hash"
814
815        let mut wtxn = self.env.write_txn()?;
816        // Pin by hash only (the key is for decryption, not identification)
817        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
818        wtxn.commit()?;
819
820        Ok(cid_str)
821    }
822
    /// Get tree node by hash (raw bytes).
    ///
    /// Reads through the router (so the S3 fallback in `get_sync` applies);
    /// decodes in public (unencrypted) mode.
    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get_tree_node(hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
        })
    }
833
834    /// Store a raw blob, returns SHA256 hash as hex.
835    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
836        let hash = sha256(data);
837        self.router.put_sync(hash, data)
838            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
839        Ok(to_hex(&hash))
840    }
841
842    /// Get a raw blob by SHA256 hash (raw bytes).
843    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
844        self.router.get_sync(hash)
845            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
846    }
847
848    /// Check if a blob exists by SHA256 hash (raw bytes).
849    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
850        self.router.exists(hash)
851            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
852    }
853
854    // === Blossom ownership tracking ===
855    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
856    // This allows efficient multi-owner tracking with O(1) lookups
857
858    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
859    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
860        let mut key = [0u8; 64];
861        key[..32].copy_from_slice(sha256);
862        key[32..].copy_from_slice(pubkey);
863        key
864    }
865
    /// Add an owner (pubkey) to a blob for Blossom protocol
    /// Multiple users can own the same blob - it's only deleted when all owners remove it
    ///
    /// Maintains two records inside a single write transaction:
    /// 1. `blob_owners`: composite key `sha256 ++ pubkey` -> () for O(1) ownership checks
    /// 2. `pubkey_blobs`: pubkey -> JSON `Vec<BlobMetadata>` backing the /list endpoint
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Add ownership entry (idempotent - put overwrites)
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        // Convert sha256 to hex for BlobMetadata (which stores sha256 as hex string)
        let sha256_hex = to_hex(sha256);

        // Get existing blobs for this pubkey (for /list endpoint)
        // NOTE(review): corrupt JSON silently resets the list to empty (best-effort)
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        // Check if blob already exists for this pubkey
        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            // Wall-clock upload time; unwrap only fails if the clock is before 1970
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Get size from raw blob
            // (records 0 if the blob data is not stored yet - TODO confirm callers upload first)
            let size = self
                .get_blob(sha256)?
                .map(|data| data.len() as u64)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
912
913    /// Check if a pubkey owns a blob
914    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
915        let key = Self::blob_owner_key(sha256, pubkey);
916        let rtxn = self.env.read_txn()?;
917        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
918    }
919
920    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
921    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
922        let rtxn = self.env.read_txn()?;
923
924        let mut owners = Vec::new();
925        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
926            let (key, _) = item?;
927            if key.len() == 64 {
928                // Extract pubkey from composite key (bytes 32-64)
929                let mut pubkey = [0u8; 32];
930                pubkey.copy_from_slice(&key[32..64]);
931                owners.push(pubkey);
932            }
933        }
934        Ok(owners)
935    }
936
937    /// Check if blob has any owners
938    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
939        let rtxn = self.env.read_txn()?;
940
941        // Just check if any entry exists with this prefix
942        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
943            if item.is_ok() {
944                return Ok(true);
945            }
946        }
947        Ok(false)
948    }
949
950    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
951    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
952        Ok(self.get_blob_owners(sha256)?.into_iter().next())
953    }
954
955    /// Remove a user's ownership of a blossom blob
956    /// Only deletes the actual blob when no owners remain
957    /// Returns true if the blob was actually deleted (no owners left)
958    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
959        let key = Self::blob_owner_key(sha256, pubkey);
960        let mut wtxn = self.env.write_txn()?;
961
962        // Remove this pubkey's ownership entry
963        self.blob_owners.delete(&mut wtxn, &key[..])?;
964
965        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
966        let sha256_hex = to_hex(sha256);
967
968        // Remove from pubkey's blob list
969        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
970            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
971                blobs.retain(|b| b.sha256 != sha256_hex);
972                let blobs_json = serde_json::to_vec(&blobs)?;
973                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
974            }
975        }
976
977        // Check if any other owners remain (prefix scan)
978        let mut has_other_owners = false;
979        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
980            if item.is_ok() {
981                has_other_owners = true;
982                break;
983            }
984        }
985
986        if has_other_owners {
987            wtxn.commit()?;
988            tracing::debug!(
989                "Removed {} from blob {} owners, other owners remain",
990                &to_hex(pubkey)[..8],
991                &sha256_hex[..8]
992            );
993            return Ok(false);
994        }
995
996        // No owners left - delete the blob completely
997        tracing::info!(
998            "All owners removed from blob {}, deleting",
999            &sha256_hex[..8]
1000        );
1001
1002        // Delete raw blob (by content hash) - this deletes from S3 too
1003        let _ = self.router.delete_sync(sha256);
1004
1005        wtxn.commit()?;
1006        Ok(true)
1007    }
1008
1009    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
1010    pub fn list_blobs_by_pubkey(&self, pubkey: &[u8; 32]) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1011        let rtxn = self.env.read_txn()?;
1012
1013        let blobs: Vec<BlobMetadata> = self
1014            .pubkey_blobs
1015            .get(&rtxn, pubkey)?
1016            .and_then(|b| serde_json::from_slice(b).ok())
1017            .unwrap_or_default();
1018
1019        Ok(blobs
1020            .into_iter()
1021            .map(|b| crate::server::blossom::BlobDescriptor {
1022                url: format!("/{}", b.sha256),
1023                sha256: b.sha256,
1024                size: b.size,
1025                mime_type: b.mime_type,
1026                uploaded: b.uploaded,
1027            })
1028            .collect())
1029    }
1030
1031    /// Get a single chunk/blob by hash (raw bytes)
1032    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1033        self.router.get_sync(hash)
1034            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1035    }
1036
1037    /// Get file content by hash (raw bytes)
1038    /// Returns raw bytes (caller handles decryption if needed)
1039    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1040        let store = self.store_arc();
1041        let tree = HashTree::new(HashTreeConfig::new(store).public());
1042
1043        sync_block_on(async {
1044            tree.read_file(hash).await
1045                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1046        })
1047    }
1048
1049    /// Get file content by Cid (hash + optional decryption key as raw bytes)
1050    /// Handles decryption automatically if key is present
1051    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1052        let store = self.store_arc();
1053        let tree = HashTree::new(HashTreeConfig::new(store).public());
1054
1055        sync_block_on(async {
1056            tree.get(cid).await
1057                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1058        })
1059    }
1060
1061    /// Resolve a path within a tree (returns Cid with key if encrypted)
1062    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1063        let store = self.store_arc();
1064        let tree = HashTree::new(HashTreeConfig::new(store).public());
1065
1066        sync_block_on(async {
1067            tree.resolve_path(cid, path).await
1068                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1069        })
1070    }
1071
    /// Get chunk metadata for a file (chunk list, sizes, total size)
    ///
    /// Returns `None` when the hash is unknown or refers to a directory.
    /// For a single (unchunked) blob, `chunk_hashes`/`chunk_sizes` are empty
    /// and `is_chunked` is false.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        // store is cloned because both the raw existence check and the tree
        // walk below need a handle
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            // First check if the hash exists in the store at all
            // (either as a blob or tree node)
            let exists = store.has(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            // Get total size
            let total_size = tree.get_size(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            // Check if it's a tree (chunked) or blob
            let is_tree_node = tree.is_tree(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            if !is_tree_node {
                // Single blob, not chunked
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            // Get tree node to extract chunk info
            let node = match tree.get_tree_node(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))? {
                Some(n) => n,
                None => return Ok(None),
            };

            // Check if it's a directory (has named links)
            let is_directory = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None); // Not a file
            }

            // Extract chunk info from links
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                // a tree node with zero links is reported as unchunked
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1132
1133    /// Get byte range from file
1134    pub fn get_file_range(&self, hash: &[u8; 32], start: u64, end: Option<u64>) -> Result<Option<(Vec<u8>, u64)>> {
1135        let metadata = match self.get_file_chunk_metadata(hash)? {
1136            Some(m) => m,
1137            None => return Ok(None),
1138        };
1139
1140        if metadata.total_size == 0 {
1141            return Ok(Some((Vec::new(), 0)));
1142        }
1143
1144        if start >= metadata.total_size {
1145            return Ok(None);
1146        }
1147
1148        let end = end.unwrap_or(metadata.total_size - 1).min(metadata.total_size - 1);
1149
1150        // For non-chunked files, load entire file
1151        if !metadata.is_chunked {
1152            let content = self.get_file(hash)?.unwrap_or_default();
1153            let range_content = if start < content.len() as u64 {
1154                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1155            } else {
1156                Vec::new()
1157            };
1158            return Ok(Some((range_content, metadata.total_size)));
1159        }
1160
1161        // For chunked files, load only needed chunks
1162        let mut result = Vec::new();
1163        let mut current_offset = 0u64;
1164
1165        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1166            let chunk_size = metadata.chunk_sizes[i];
1167            let chunk_end = current_offset + chunk_size - 1;
1168
1169            // Check if this chunk overlaps with requested range
1170            if chunk_end >= start && current_offset <= end {
1171                let chunk_content = match self.get_chunk(chunk_hash)? {
1172                    Some(content) => content,
1173                    None => {
1174                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1175                    }
1176                };
1177
1178                let chunk_read_start = if current_offset >= start {
1179                    0
1180                } else {
1181                    (start - current_offset) as usize
1182                };
1183
1184                let chunk_read_end = if chunk_end <= end {
1185                    chunk_size as usize - 1
1186                } else {
1187                    (end - current_offset) as usize
1188                };
1189
1190                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1191            }
1192
1193            current_offset += chunk_size;
1194
1195            if current_offset > end {
1196                break;
1197            }
1198        }
1199
1200        Ok(Some((result, metadata.total_size)))
1201    }
1202
    /// Stream file range as chunks using Arc for async/Send contexts
    ///
    /// Builds a `FileRangeChunksOwned` over the chunks overlapping the
    /// inclusive byte range [start, end]. Takes `Arc<Self>` so the returned
    /// value owns its store handle and can be moved across await points.
    ///
    /// Returns `Ok(None)` when the file is unknown, empty, or `start` is
    /// past EOF.
    pub fn stream_file_range_chunks_owned(
        self: Arc<Self>,
        hash: &[u8; 32],
        start: u64,
        end: u64,
    ) -> Result<Option<FileRangeChunksOwned>> {
        let metadata = match self.get_file_chunk_metadata(hash)? {
            Some(m) => m,
            None => return Ok(None),
        };

        if metadata.total_size == 0 || start >= metadata.total_size {
            return Ok(None);
        }

        // Clamp the inclusive end to the last valid byte
        let end = end.min(metadata.total_size - 1);

        // Cursor state starts at the first chunk; consumption happens in
        // FileRangeChunksOwned (defined elsewhere in this file)
        Ok(Some(FileRangeChunksOwned {
            store: self,
            metadata,
            start,
            end,
            current_chunk_idx: 0,
            current_offset: 0,
        }))
    }
1230
1231    /// Get directory structure by hash (raw bytes)
1232    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1233        let store = self.store_arc();
1234        let tree = HashTree::new(HashTreeConfig::new(store).public());
1235
1236        sync_block_on(async {
1237            // Check if it's a directory
1238            let is_dir = tree.is_directory(&hash).await
1239                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1240
1241            if !is_dir {
1242                return Ok(None);
1243            }
1244
1245            // Get directory entries (public Cid - no encryption key)
1246            let cid = hashtree_core::Cid::public(*hash);
1247            let tree_entries = tree.list_directory(&cid).await
1248                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1249
1250            let entries: Vec<DirEntry> = tree_entries.into_iter().map(|e| DirEntry {
1251                name: e.name,
1252                cid: to_hex(&e.hash),
1253                is_directory: e.link_type.is_tree(),
1254                size: e.size,
1255            }).collect();
1256
1257            Ok(Some(DirectoryListing {
1258                dir_name: String::new(),
1259                entries,
1260            }))
1261        })
1262    }
1263
1264    /// Pin a hash (prevent garbage collection)
1265    pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
1266        let mut wtxn = self.env.write_txn()?;
1267        self.pins.put(&mut wtxn, hash.as_slice(), &())?;
1268        wtxn.commit()?;
1269        Ok(())
1270    }
1271
1272    /// Unpin a hash (allow garbage collection)
1273    pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
1274        let mut wtxn = self.env.write_txn()?;
1275        self.pins.delete(&mut wtxn, hash.as_slice())?;
1276        wtxn.commit()?;
1277        Ok(())
1278    }
1279
1280    /// Check if hash is pinned
1281    pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
1282        let rtxn = self.env.read_txn()?;
1283        Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
1284    }
1285
1286    /// List all pinned hashes (raw bytes)
1287    pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
1288        let rtxn = self.env.read_txn()?;
1289        let mut pins = Vec::new();
1290
1291        for item in self.pins.iter(&rtxn)? {
1292            let (hash_bytes, _) = item?;
1293            if hash_bytes.len() == 32 {
1294                let mut hash = [0u8; 32];
1295                hash.copy_from_slice(hash_bytes);
1296                pins.push(hash);
1297            }
1298        }
1299
1300        Ok(pins)
1301    }
1302
1303    /// List all pinned hashes with names
1304    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
1305        let rtxn = self.env.read_txn()?;
1306        let store = self.store_arc();
1307        let tree = HashTree::new(HashTreeConfig::new(store).public());
1308        let mut pins = Vec::new();
1309
1310        for item in self.pins.iter(&rtxn)? {
1311            let (hash_bytes, _) = item?;
1312            if hash_bytes.len() != 32 {
1313                continue;
1314            }
1315            let mut hash = [0u8; 32];
1316            hash.copy_from_slice(hash_bytes);
1317
1318            // Try to determine if it's a directory
1319            let is_directory = sync_block_on(async {
1320                tree.is_directory(&hash).await.unwrap_or(false)
1321            });
1322
1323            pins.push(PinnedItem {
1324                cid: to_hex(&hash),
1325                name: "Unknown".to_string(),
1326                is_directory,
1327            });
1328        }
1329
1330        Ok(pins)
1331    }
1332
1333    // === Tree indexing for eviction ===
1334
    /// Index a tree after sync - tracks all blobs in the tree for eviction
    ///
    /// If `ref_key` is provided (e.g. "npub.../name"), it will replace any existing
    /// tree with that ref, allowing old versions to be evicted.
    ///
    /// Records, under one write transaction:
    /// - `blob_trees`: composite key `blob_hash ++ tree_hash` -> () per blob
    /// - `tree_meta`: root hash -> msgpack-encoded `TreeMeta`
    /// - `tree_refs`: `ref_key` -> root hash (only when `ref_key` is given)
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // If ref_key provided, check for and unindex old version
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                // Same hash means re-indexing the identical tree: nothing to replace
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes.try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // Release the read txn before unindex_tree opens its own write txn
                    drop(rtxn);
                    // Unindex old tree (will delete orphaned blobs)
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk tree and collect all blob hashes + compute total size
        let (blob_hashes, total_size) = sync_block_on(async {
            self.collect_tree_blobs(&tree, root_hash).await
        })?;

        let mut wtxn = self.env.write_txn()?;

        // Store blob-tree relationships (64-byte key: blob_hash ++ tree_hash)
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        // Store tree metadata
        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            // Wall-clock sync time; unwrap only fails if the clock is before 1970
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta.put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        // Store ref -> hash mapping if ref_key provided
        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1414
1415    /// Collect all blob hashes in a tree and compute total size
1416    async fn collect_tree_blobs<S: Store>(
1417        &self,
1418        tree: &HashTree<S>,
1419        root: &Hash,
1420    ) -> Result<(Vec<Hash>, u64)> {
1421        let mut blobs = Vec::new();
1422        let mut total_size = 0u64;
1423        let mut stack = vec![*root];
1424
1425        while let Some(hash) = stack.pop() {
1426            // Check if it's a tree node
1427            let is_tree = tree.is_tree(&hash).await
1428                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1429
1430            if is_tree {
1431                // Get tree node and add children to stack
1432                if let Some(node) = tree.get_tree_node(&hash).await
1433                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1434                {
1435                    for link in &node.links {
1436                        stack.push(link.hash);
1437                    }
1438                }
1439            } else {
1440                // It's a blob - get its size
1441                if let Some(data) = self.router.get_sync(&hash)
1442                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1443                {
1444                    total_size += data.len() as u64;
1445                    blobs.push(hash);
1446                }
1447            }
1448        }
1449
1450        Ok((blobs, total_size))
1451    }
1452
1453    /// Unindex a tree - removes blob-tree mappings and deletes orphaned blobs
1454    /// Returns the number of bytes freed
1455    pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1456        let root_hex = to_hex(root_hash);
1457
1458        let store = self.store_arc();
1459        let tree = HashTree::new(HashTreeConfig::new(store).public());
1460
1461        // Walk tree and collect all blob hashes
1462        let (blob_hashes, _) = sync_block_on(async {
1463            self.collect_tree_blobs(&tree, root_hash).await
1464        })?;
1465
1466        let mut wtxn = self.env.write_txn()?;
1467        let mut freed = 0u64;
1468
1469        // For each blob, remove the blob-tree entry and check if orphaned
1470        for blob_hash in &blob_hashes {
1471            // Delete blob-tree entry (64-byte key: blob_hash ++ tree_hash)
1472            let mut key = [0u8; 64];
1473            key[..32].copy_from_slice(blob_hash);
1474            key[32..].copy_from_slice(root_hash);
1475            self.blob_trees.delete(&mut wtxn, &key[..])?;
1476
1477            // Check if blob is in any other tree (prefix scan on first 32 bytes)
1478            let rtxn = self.env.read_txn()?;
1479            let mut has_other_tree = false;
1480
1481            for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1482                if item.is_ok() {
1483                    has_other_tree = true;
1484                    break;
1485                }
1486            }
1487            drop(rtxn);
1488
1489            // If orphaned, delete the blob
1490            if !has_other_tree {
1491                if let Some(data) = self.router.get_sync(blob_hash)
1492                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1493                {
1494                    freed += data.len() as u64;
1495                    // Delete locally only - keep S3 as archive
1496                    self.router.delete_local_only(blob_hash)
1497                        .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1498                }
1499            }
1500        }
1501
1502        // Delete tree node itself if exists
1503        if let Some(data) = self.router.get_sync(root_hash)
1504            .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1505        {
1506            freed += data.len() as u64;
1507            // Delete locally only - keep S3 as archive
1508            self.router.delete_local_only(root_hash)
1509                .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1510        }
1511
1512        // Delete tree metadata
1513        self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1514
1515        wtxn.commit()?;
1516
1517        tracing::debug!(
1518            "Unindexed tree {} ({} bytes freed)",
1519            &root_hex[..8],
1520            freed
1521        );
1522
1523        Ok(freed)
1524    }
1525
1526    /// Get tree metadata
1527    pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1528        let rtxn = self.env.read_txn()?;
1529        if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1530            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1531                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1532            Ok(Some(meta))
1533        } else {
1534            Ok(None)
1535        }
1536    }
1537
1538    /// List all indexed trees
1539    pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1540        let rtxn = self.env.read_txn()?;
1541        let mut trees = Vec::new();
1542
1543        for item in self.tree_meta.iter(&rtxn)? {
1544            let (hash_bytes, meta_bytes) = item?;
1545            let hash: Hash = hash_bytes.try_into()
1546                .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1547            let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1548                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1549            trees.push((hash, meta));
1550        }
1551
1552        Ok(trees)
1553    }
1554
1555    /// Get total tracked storage size (sum of all tree_meta.total_size)
1556    pub fn tracked_size(&self) -> Result<u64> {
1557        let rtxn = self.env.read_txn()?;
1558        let mut total = 0u64;
1559
1560        for item in self.tree_meta.iter(&rtxn)? {
1561            let (_, bytes) = item?;
1562            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1563                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1564            total += meta.total_size;
1565        }
1566
1567        Ok(total)
1568    }
1569
1570    /// Get evictable trees sorted by (priority ASC, synced_at ASC)
1571    fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1572        let mut trees = self.list_indexed_trees()?;
1573
1574        // Sort by priority (lower first), then by synced_at (older first)
1575        trees.sort_by(|a, b| {
1576            match a.1.priority.cmp(&b.1.priority) {
1577                std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1578                other => other,
1579            }
1580        });
1581
1582        Ok(trees)
1583    }
1584
    /// Run eviction if storage is over quota
    /// Returns bytes freed
    ///
    /// Eviction order:
    /// 1. Orphaned blobs (not in any indexed tree and not pinned)
    /// 2. Trees by priority (lowest first) and age (oldest first)
    pub fn evict_if_needed(&self) -> Result<u64> {
        // Get actual storage used
        let stats = self.router.stats()
            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
        let current = stats.total_bytes;

        // Fast path: already under quota, nothing to do
        if current <= self.max_size_bytes {
            return Ok(0);
        }

        // Target 90% of max to avoid constant eviction
        let target = self.max_size_bytes * 90 / 100;
        let mut freed = 0u64;
        // Track the running size locally rather than re-querying stats per step
        let mut current_size = current;

        // Phase 1: Evict orphaned blobs (not in any tree and not pinned)
        let orphan_freed = self.evict_orphaned_blobs()?;
        freed += orphan_freed;
        current_size = current_size.saturating_sub(orphan_freed);

        if orphan_freed > 0 {
            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
        }

        // Check if we're now under target
        if current_size <= target {
            if freed > 0 {
                tracing::info!("Eviction complete: {} bytes freed", freed);
            }
            return Ok(freed);
        }

        // Phase 2: Evict trees by priority (lowest first) and age (oldest first)
        // Own trees CAN be evicted (just last), but PINNED trees are never evicted
        let evictable = self.get_evictable_trees()?;

        for (root_hash, meta) in evictable {
            // Stop as soon as the 90% target is reached
            if current_size <= target {
                break;
            }

            let root_hex = to_hex(&root_hash);

            // Never evict pinned trees
            if self.is_pinned(&root_hash)? {
                continue;
            }

            let tree_freed = self.unindex_tree(&root_hash)?;
            freed += tree_freed;
            current_size = current_size.saturating_sub(tree_freed);

            tracing::info!(
                "Evicted tree {} (owner={}, priority={}, {} bytes)",
                &root_hex[..8],
                // owner may be shorter than 8 chars; clamp the slice
                &meta.owner[..8.min(meta.owner.len())],
                meta.priority,
                tree_freed
            );
        }

        if freed > 0 {
            tracing::info!("Eviction complete: {} bytes freed", freed);
        }

        Ok(freed)
    }
1658
1659    /// Evict blobs that are not part of any indexed tree and not pinned
1660    fn evict_orphaned_blobs(&self) -> Result<u64> {
1661        let mut freed = 0u64;
1662
1663        // Get all blob hashes from store
1664        let all_hashes = self.router.list()
1665            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1666
1667        // Get pinned hashes as raw bytes
1668        let rtxn = self.env.read_txn()?;
1669        let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
1670            .filter_map(|item| item.ok())
1671            .filter_map(|(hash_bytes, _)| {
1672                if hash_bytes.len() == 32 {
1673                    let mut hash = [0u8; 32];
1674                    hash.copy_from_slice(hash_bytes);
1675                    Some(hash)
1676                } else {
1677                    None
1678                }
1679            })
1680            .collect();
1681
1682        // Collect all blob hashes that are in at least one tree
1683        // Key format is blob_hash (32 bytes) ++ tree_hash (32 bytes)
1684        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
1685        for item in self.blob_trees.iter(&rtxn)? {
1686            if let Ok((key_bytes, _)) = item {
1687                if key_bytes.len() >= 32 {
1688                    let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
1689                    blobs_in_trees.insert(blob_hash);
1690                }
1691            }
1692        }
1693        drop(rtxn);
1694
1695        // Find and delete orphaned blobs
1696        for hash in all_hashes {
1697            // Skip if pinned
1698            if pinned.contains(&hash) {
1699                continue;
1700            }
1701
1702            // Skip if part of any tree
1703            if blobs_in_trees.contains(&hash) {
1704                continue;
1705            }
1706
1707            // This blob is orphaned - delete locally (keep S3 as archive)
1708            if let Ok(Some(data)) = self.router.get_sync(&hash) {
1709                freed += data.len() as u64;
1710                let _ = self.router.delete_local_only(&hash);
1711                tracing::debug!("Deleted orphaned blob {} ({} bytes)", &to_hex(&hash)[..8], data.len());
1712            }
1713        }
1714
1715        Ok(freed)
1716    }
1717
    /// Get the maximum storage size in bytes (the budget `evict_if_needed`
    /// enforces).
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1722
1723    /// Get storage usage by priority tier
1724    pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1725        let rtxn = self.env.read_txn()?;
1726        let mut own = 0u64;
1727        let mut followed = 0u64;
1728        let mut other = 0u64;
1729
1730        for item in self.tree_meta.iter(&rtxn)? {
1731            let (_, bytes) = item?;
1732            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1733                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1734
1735            if meta.priority >= PRIORITY_OWN {
1736                own += meta.total_size;
1737            } else if meta.priority >= PRIORITY_FOLLOWED {
1738                followed += meta.total_size;
1739            } else {
1740                other += meta.total_size;
1741            }
1742        }
1743
1744        Ok(StorageByPriority { own, followed, other })
1745    }
1746
1747    /// Get storage statistics
1748    pub fn get_storage_stats(&self) -> Result<StorageStats> {
1749        let rtxn = self.env.read_txn()?;
1750        let total_pins = self.pins.len(&rtxn)? as usize;
1751
1752        let stats = self.router.stats()
1753            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1754
1755        Ok(StorageStats {
1756            total_dags: stats.count,
1757            pinned_dags: total_pins,
1758            total_bytes: stats.total_bytes,
1759        })
1760    }
1761
1762    // === Cached roots (replaces nostrdb event caching) ===
1763
1764    /// Get cached root for a pubkey/tree_name pair
1765    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1766        let key = format!("{}/{}", pubkey_hex, tree_name);
1767        let rtxn = self.env.read_txn()?;
1768        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1769            let root: CachedRoot = rmp_serde::from_slice(bytes)
1770                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1771            Ok(Some(root))
1772        } else {
1773            Ok(None)
1774        }
1775    }
1776
1777    /// Set cached root for a pubkey/tree_name pair
1778    pub fn set_cached_root(
1779        &self,
1780        pubkey_hex: &str,
1781        tree_name: &str,
1782        hash: &str,
1783        key: Option<&str>,
1784        visibility: &str,
1785        updated_at: u64,
1786    ) -> Result<()> {
1787        let db_key = format!("{}/{}", pubkey_hex, tree_name);
1788        let root = CachedRoot {
1789            hash: hash.to_string(),
1790            key: key.map(|k| k.to_string()),
1791            updated_at,
1792            visibility: visibility.to_string(),
1793        };
1794        let bytes = rmp_serde::to_vec(&root)
1795            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1796        let mut wtxn = self.env.write_txn()?;
1797        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1798        wtxn.commit()?;
1799        Ok(())
1800    }
1801
1802    /// List all cached roots for a pubkey
1803    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1804        let prefix = format!("{}/", pubkey_hex);
1805        let rtxn = self.env.read_txn()?;
1806        let mut results = Vec::new();
1807
1808        for item in self.cached_roots.iter(&rtxn)? {
1809            let (key, bytes) = item?;
1810            if key.starts_with(&prefix) {
1811                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1812                let root: CachedRoot = rmp_serde::from_slice(bytes)
1813                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1814                results.push((tree_name.to_string(), root));
1815            }
1816        }
1817
1818        Ok(results)
1819    }
1820
1821    /// Delete a cached root
1822    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1823        let key = format!("{}/{}", pubkey_hex, tree_name);
1824        let mut wtxn = self.env.write_txn()?;
1825        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1826        wtxn.commit()?;
1827        Ok(deleted)
1828    }
1829
1830    /// Garbage collect unpinned content
1831    pub fn gc(&self) -> Result<GcStats> {
1832        let rtxn = self.env.read_txn()?;
1833
1834        // Get all pinned hashes as raw bytes
1835        let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
1836            .filter_map(|item| item.ok())
1837            .filter_map(|(hash_bytes, _)| {
1838                if hash_bytes.len() == 32 {
1839                    let mut hash = [0u8; 32];
1840                    hash.copy_from_slice(hash_bytes);
1841                    Some(hash)
1842                } else {
1843                    None
1844                }
1845            })
1846            .collect();
1847
1848        drop(rtxn);
1849
1850        // Get all stored hashes
1851        let all_hashes = self.router.list()
1852            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1853
1854        // Delete unpinned hashes
1855        let mut deleted = 0;
1856        let mut freed_bytes = 0u64;
1857
1858        for hash in all_hashes {
1859            if !pinned.contains(&hash) {
1860                if let Ok(Some(data)) = self.router.get_sync(&hash) {
1861                    freed_bytes += data.len() as u64;
1862                    // Delete locally only - keep S3 as archive
1863                    let _ = self.router.delete_local_only(&hash);
1864                    deleted += 1;
1865                }
1866            }
1867        }
1868
1869        Ok(GcStats {
1870            deleted_dags: deleted,
1871            freed_bytes,
1872        })
1873    }
1874
1875    /// Verify LMDB blob integrity - checks that stored data matches its key hash
1876    /// Returns verification statistics and optionally deletes corrupted entries
1877    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
1878        let all_hashes = self.router.list()
1879            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1880
1881        let total = all_hashes.len();
1882        let mut valid = 0;
1883        let mut corrupted = 0;
1884        let mut deleted = 0;
1885        let mut corrupted_hashes = Vec::new();
1886
1887        for hash in &all_hashes {
1888            let hash_hex = to_hex(hash);
1889
1890            match self.router.get_sync(hash) {
1891                Ok(Some(data)) => {
1892                    // Compute actual SHA256 of data
1893                    let actual_hash = sha256(&data);
1894
1895                    if actual_hash == *hash {
1896                        valid += 1;
1897                    } else {
1898                        corrupted += 1;
1899                        let actual_hex = to_hex(&actual_hash);
1900                        println!("  CORRUPTED: key={} actual={} size={}",
1901                            &hash_hex[..16], &actual_hex[..16], data.len());
1902                        corrupted_hashes.push(*hash);
1903                    }
1904                }
1905                Ok(None) => {
1906                    // Hash exists in index but data is missing
1907                    corrupted += 1;
1908                    println!("  MISSING: key={}", &hash_hex[..16]);
1909                    corrupted_hashes.push(*hash);
1910                }
1911                Err(e) => {
1912                    corrupted += 1;
1913                    println!("  ERROR: key={} err={}", &hash_hex[..16], e);
1914                    corrupted_hashes.push(*hash);
1915                }
1916            }
1917        }
1918
1919        // Delete corrupted entries if requested
1920        if delete {
1921            for hash in &corrupted_hashes {
1922                match self.router.delete_sync(hash) {
1923                    Ok(true) => deleted += 1,
1924                    Ok(false) => {} // Already deleted
1925                    Err(e) => {
1926                        let hash_hex = to_hex(hash);
1927                        println!("  Failed to delete {}: {}", &hash_hex[..16], e);
1928                    }
1929                }
1930            }
1931        }
1932
1933        Ok(VerifyResult {
1934            total,
1935            valid,
1936            corrupted,
1937            deleted,
1938        })
1939    }
1940
    /// Verify R2/S3 blob integrity - lists all objects and verifies hash matches filename
    /// Returns verification statistics and optionally deletes corrupted entries
    ///
    /// Walks the whole bucket (paginated), downloads each `*.bin` object, and
    /// recomputes its SHA-256 to compare against the hash encoded in the key.
    /// Objects with malformed names are also counted as corrupted. When
    /// `delete` is true, all flagged keys are deleted from the bucket at the end.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        // Get S3 client from router (we need to access it directly)
        // For now, we'll create a new client from config
        let config = crate::config::Config::load()?;
        let s3_config = config.storage.s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        // Build AWS config
        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        // Custom endpoint + path-style addressing — presumably for R2
        // compatibility; confirm against the deployment target.
        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                .force_path_style(true)
                .build()
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // List all objects in bucket, page by page via the continuation token.
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2()
                .bucket(bucket)
                .prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req.send().await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Skip non-.bin files
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // Extract expected hash from filename (remove prefix and .bin)
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                // Validate it's a valid hex hash (SHA-256 => 64 hex chars)
                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!("  INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!("  INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download and verify content against the hash in the filename
                match s3_client.get_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(resp) => {
                        match resp.body.collect().await {
                            Ok(bytes) => {
                                let data = bytes.into_bytes();
                                let actual_hash = sha256(&data);

                                if actual_hash == expected_hash {
                                    valid += 1;
                                } else {
                                    corrupted += 1;
                                    let actual_hex = to_hex(&actual_hash);
                                    println!("  CORRUPTED: key={} actual={} size={}",
                                        &expected_hash_hex[..16], &actual_hex[..16], data.len());
                                    corrupted_keys.push(key.to_string());
                                }
                            }
                            // Body streamed but could not be read fully.
                            Err(e) => {
                                corrupted += 1;
                                println!("  READ ERROR: {} - {}", key, e);
                                corrupted_keys.push(key.to_string());
                            }
                        }
                    }
                    // GetObject request itself failed.
                    Err(e) => {
                        corrupted += 1;
                        println!("  FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Progress indicator every 100 objects
                if total % 100 == 0 {
                    println!("  Progress: {} objects checked, {} corrupted so far", total, corrupted);
                }
            }

            // Check if there are more objects (another page to fetch)
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        // Delete corrupted entries if requested (best-effort: failures are
        // reported but do not abort the pass)
        if delete {
            for key in &corrupted_keys {
                match s3_client.delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        println!("  Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2097
    /// Fallback for non-S3 builds
    ///
    /// Always returns an error: R2/S3 verification requires the "s3" feature.
    /// Kept async with the same signature so callers compile unchanged.
    #[cfg(not(feature = "s3"))]
    pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
        Err(anyhow::anyhow!("S3 feature not enabled"))
    }
2103}
2104
/// Result of blob integrity verification
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Total number of blobs/objects examined
    pub total: usize,
    /// Entries whose content hashed to the expected value
    pub valid: usize,
    /// Entries that were missing, unreadable, misnamed, or hash-mismatched
    pub corrupted: usize,
    /// Corrupted entries actually deleted (0 unless deletion was requested)
    pub deleted: usize,
}
2113
/// Aggregate storage statistics reported by `get_storage_stats`
#[derive(Debug)]
pub struct StorageStats {
    /// Number of stored entries (from the router's blob count)
    pub total_dags: usize,
    /// Number of pinned entries (from the LMDB pins database)
    pub pinned_dags: usize,
    /// Total bytes of stored blob data
    pub total_bytes: u64,
}
2120
/// Storage usage broken down by priority tier
///
/// Each field is the sum of `TreeMeta.total_size` over trees in that tier.
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Own/pinned trees (priority 255)
    pub own: u64,
    /// Followed users' trees (priority 128)
    pub followed: u64,
    /// Other trees (priority 64)
    pub other: u64,
}
2131
/// Describes how a file's content is split into content-addressed chunks
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes
    pub total_size: u64,
    /// Hash of each chunk, in file order
    pub chunk_hashes: Vec<Hash>,
    /// Size in bytes of each chunk (indexed in parallel with `chunk_hashes`)
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is stored as multiple chunks
    pub is_chunked: bool,
}
2139
/// Owned iterator for async streaming
///
/// Yields the byte sub-slices of a chunked file that fall within the
/// inclusive byte range `[start, end]`. Owns an `Arc` to the store so it
/// carries no borrowed lifetime.
pub struct FileRangeChunksOwned {
    // Store used to fetch each chunk's content by hash
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read
    metadata: FileChunkMetadata,
    // Inclusive start offset of the requested range
    start: u64,
    // Inclusive end offset of the requested range
    end: u64,
    // Index of the next chunk to consider
    current_chunk_idx: usize,
    // File offset where the chunk at `current_chunk_idx` begins
    current_offset: u64,
}
2149
2150impl Iterator for FileRangeChunksOwned {
2151    type Item = Result<Vec<u8>>;
2152
2153    fn next(&mut self) -> Option<Self::Item> {
2154        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2155            return None;
2156        }
2157
2158        if self.current_offset > self.end {
2159            return None;
2160        }
2161
2162        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2163        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2164        let chunk_end = self.current_offset + chunk_size - 1;
2165
2166        self.current_chunk_idx += 1;
2167
2168        if chunk_end < self.start || self.current_offset > self.end {
2169            self.current_offset += chunk_size;
2170            return self.next();
2171        }
2172
2173        let chunk_content = match self.store.get_chunk(chunk_hash) {
2174            Ok(Some(content)) => content,
2175            Ok(None) => {
2176                return Some(Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash))));
2177            }
2178            Err(e) => {
2179                return Some(Err(e));
2180            }
2181        };
2182
2183        let chunk_read_start = if self.current_offset >= self.start {
2184            0
2185        } else {
2186            (self.start - self.current_offset) as usize
2187        };
2188
2189        let chunk_read_end = if chunk_end <= self.end {
2190            chunk_size as usize - 1
2191        } else {
2192            (self.end - self.current_offset) as usize
2193        };
2194
2195        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2196        self.current_offset += chunk_size;
2197
2198        Some(Ok(result))
2199    }
2200}
2201
/// Result of a garbage-collection pass (`gc`)
#[derive(Debug)]
pub struct GcStats {
    /// Number of blobs deleted from local storage
    pub deleted_dags: usize,
    /// Total bytes freed by those deletions
    pub freed_bytes: u64,
}
2207
/// A single entry in a directory listing
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within its directory
    pub name: String,
    /// Content identifier of the entry (string form)
    pub cid: String,
    /// True if the entry is itself a directory
    pub is_directory: bool,
    /// Entry size in bytes
    pub size: u64,
}
2215
/// The contents of one directory: its name plus its entries
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the directory being listed
    pub dir_name: String,
    /// Entries contained in the directory
    pub entries: Vec<DirEntry>,
}
2221
/// A pinned piece of content, as surfaced to listings
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Content identifier of the pinned item (string form)
    pub cid: String,
    /// Display name of the pinned item
    pub name: String,
    /// True if the pinned item is a directory
    pub is_directory: bool,
}
2228
/// Blob metadata for Blossom protocol
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 hash of the blob
    pub sha256: String,
    /// Blob size in bytes
    pub size: u64,
    /// MIME type of the blob
    pub mime_type: String,
    /// Upload time as a Unix timestamp (seconds assumed — confirm at the write site)
    pub uploaded: u64,
}
2237
2238// Implement ContentStore trait for WebRTC data exchange
2239impl crate::webrtc::ContentStore for HashtreeStore {
2240    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2241        let hash = from_hex(hash_hex)
2242            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2243        self.get_chunk(&hash)
2244    }
2245}