// hashtree_cli/storage.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use heed::{Database, EnvOpenOptions};
4use heed::types::*;
5use hashtree_lmdb::LmdbBlobStore;
6use hashtree_core::{
7    HashTree, HashTreeConfig, Cid,
8    sha256, to_hex, from_hex, TreeNode, DirEntry as HashTreeDirEntry,
9    types::Hash,
10};
11use hashtree_core::store::{Store, StoreError};
12use serde::{Deserialize, Serialize};
13use std::path::Path;
14use std::collections::HashSet;
15use std::io::Read;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18use futures::executor::block_on as sync_block_on;
19
/// Priority levels for tree eviction (higher value = higher priority).
/// Trees from unrelated users (64) — lowest priority.
pub const PRIORITY_OTHER: u8 = 64;
/// Trees from followed users (128).
pub const PRIORITY_FOLLOWED: u8 = 128;
/// The user's own or pinned trees (255) — highest priority.
pub const PRIORITY_OWN: u8 = 255;
24
/// Metadata for a synced tree (for eviction tracking).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Pubkey of tree owner
    pub owner: String,
    /// Tree name if known (from nostr key like "npub.../name")
    pub name: Option<String>,
    /// Unix timestamp when this tree was synced
    pub synced_at: u64,
    /// Total size of all blobs in this tree, in bytes
    pub total_size: u64,
    /// Eviction priority: 255=own/pinned, 128=followed, 64=other
    /// (see PRIORITY_OWN / PRIORITY_FOLLOWED / PRIORITY_OTHER)
    pub priority: u8,
}
39
/// Cached root info from Nostr events (replaces nostrdb caching).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex); present for encrypted (private) trees
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public" or "private"
    pub visibility: String,
}
52
53#[cfg(feature = "s3")]
54use tokio::sync::mpsc;
55
56use crate::config::S3Config;
57
/// Message for the background S3 sync task.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Mirror a blob to S3 (data is an owned copy of the blob bytes).
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove a blob from S3.
    Delete { hash: Hash },
}
64
/// Storage router - LMDB primary with optional S3 backup.
///
/// Write path: LMDB first (fast), then queue S3 upload (non-blocking).
/// Read path: LMDB first, fall back to S3 on miss (with local write-back).
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LmdbBlobStore>,
    /// Optional S3 client for backup (None when running local-only)
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Bucket for S3 backup; set together with `s3_client`
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to every S3 object key (may be empty)
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to send uploads/deletes to the background sync task
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
83
84impl StorageRouter {
85    /// Create router with LMDB only
86    pub fn new(local: Arc<LmdbBlobStore>) -> Self {
87        Self {
88            local,
89            #[cfg(feature = "s3")]
90            s3_client: None,
91            #[cfg(feature = "s3")]
92            s3_bucket: None,
93            #[cfg(feature = "s3")]
94            s3_prefix: String::new(),
95            #[cfg(feature = "s3")]
96            sync_tx: None,
97        }
98    }
99
100    /// Create router with LMDB + S3 backup
101    #[cfg(feature = "s3")]
102    pub async fn with_s3(local: Arc<LmdbBlobStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
103        use aws_sdk_s3::Client as S3Client;
104
105        // Build AWS config
106        let mut aws_config_loader = aws_config::from_env();
107        aws_config_loader = aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
108        let aws_config = aws_config_loader.load().await;
109
110        // Build S3 client with custom endpoint
111        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
112        s3_config_builder = s3_config_builder
113            .endpoint_url(&config.endpoint)
114            .force_path_style(true);
115
116        let s3_client = S3Client::from_conf(s3_config_builder.build());
117        let bucket = config.bucket.clone();
118        let prefix = config.prefix.clone().unwrap_or_default();
119
120        // Create background sync channel
121        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
122
123        // Spawn background sync task with bounded concurrent uploads
124        let sync_client = s3_client.clone();
125        let sync_bucket = bucket.clone();
126        let sync_prefix = prefix.clone();
127
128        tokio::spawn(async move {
129            use aws_sdk_s3::primitives::ByteStream;
130
131            tracing::info!("S3 background sync task started");
132
133            // Limit concurrent uploads to prevent overwhelming the runtime
134            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
135            let client = std::sync::Arc::new(sync_client);
136            let bucket = std::sync::Arc::new(sync_bucket);
137            let prefix = std::sync::Arc::new(sync_prefix);
138
139            while let Some(msg) = sync_rx.recv().await {
140                let client = client.clone();
141                let bucket = bucket.clone();
142                let prefix = prefix.clone();
143                let semaphore = semaphore.clone();
144
145                // Spawn each upload with semaphore-bounded concurrency
146                tokio::spawn(async move {
147                    // Acquire permit before uploading
148                    let _permit = semaphore.acquire().await;
149
150                    match msg {
151                        S3SyncMessage::Upload { hash, data } => {
152                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
153                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
154
155                            match client
156                                .put_object()
157                                .bucket(bucket.as_str())
158                                .key(&key)
159                                .body(ByteStream::from(data))
160                                .send()
161                                .await
162                            {
163                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
164                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
165                            }
166                        }
167                        S3SyncMessage::Delete { hash } => {
168                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
169                            tracing::debug!("S3 deleting {}", &key);
170
171                            if let Err(e) = client
172                                .delete_object()
173                                .bucket(bucket.as_str())
174                                .key(&key)
175                                .send()
176                                .await
177                            {
178                                tracing::error!("S3 delete failed {}: {}", &key, e);
179                            }
180                        }
181                    }
182                });
183            }
184        });
185
186        tracing::info!("S3 storage initialized: bucket={}, prefix={}", bucket, prefix);
187
188        Ok(Self {
189            local,
190            s3_client: Some(s3_client),
191            s3_bucket: Some(bucket),
192            s3_prefix: prefix,
193            sync_tx: Some(sync_tx),
194        })
195    }
196
197    /// Store data - writes to LMDB, queues S3 upload in background
198    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
199        // Always write to local first
200        let is_new = self.local.put_sync(hash, data)?;
201
202        // Queue S3 upload if configured (non-blocking)
203        // Always upload to S3 (even if not new locally) to ensure S3 has all blobs
204        #[cfg(feature = "s3")]
205        if let Some(ref tx) = self.sync_tx {
206            tracing::info!("Queueing S3 upload for {} ({} bytes, is_new={})",
207                crate::storage::to_hex(&hash)[..16].to_string(), data.len(), is_new);
208            if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data: data.to_vec() }) {
209                tracing::error!("Failed to queue S3 upload: {}", e);
210            }
211        }
212
213        Ok(is_new)
214    }
215
216    /// Get data - tries LMDB first, falls back to S3
217    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
218        // Try local first
219        if let Some(data) = self.local.get_sync(hash)? {
220            return Ok(Some(data));
221        }
222
223        // Fall back to S3 if configured
224        #[cfg(feature = "s3")]
225        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
226            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
227
228            match sync_block_on(async {
229                client.get_object()
230                    .bucket(bucket)
231                    .key(&key)
232                    .send()
233                    .await
234            }) {
235                Ok(output) => {
236                    if let Ok(body) = sync_block_on(output.body.collect()) {
237                        let data = body.into_bytes().to_vec();
238                        // Cache locally for future reads
239                        let _ = self.local.put_sync(*hash, &data);
240                        return Ok(Some(data));
241                    }
242                }
243                Err(e) => {
244                    let service_err = e.into_service_error();
245                    if !service_err.is_no_such_key() {
246                        tracing::warn!("S3 get failed: {}", service_err);
247                    }
248                }
249            }
250        }
251
252        Ok(None)
253    }
254
255    /// Check if hash exists
256    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
257        // Check local first
258        if self.local.exists(hash)? {
259            return Ok(true);
260        }
261
262        // Check S3 if configured
263        #[cfg(feature = "s3")]
264        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
265            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
266
267            match sync_block_on(async {
268                client.head_object()
269                    .bucket(bucket)
270                    .key(&key)
271                    .send()
272                    .await
273            }) {
274                Ok(_) => return Ok(true),
275                Err(e) => {
276                    let service_err = e.into_service_error();
277                    if !service_err.is_not_found() {
278                        tracing::warn!("S3 head failed: {}", service_err);
279                    }
280                }
281            }
282        }
283
284        Ok(false)
285    }
286
287    /// Delete data from both local and S3 stores
288    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
289        let deleted = self.local.delete_sync(hash)?;
290
291        // Queue S3 delete if configured
292        #[cfg(feature = "s3")]
293        if let Some(ref tx) = self.sync_tx {
294            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
295        }
296
297        Ok(deleted)
298    }
299
300    /// Delete data from local store only (don't propagate to S3)
301    /// Used for eviction where we want to keep S3 as archive
302    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
303        self.local.delete_sync(hash)
304    }
305
306    /// Get stats from local store
307    pub fn stats(&self) -> Result<hashtree_lmdb::LmdbStats, StoreError> {
308        self.local.stats()
309    }
310
311    /// List all hashes from local store
312    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
313        self.local.list()
314    }
315
316    /// Get the underlying LMDB store for HashTree operations
317    pub fn local_store(&self) -> Arc<LmdbBlobStore> {
318        Arc::clone(&self.local)
319    }
320}
321
322// Implement async Store trait for StorageRouter so it can be used directly with HashTree
323// This ensures all writes go through S3 sync
#[async_trait]
impl Store for StorageRouter {
    /// Delegates to `put_sync`: LMDB write plus queued S3 upload.
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    /// Delegates to `get_sync`: LMDB first, S3 fallback with local caching.
    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    /// Delegates to `exists`: LMDB first, S3 HEAD fallback.
    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    /// Delegates to `delete_sync`: local delete plus queued S3 delete.
    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
342
/// CLI-facing store: LMDB metadata databases plus blob content routed
/// through `StorageRouter` (LMDB primary, optional S3 mirror).
pub struct HashtreeStore {
    /// Shared LMDB environment for all metadata databases below
    env: heed::Env,
    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
    pins: Database<Bytes, Unit>,
    /// Maps SHA256 (32 bytes) -> root hash (32 bytes) for blossom compatibility
    sha256_index: Database<Bytes, Bytes>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles LMDB + optional S3 (Arc for sharing with HashTree)
    router: Arc<StorageRouter>,
    /// Maximum storage size in bytes (from config); used for eviction accounting
    max_size_bytes: u64,
}
366
367impl HashtreeStore {
368    /// Create a new store with local LMDB storage only (10GB default limit)
369    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
370        Self::with_options(path, None, 10 * 1024 * 1024 * 1024)
371    }
372
373    /// Create a new store with optional S3 backend (10GB default limit)
374    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
375        Self::with_options(path, s3_config, 10 * 1024 * 1024 * 1024)
376    }
377
    /// Create a new store with optional S3 backend and custom size limit.
    ///
    /// Opens (or creates) the LMDB environment at `path` with its metadata
    /// databases, plus a separate `blobs/` LMDB for blob content, then wires
    /// everything through a `StorageRouter`.
    ///
    /// NOTE(review): `map_size` is a fixed 10GB of virtual address space
    /// while `max_size_bytes` is configurable — if the limit can exceed
    /// 10GB the map size should probably scale with it; confirm.
    pub fn with_options<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>, max_size_bytes: u64) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY(heed): opening an environment is marked unsafe; assumed the
        // same path is not opened twice in this process — TODO confirm.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) // 10GB virtual address space
                .max_dbs(9)  // pins, sha256_index, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .open(path)?
        };

        // Create (or open) all named metadata databases in one transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let sha256_index = env.create_database(&mut wtxn, Some("sha256_index"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Blob content lives in its own LMDB under `blobs/`.
        let lmdb_store = Arc::new(LmdbBlobStore::new(path.join("blobs"))
            .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?);

        // Create storage router with optional S3.
        // NOTE(review): block_on here assumes this thread is not already
        // inside an async runtime — confirm call sites.
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!("Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket, s3_cfg.endpoint);

            sync_block_on(async {
                StorageRouter::with_s3(lmdb_store, s3_cfg).await
            })?
        } else {
            StorageRouter::new(lmdb_store)
        });

        // Without the "s3" feature, always fall back to local-only storage.
        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!("S3 config provided but S3 feature not enabled. Using local storage only.");
            }
            StorageRouter::new(lmdb_store)
        });

        Ok(Self {
            env,
            pins,
            sha256_index,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
440
441    /// Get the storage router
442    pub fn router(&self) -> &StorageRouter {
443        &self.router
444    }
445
    /// Get the storage router as an `Arc` (HashTree needs `Arc<dyn Store>`).
    /// All writes through this handle go to both LMDB and S3 (if configured).
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }
451
452    /// Upload a file and return its CID (public/unencrypted), with auto-pin
453    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
454        self.upload_file_internal(file_path, true)
455    }
456
457    /// Upload a file without pinning (for blossom uploads that can be evicted)
458    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
459        self.upload_file_internal(file_path, false)
460    }
461
462    fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
463        let file_path = file_path.as_ref();
464        let file_content = std::fs::read(file_path)?;
465
466        // Compute SHA256 hash of file content for blossom compatibility
467        let content_sha256 = sha256(&file_content);
468
469        // Use hashtree to store the file (public mode - no encryption)
470        let store = self.store_arc();
471        let tree = HashTree::new(HashTreeConfig::new(store).public());
472
473        let cid = sync_block_on(async {
474            tree.put(&file_content).await
475        }).context("Failed to store file")?;
476
477        let mut wtxn = self.env.write_txn()?;
478
479        // Store SHA256 -> root hash mapping for blossom compatibility
480        self.sha256_index.put(&mut wtxn, &content_sha256, &cid.hash)?;
481
482        // Only pin if requested (htree add = pin, blossom upload = no pin)
483        if pin {
484            self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
485        }
486
487        wtxn.commit()?;
488
489        Ok(to_hex(&cid.hash))
490    }
491
492    /// Upload a file from a stream with progress callbacks
493    pub fn upload_file_stream<R: Read, F>(
494        &self,
495        mut reader: R,
496        _file_name: impl Into<String>,
497        mut callback: F,
498    ) -> Result<String>
499    where
500        F: FnMut(&str),
501    {
502        // Read all data first to compute SHA256
503        let mut data = Vec::new();
504        reader.read_to_end(&mut data)?;
505
506        // Compute SHA256 hash of file content for blossom compatibility
507        let content_sha256 = sha256(&data);
508
509        // Use HashTree.put for upload (public mode for blossom)
510        let store = self.store_arc();
511        let tree = HashTree::new(HashTreeConfig::new(store).public());
512
513        let cid = sync_block_on(async {
514            tree.put(&data).await
515        }).context("Failed to store file")?;
516
517        let root_hex = to_hex(&cid.hash);
518        callback(&root_hex);
519
520        let mut wtxn = self.env.write_txn()?;
521
522        // Store SHA256 -> root hash mapping for blossom compatibility
523        self.sha256_index.put(&mut wtxn, &content_sha256, &cid.hash)?;
524
525        // Auto-pin on upload
526        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
527
528        wtxn.commit()?;
529
530        Ok(root_hex)
531    }
532
533    /// Upload a directory and return its root hash (hex)
534    /// Respects .gitignore by default
535    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
536        self.upload_dir_with_options(dir_path, true)
537    }
538
539    /// Upload a directory with options (public mode - no encryption)
540    pub fn upload_dir_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
541        let dir_path = dir_path.as_ref();
542
543        let store = self.store_arc();
544        let tree = HashTree::new(HashTreeConfig::new(store).public());
545
546        let root_cid = sync_block_on(async {
547            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
548        }).context("Failed to upload directory")?;
549
550        let root_hex = to_hex(&root_cid.hash);
551
552        let mut wtxn = self.env.write_txn()?;
553        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
554        wtxn.commit()?;
555
556        Ok(root_hex)
557    }
558
    /// Walk `current_path` (honoring gitignore rules when requested),
    /// upload every file, and group the resulting `Cid`s by parent
    /// directory; then delegate to `build_directory_tree` to assemble the
    /// directory nodes bottom-up. `_root_path` is currently unused.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Map: relative directory path ("" = root) -> (file name, Cid).
        // The full Cid is stored so per-file keys (if any) are preserved.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new()); // Root

        // hidden(false) keeps dotfiles; the three git_* toggles follow the
        // caller's respect_gitignore flag.
        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false)
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // Skip the root directory itself
            if path == current_path {
                continue;
            }

            // Paths are keyed relative to the upload root.
            let relative = path.strip_prefix(current_path)
                .unwrap_or(path);

            if path.is_file() {
                let content = std::fs::read(path)?;
                let cid = tree.put(&content).await
                    .map_err(|e| anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e))?;

                // Register the file under its parent directory's bucket.
                let parent = relative.parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative.file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Ensure directory entry exists (so empty dirs get a node)
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        // Build directory tree bottom-up
        self.build_directory_tree(tree, &mut dir_contents).await
    }
616
617    async fn build_directory_tree<S: Store>(
618        &self,
619        tree: &HashTree<S>,
620        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
621    ) -> Result<Cid> {
622        // Sort directories by depth (deepest first) to build bottom-up
623        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
624        dirs.sort_by(|a, b| {
625            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
626            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
627            depth_b.cmp(&depth_a) // Deepest first
628        });
629
630        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();
631
632        for dir_path in dirs {
633            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();
634
635            let mut entries: Vec<HashTreeDirEntry> = files.into_iter()
636                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
637                .collect();
638
639            // Add subdirectory entries
640            for (subdir_path, cid) in &dir_cids {
641                let parent = std::path::Path::new(subdir_path)
642                    .parent()
643                    .map(|p| p.to_string_lossy().to_string())
644                    .unwrap_or_default();
645
646                if parent == dir_path {
647                    let name = std::path::Path::new(subdir_path)
648                        .file_name()
649                        .map(|n| n.to_string_lossy().to_string())
650                        .unwrap_or_default();
651                    entries.push(HashTreeDirEntry::from_cid(name, cid));
652                }
653            }
654
655            let cid = tree.put_directory(entries).await
656                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;
657
658            dir_cids.insert(dir_path, cid);
659        }
660
661        // Return root Cid
662        dir_cids.get("")
663            .cloned()
664            .ok_or_else(|| anyhow::anyhow!("No root directory"))
665    }
666
667    /// Upload a file with CHK encryption, returns CID in format "hash:key"
668    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
669        let file_path = file_path.as_ref();
670        let file_content = std::fs::read(file_path)?;
671
672        // Use unified API with encryption enabled (default)
673        let store = self.store_arc();
674        let tree = HashTree::new(HashTreeConfig::new(store));
675
676        let cid = sync_block_on(async {
677            tree.put(&file_content).await
678        }).map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;
679
680        let cid_str = cid.to_string();
681
682        let mut wtxn = self.env.write_txn()?;
683        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
684        wtxn.commit()?;
685
686        Ok(cid_str)
687    }
688
689    /// Upload a directory with CHK encryption, returns CID
690    /// Respects .gitignore by default
691    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
692        self.upload_dir_encrypted_with_options(dir_path, true)
693    }
694
695    /// Upload a directory with CHK encryption and options
696    /// Returns CID as "hash:key" format for encrypted directories
697    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
698        let dir_path = dir_path.as_ref();
699        let store = self.store_arc();
700
701        // Use unified API with encryption enabled (default)
702        let tree = HashTree::new(HashTreeConfig::new(store));
703
704        let root_cid = sync_block_on(async {
705            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
706        }).context("Failed to upload encrypted directory")?;
707
708        let cid_str = root_cid.to_string(); // Returns "hash:key" or "hash"
709
710        let mut wtxn = self.env.write_txn()?;
711        // Pin by hash only (the key is for decryption, not identification)
712        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
713        wtxn.commit()?;
714
715        Ok(cid_str)
716    }
717
718    /// Get tree node by hash (raw bytes)
719    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
720        let store = self.store_arc();
721        let tree = HashTree::new(HashTreeConfig::new(store).public());
722
723        sync_block_on(async {
724            tree.get_tree_node(hash).await
725                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
726        })
727    }
728
729    /// Look up root hash by SHA256 hash (blossom compatibility)
730    pub fn get_cid_by_sha256(&self, sha256: &[u8; 32]) -> Result<Option<Hash>> {
731        let rtxn = self.env.read_txn()?;
732        Ok(self.sha256_index.get(&rtxn, sha256)?.map(|bytes| {
733            let mut hash = [0u8; 32];
734            hash.copy_from_slice(bytes);
735            hash
736        }))
737    }
738
739    /// Store a raw blob, returns SHA256 hash as hex.
740    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
741        let hash = sha256(data);
742        self.router.put_sync(hash, data)
743            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
744        Ok(to_hex(&hash))
745    }
746
747    /// Get a raw blob by SHA256 hash (raw bytes).
748    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
749        self.router.get_sync(hash)
750            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
751    }
752
753    /// Check if a blob exists by SHA256 hash (raw bytes).
754    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
755        self.router.exists(hash)
756            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
757    }
758
759    // === Blossom ownership tracking ===
760    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
761    // This allows efficient multi-owner tracking with O(1) lookups
762
763    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
764    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
765        let mut key = [0u8; 64];
766        key[..32].copy_from_slice(sha256);
767        key[32..].copy_from_slice(pubkey);
768        key
769    }
770
    /// Add an owner (pubkey) to a blob for the Blossom protocol.
    /// Multiple users can own the same blob - it's only deleted when all
    /// owners remove it. Also maintains the per-pubkey blob list consumed
    /// by the Blossom /list endpoint.
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Add ownership entry (idempotent - put overwrites)
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        // BlobMetadata stores the sha256 as a hex string.
        let sha256_hex = to_hex(sha256);

        // Load the pubkey's existing blob list (for /list endpoint).
        // NOTE(review): a corrupt JSON value silently resets the list to
        // empty — consider logging when deserialization fails.
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        // Only append if this blob isn't already listed for the pubkey.
        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            // Panics only if the system clock is before the Unix epoch.
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Size comes from the file chunk metadata of the hashtree root
            // mapped from this sha256; falls back to 0 when unknown.
            let size = self
                .get_cid_by_sha256(sha256)?
                .and_then(|root_hash| self.get_file_chunk_metadata(&root_hash).ok().flatten())
                .map(|m| m.total_size)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
818
819    /// Check if a pubkey owns a blob
820    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
821        let key = Self::blob_owner_key(sha256, pubkey);
822        let rtxn = self.env.read_txn()?;
823        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
824    }
825
826    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
827    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
828        let rtxn = self.env.read_txn()?;
829
830        let mut owners = Vec::new();
831        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
832            let (key, _) = item?;
833            if key.len() == 64 {
834                // Extract pubkey from composite key (bytes 32-64)
835                let mut pubkey = [0u8; 32];
836                pubkey.copy_from_slice(&key[32..64]);
837                owners.push(pubkey);
838            }
839        }
840        Ok(owners)
841    }
842
843    /// Check if blob has any owners
844    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
845        let rtxn = self.env.read_txn()?;
846
847        // Just check if any entry exists with this prefix
848        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
849            if item.is_ok() {
850                return Ok(true);
851            }
852        }
853        Ok(false)
854    }
855
856    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
857    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
858        Ok(self.get_blob_owners(sha256)?.into_iter().next())
859    }
860
    /// Remove a user's ownership of a blossom blob
    /// Only deletes the actual blob when no owners remain
    /// Returns true if the blob was actually deleted (no owners left)
    ///
    /// Ownership removal, the blob-list update and (when orphaned) index
    /// cleanup all run inside one LMDB write transaction.
    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Remove this pubkey's ownership entry
        self.blob_owners.delete(&mut wtxn, &key[..])?;

        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
        let sha256_hex = to_hex(sha256);

        // Remove from pubkey's blob list
        // (a list that fails to deserialize is deliberately left untouched)
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Check if any other owners remain (prefix scan)
        // Scanning through the write txn makes the delete above visible here.
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &to_hex(pubkey)[..8],
                &sha256_hex[..8]
            );
            return Ok(false);
        }

        // No owners left - delete the blob completely
        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8]
        );

        // Delete from sha256_index and unpin
        let root_hash: Option<Hash> = self.sha256_index.get(&wtxn, sha256)?.map(|bytes| {
            let mut hash = [0u8; 32];
            hash.copy_from_slice(bytes);
            hash
        });
        if let Some(ref hash) = root_hash {
            self.pins.delete(&mut wtxn, hash)?;
        }
        self.sha256_index.delete(&mut wtxn, sha256)?;

        // Delete raw blob (by content hash) - this deletes from S3 too
        // Best-effort: a failed store delete does not abort the index cleanup.
        let _ = self.router.delete_sync(sha256);

        wtxn.commit()?;
        Ok(true)
    }
925
926    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
927    pub fn list_blobs_by_pubkey(&self, pubkey: &[u8; 32]) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
928        let rtxn = self.env.read_txn()?;
929
930        let blobs: Vec<BlobMetadata> = self
931            .pubkey_blobs
932            .get(&rtxn, pubkey)?
933            .and_then(|b| serde_json::from_slice(b).ok())
934            .unwrap_or_default();
935
936        Ok(blobs
937            .into_iter()
938            .map(|b| crate::server::blossom::BlobDescriptor {
939                url: format!("/{}", b.sha256),
940                sha256: b.sha256,
941                size: b.size,
942                mime_type: b.mime_type,
943                uploaded: b.uploaded,
944            })
945            .collect())
946    }
947
948    /// Get a single chunk/blob by hash (raw bytes)
949    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
950        self.router.get_sync(hash)
951            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
952    }
953
954    /// Get file content by hash (raw bytes)
955    /// Returns raw bytes (caller handles decryption if needed)
956    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
957        let store = self.store_arc();
958        let tree = HashTree::new(HashTreeConfig::new(store).public());
959
960        sync_block_on(async {
961            tree.read_file(hash).await
962                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
963        })
964    }
965
966    /// Get file content by Cid (hash + optional decryption key as raw bytes)
967    /// Handles decryption automatically if key is present
968    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
969        let store = self.store_arc();
970        let tree = HashTree::new(HashTreeConfig::new(store).public());
971
972        sync_block_on(async {
973            tree.get(cid).await
974                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
975        })
976    }
977
    /// Get chunk metadata for a file (chunk list, sizes, total size)
    ///
    /// Returns `None` when the hash is unknown to the store, when the tree
    /// node cannot be loaded, or when the hash refers to a directory rather
    /// than a file. A single un-chunked blob yields empty chunk lists with
    /// `is_chunked: false`.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        // The Arc is cloned so it can serve both the direct `has` check below
        // and the HashTree wrapper.
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            // First check if the hash exists in the store at all
            // (either as a blob or tree node)
            let exists = store.has(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            // Get total size
            let total_size = tree.get_size(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            // Check if it's a tree (chunked) or blob
            let is_tree_node = tree.is_tree(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            if !is_tree_node {
                // Single blob, not chunked
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            // Get tree node to extract chunk info
            let node = match tree.get_tree_node(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))? {
                Some(n) => n,
                None => return Ok(None),
            };

            // Check if it's a directory (has named links)
            let is_directory = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None); // Not a file
            }

            // Extract chunk info from links
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                // A tree node with no links is reported as not chunked.
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1038
1039    /// Get byte range from file
1040    pub fn get_file_range(&self, hash: &[u8; 32], start: u64, end: Option<u64>) -> Result<Option<(Vec<u8>, u64)>> {
1041        let metadata = match self.get_file_chunk_metadata(hash)? {
1042            Some(m) => m,
1043            None => return Ok(None),
1044        };
1045
1046        if metadata.total_size == 0 {
1047            return Ok(Some((Vec::new(), 0)));
1048        }
1049
1050        if start >= metadata.total_size {
1051            return Ok(None);
1052        }
1053
1054        let end = end.unwrap_or(metadata.total_size - 1).min(metadata.total_size - 1);
1055
1056        // For non-chunked files, load entire file
1057        if !metadata.is_chunked {
1058            let content = self.get_file(hash)?.unwrap_or_default();
1059            let range_content = if start < content.len() as u64 {
1060                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1061            } else {
1062                Vec::new()
1063            };
1064            return Ok(Some((range_content, metadata.total_size)));
1065        }
1066
1067        // For chunked files, load only needed chunks
1068        let mut result = Vec::new();
1069        let mut current_offset = 0u64;
1070
1071        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1072            let chunk_size = metadata.chunk_sizes[i];
1073            let chunk_end = current_offset + chunk_size - 1;
1074
1075            // Check if this chunk overlaps with requested range
1076            if chunk_end >= start && current_offset <= end {
1077                let chunk_content = match self.get_chunk(chunk_hash)? {
1078                    Some(content) => content,
1079                    None => {
1080                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1081                    }
1082                };
1083
1084                let chunk_read_start = if current_offset >= start {
1085                    0
1086                } else {
1087                    (start - current_offset) as usize
1088                };
1089
1090                let chunk_read_end = if chunk_end <= end {
1091                    chunk_size as usize - 1
1092                } else {
1093                    (end - current_offset) as usize
1094                };
1095
1096                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1097            }
1098
1099            current_offset += chunk_size;
1100
1101            if current_offset > end {
1102                break;
1103            }
1104        }
1105
1106        Ok(Some((result, metadata.total_size)))
1107    }
1108
1109    /// Stream file range as chunks using Arc for async/Send contexts
1110    pub fn stream_file_range_chunks_owned(
1111        self: Arc<Self>,
1112        hash: &[u8; 32],
1113        start: u64,
1114        end: u64,
1115    ) -> Result<Option<FileRangeChunksOwned>> {
1116        let metadata = match self.get_file_chunk_metadata(hash)? {
1117            Some(m) => m,
1118            None => return Ok(None),
1119        };
1120
1121        if metadata.total_size == 0 || start >= metadata.total_size {
1122            return Ok(None);
1123        }
1124
1125        let end = end.min(metadata.total_size - 1);
1126
1127        Ok(Some(FileRangeChunksOwned {
1128            store: self,
1129            metadata,
1130            start,
1131            end,
1132            current_chunk_idx: 0,
1133            current_offset: 0,
1134        }))
1135    }
1136
1137    /// Get directory structure by hash (raw bytes)
1138    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1139        let store = self.store_arc();
1140        let tree = HashTree::new(HashTreeConfig::new(store).public());
1141
1142        sync_block_on(async {
1143            // Check if it's a directory
1144            let is_dir = tree.is_directory(&hash).await
1145                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1146
1147            if !is_dir {
1148                return Ok(None);
1149            }
1150
1151            // Get directory entries (public Cid - no encryption key)
1152            let cid = hashtree_core::Cid::public(*hash, 0);
1153            let tree_entries = tree.list_directory(&cid).await
1154                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1155
1156            let entries: Vec<DirEntry> = tree_entries.into_iter().map(|e| DirEntry {
1157                name: e.name,
1158                cid: to_hex(&e.hash),
1159                is_directory: e.link_type.is_tree(),
1160                size: e.size,
1161            }).collect();
1162
1163            Ok(Some(DirectoryListing {
1164                dir_name: String::new(),
1165                entries,
1166            }))
1167        })
1168    }
1169
1170    /// Pin a hash (prevent garbage collection)
1171    pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
1172        let mut wtxn = self.env.write_txn()?;
1173        self.pins.put(&mut wtxn, hash.as_slice(), &())?;
1174        wtxn.commit()?;
1175        Ok(())
1176    }
1177
1178    /// Unpin a hash (allow garbage collection)
1179    pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
1180        let mut wtxn = self.env.write_txn()?;
1181        self.pins.delete(&mut wtxn, hash.as_slice())?;
1182        wtxn.commit()?;
1183        Ok(())
1184    }
1185
1186    /// Check if hash is pinned
1187    pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
1188        let rtxn = self.env.read_txn()?;
1189        Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
1190    }
1191
1192    /// List all pinned hashes (raw bytes)
1193    pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
1194        let rtxn = self.env.read_txn()?;
1195        let mut pins = Vec::new();
1196
1197        for item in self.pins.iter(&rtxn)? {
1198            let (hash_bytes, _) = item?;
1199            if hash_bytes.len() == 32 {
1200                let mut hash = [0u8; 32];
1201                hash.copy_from_slice(hash_bytes);
1202                pins.push(hash);
1203            }
1204        }
1205
1206        Ok(pins)
1207    }
1208
1209    /// List all pinned hashes with names
1210    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
1211        let rtxn = self.env.read_txn()?;
1212        let store = self.store_arc();
1213        let tree = HashTree::new(HashTreeConfig::new(store).public());
1214        let mut pins = Vec::new();
1215
1216        for item in self.pins.iter(&rtxn)? {
1217            let (hash_bytes, _) = item?;
1218            if hash_bytes.len() != 32 {
1219                continue;
1220            }
1221            let mut hash = [0u8; 32];
1222            hash.copy_from_slice(hash_bytes);
1223
1224            // Try to determine if it's a directory
1225            let is_directory = sync_block_on(async {
1226                tree.is_directory(&hash).await.unwrap_or(false)
1227            });
1228
1229            pins.push(PinnedItem {
1230                cid: to_hex(&hash),
1231                name: "Unknown".to_string(),
1232                is_directory,
1233            });
1234        }
1235
1236        Ok(pins)
1237    }
1238
1239    // === Tree indexing for eviction ===
1240
    /// Index a tree after sync - tracks all blobs in the tree for eviction
    ///
    /// If `ref_key` is provided (e.g. "npub.../name"), it will replace any existing
    /// tree with that ref, allowing old versions to be evicted.
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // If ref_key provided, check for and unindex old version
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                // Same root hash means the ref is already up to date.
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes.try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // End the read txn before unindex_tree opens its own
                    // transactions on the same environment.
                    drop(rtxn);
                    // Unindex old tree (will delete orphaned blobs)
                    // Best-effort: failure to clean up the old version does
                    // not block indexing the new one.
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk tree and collect all blob hashes + compute total size
        let (blob_hashes, total_size) = sync_block_on(async {
            self.collect_tree_blobs(&tree, root_hash).await
        })?;

        let mut wtxn = self.env.write_txn()?;

        // Store blob-tree relationships (64-byte key: blob_hash ++ tree_hash)
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        // Store tree metadata
        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        // TreeMeta is persisted as MessagePack.
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta.put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        // Store ref -> hash mapping if ref_key provided
        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1320
1321    /// Collect all blob hashes in a tree and compute total size
1322    async fn collect_tree_blobs<S: Store>(
1323        &self,
1324        tree: &HashTree<S>,
1325        root: &Hash,
1326    ) -> Result<(Vec<Hash>, u64)> {
1327        let mut blobs = Vec::new();
1328        let mut total_size = 0u64;
1329        let mut stack = vec![*root];
1330
1331        while let Some(hash) = stack.pop() {
1332            // Check if it's a tree node
1333            let is_tree = tree.is_tree(&hash).await
1334                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1335
1336            if is_tree {
1337                // Get tree node and add children to stack
1338                if let Some(node) = tree.get_tree_node(&hash).await
1339                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1340                {
1341                    for link in &node.links {
1342                        stack.push(link.hash);
1343                    }
1344                }
1345            } else {
1346                // It's a blob - get its size
1347                if let Some(data) = self.router.get_sync(&hash)
1348                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1349                {
1350                    total_size += data.len() as u64;
1351                    blobs.push(hash);
1352                }
1353            }
1354        }
1355
1356        Ok((blobs, total_size))
1357    }
1358
1359    /// Unindex a tree - removes blob-tree mappings and deletes orphaned blobs
1360    /// Returns the number of bytes freed
1361    pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1362        let root_hex = to_hex(root_hash);
1363
1364        let store = self.store_arc();
1365        let tree = HashTree::new(HashTreeConfig::new(store).public());
1366
1367        // Walk tree and collect all blob hashes
1368        let (blob_hashes, _) = sync_block_on(async {
1369            self.collect_tree_blobs(&tree, root_hash).await
1370        })?;
1371
1372        let mut wtxn = self.env.write_txn()?;
1373        let mut freed = 0u64;
1374
1375        // For each blob, remove the blob-tree entry and check if orphaned
1376        for blob_hash in &blob_hashes {
1377            // Delete blob-tree entry (64-byte key: blob_hash ++ tree_hash)
1378            let mut key = [0u8; 64];
1379            key[..32].copy_from_slice(blob_hash);
1380            key[32..].copy_from_slice(root_hash);
1381            self.blob_trees.delete(&mut wtxn, &key[..])?;
1382
1383            // Check if blob is in any other tree (prefix scan on first 32 bytes)
1384            let rtxn = self.env.read_txn()?;
1385            let mut has_other_tree = false;
1386
1387            for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1388                if item.is_ok() {
1389                    has_other_tree = true;
1390                    break;
1391                }
1392            }
1393            drop(rtxn);
1394
1395            // If orphaned, delete the blob
1396            if !has_other_tree {
1397                if let Some(data) = self.router.get_sync(blob_hash)
1398                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1399                {
1400                    freed += data.len() as u64;
1401                    // Delete locally only - keep S3 as archive
1402                    self.router.delete_local_only(blob_hash)
1403                        .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1404                }
1405            }
1406        }
1407
1408        // Delete tree node itself if exists
1409        if let Some(data) = self.router.get_sync(root_hash)
1410            .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1411        {
1412            freed += data.len() as u64;
1413            // Delete locally only - keep S3 as archive
1414            self.router.delete_local_only(root_hash)
1415                .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1416        }
1417
1418        // Delete tree metadata
1419        self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1420
1421        wtxn.commit()?;
1422
1423        tracing::debug!(
1424            "Unindexed tree {} ({} bytes freed)",
1425            &root_hex[..8],
1426            freed
1427        );
1428
1429        Ok(freed)
1430    }
1431
1432    /// Get tree metadata
1433    pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1434        let rtxn = self.env.read_txn()?;
1435        if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1436            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1437                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1438            Ok(Some(meta))
1439        } else {
1440            Ok(None)
1441        }
1442    }
1443
1444    /// List all indexed trees
1445    pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1446        let rtxn = self.env.read_txn()?;
1447        let mut trees = Vec::new();
1448
1449        for item in self.tree_meta.iter(&rtxn)? {
1450            let (hash_bytes, meta_bytes) = item?;
1451            let hash: Hash = hash_bytes.try_into()
1452                .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1453            let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1454                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1455            trees.push((hash, meta));
1456        }
1457
1458        Ok(trees)
1459    }
1460
1461    /// Get total tracked storage size (sum of all tree_meta.total_size)
1462    pub fn tracked_size(&self) -> Result<u64> {
1463        let rtxn = self.env.read_txn()?;
1464        let mut total = 0u64;
1465
1466        for item in self.tree_meta.iter(&rtxn)? {
1467            let (_, bytes) = item?;
1468            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1469                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1470            total += meta.total_size;
1471        }
1472
1473        Ok(total)
1474    }
1475
1476    /// Get evictable trees sorted by (priority ASC, synced_at ASC)
1477    fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1478        let mut trees = self.list_indexed_trees()?;
1479
1480        // Sort by priority (lower first), then by synced_at (older first)
1481        trees.sort_by(|a, b| {
1482            match a.1.priority.cmp(&b.1.priority) {
1483                std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1484                other => other,
1485            }
1486        });
1487
1488        Ok(trees)
1489    }
1490
    /// Run eviction if storage is over quota
    /// Returns bytes freed
    ///
    /// Eviction order:
    /// 1. Orphaned blobs (not in any indexed tree and not pinned)
    /// 2. Trees by priority (lowest first) and age (oldest first)
    pub fn evict_if_needed(&self) -> Result<u64> {
        // Get actual storage used
        let stats = self.router.stats()
            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
        let current = stats.total_bytes;

        // Under quota: nothing to do.
        if current <= self.max_size_bytes {
            return Ok(0);
        }

        // Target 90% of max to avoid constant eviction
        let target = self.max_size_bytes * 90 / 100;
        let mut freed = 0u64;
        // Estimated remaining usage, decremented as evictions proceed.
        let mut current_size = current;

        // Phase 1: Evict orphaned blobs (not in any tree and not pinned)
        let orphan_freed = self.evict_orphaned_blobs()?;
        freed += orphan_freed;
        current_size = current_size.saturating_sub(orphan_freed);

        if orphan_freed > 0 {
            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
        }

        // Check if we're now under target
        if current_size <= target {
            if freed > 0 {
                tracing::info!("Eviction complete: {} bytes freed", freed);
            }
            return Ok(freed);
        }

        // Phase 2: Evict trees by priority (lowest first) and age (oldest first)
        // Own trees CAN be evicted (just last), but PINNED trees are never evicted
        let evictable = self.get_evictable_trees()?;

        for (root_hash, meta) in evictable {
            // Stop as soon as the target has been reached.
            if current_size <= target {
                break;
            }

            let root_hex = to_hex(&root_hash);

            // Never evict pinned trees
            if self.is_pinned(&root_hash)? {
                continue;
            }

            let tree_freed = self.unindex_tree(&root_hash)?;
            freed += tree_freed;
            current_size = current_size.saturating_sub(tree_freed);

            tracing::info!(
                "Evicted tree {} (owner={}, priority={}, {} bytes)",
                &root_hex[..8],
                // Owner strings may be shorter than 8 chars; clamp the slice.
                &meta.owner[..8.min(meta.owner.len())],
                meta.priority,
                tree_freed
            );
        }

        if freed > 0 {
            tracing::info!("Eviction complete: {} bytes freed", freed);
        }

        Ok(freed)
    }
1564
    /// Evict blobs that are not part of any indexed tree and not pinned
    ///
    /// Orphans are deleted from local storage only (S3 copies are kept as an
    /// archive). Returns the number of bytes freed. Per-blob failures are
    /// deliberately ignored (best-effort).
    fn evict_orphaned_blobs(&self) -> Result<u64> {
        let mut freed = 0u64;

        // Get all blob hashes from store
        let all_hashes = self.router.list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        // Get pinned hashes as raw bytes
        // (iterator errors and malformed key lengths are skipped)
        let rtxn = self.env.read_txn()?;
        let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
            .filter_map(|item| item.ok())
            .filter_map(|(hash_bytes, _)| {
                if hash_bytes.len() == 32 {
                    let mut hash = [0u8; 32];
                    hash.copy_from_slice(hash_bytes);
                    Some(hash)
                } else {
                    None
                }
            })
            .collect();

        // Collect all blob hashes that are in at least one tree
        // Key format is blob_hash (32 bytes) ++ tree_hash (32 bytes)
        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
        for item in self.blob_trees.iter(&rtxn)? {
            if let Ok((key_bytes, _)) = item {
                if key_bytes.len() >= 32 {
                    let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
                    blobs_in_trees.insert(blob_hash);
                }
            }
        }
        drop(rtxn);

        // Find and delete orphaned blobs
        for hash in all_hashes {
            // Skip if pinned
            if pinned.contains(&hash) {
                continue;
            }

            // Skip if part of any tree
            if blobs_in_trees.contains(&hash) {
                continue;
            }

            // This blob is orphaned - delete locally (keep S3 as archive)
            // Size is read before deletion so `freed` reflects actual bytes.
            if let Ok(Some(data)) = self.router.get_sync(&hash) {
                freed += data.len() as u64;
                let _ = self.router.delete_local_only(&hash);
                tracing::debug!("Deleted orphaned blob {} ({} bytes)", &to_hex(&hash)[..8], data.len());
            }
        }

        Ok(freed)
    }
1623
    /// Get the maximum storage size in bytes (the eviction quota).
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1628
1629    /// Get storage usage by priority tier
1630    pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1631        let rtxn = self.env.read_txn()?;
1632        let mut own = 0u64;
1633        let mut followed = 0u64;
1634        let mut other = 0u64;
1635
1636        for item in self.tree_meta.iter(&rtxn)? {
1637            let (_, bytes) = item?;
1638            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1639                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1640
1641            if meta.priority >= PRIORITY_OWN {
1642                own += meta.total_size;
1643            } else if meta.priority >= PRIORITY_FOLLOWED {
1644                followed += meta.total_size;
1645            } else {
1646                other += meta.total_size;
1647            }
1648        }
1649
1650        Ok(StorageByPriority { own, followed, other })
1651    }
1652
1653    /// Get storage statistics
1654    pub fn get_storage_stats(&self) -> Result<StorageStats> {
1655        let rtxn = self.env.read_txn()?;
1656        let total_pins = self.pins.len(&rtxn)? as usize;
1657
1658        let stats = self.router.stats()
1659            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1660
1661        Ok(StorageStats {
1662            total_dags: stats.count,
1663            pinned_dags: total_pins,
1664            total_bytes: stats.total_bytes,
1665        })
1666    }
1667
1668    // === Cached roots (replaces nostrdb event caching) ===
1669
1670    /// Get cached root for a pubkey/tree_name pair
1671    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1672        let key = format!("{}/{}", pubkey_hex, tree_name);
1673        let rtxn = self.env.read_txn()?;
1674        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1675            let root: CachedRoot = rmp_serde::from_slice(bytes)
1676                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1677            Ok(Some(root))
1678        } else {
1679            Ok(None)
1680        }
1681    }
1682
1683    /// Set cached root for a pubkey/tree_name pair
1684    pub fn set_cached_root(
1685        &self,
1686        pubkey_hex: &str,
1687        tree_name: &str,
1688        hash: &str,
1689        key: Option<&str>,
1690        visibility: &str,
1691        updated_at: u64,
1692    ) -> Result<()> {
1693        let db_key = format!("{}/{}", pubkey_hex, tree_name);
1694        let root = CachedRoot {
1695            hash: hash.to_string(),
1696            key: key.map(|k| k.to_string()),
1697            updated_at,
1698            visibility: visibility.to_string(),
1699        };
1700        let bytes = rmp_serde::to_vec(&root)
1701            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1702        let mut wtxn = self.env.write_txn()?;
1703        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1704        wtxn.commit()?;
1705        Ok(())
1706    }
1707
1708    /// List all cached roots for a pubkey
1709    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1710        let prefix = format!("{}/", pubkey_hex);
1711        let rtxn = self.env.read_txn()?;
1712        let mut results = Vec::new();
1713
1714        for item in self.cached_roots.iter(&rtxn)? {
1715            let (key, bytes) = item?;
1716            if key.starts_with(&prefix) {
1717                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1718                let root: CachedRoot = rmp_serde::from_slice(bytes)
1719                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1720                results.push((tree_name.to_string(), root));
1721            }
1722        }
1723
1724        Ok(results)
1725    }
1726
1727    /// Delete a cached root
1728    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1729        let key = format!("{}/{}", pubkey_hex, tree_name);
1730        let mut wtxn = self.env.write_txn()?;
1731        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1732        wtxn.commit()?;
1733        Ok(deleted)
1734    }
1735
1736    /// Garbage collect unpinned content
1737    pub fn gc(&self) -> Result<GcStats> {
1738        let rtxn = self.env.read_txn()?;
1739
1740        // Get all pinned hashes as raw bytes
1741        let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
1742            .filter_map(|item| item.ok())
1743            .filter_map(|(hash_bytes, _)| {
1744                if hash_bytes.len() == 32 {
1745                    let mut hash = [0u8; 32];
1746                    hash.copy_from_slice(hash_bytes);
1747                    Some(hash)
1748                } else {
1749                    None
1750                }
1751            })
1752            .collect();
1753
1754        drop(rtxn);
1755
1756        // Get all stored hashes
1757        let all_hashes = self.router.list()
1758            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1759
1760        // Delete unpinned hashes
1761        let mut deleted = 0;
1762        let mut freed_bytes = 0u64;
1763
1764        for hash in all_hashes {
1765            if !pinned.contains(&hash) {
1766                if let Ok(Some(data)) = self.router.get_sync(&hash) {
1767                    freed_bytes += data.len() as u64;
1768                    // Delete locally only - keep S3 as archive
1769                    let _ = self.router.delete_local_only(&hash);
1770                    deleted += 1;
1771                }
1772            }
1773        }
1774
1775        Ok(GcStats {
1776            deleted_dags: deleted,
1777            freed_bytes,
1778        })
1779    }
1780
1781    /// Verify LMDB blob integrity - checks that stored data matches its key hash
1782    /// Returns verification statistics and optionally deletes corrupted entries
1783    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
1784        let all_hashes = self.router.list()
1785            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1786
1787        let total = all_hashes.len();
1788        let mut valid = 0;
1789        let mut corrupted = 0;
1790        let mut deleted = 0;
1791        let mut corrupted_hashes = Vec::new();
1792
1793        for hash in &all_hashes {
1794            let hash_hex = to_hex(hash);
1795
1796            match self.router.get_sync(hash) {
1797                Ok(Some(data)) => {
1798                    // Compute actual SHA256 of data
1799                    let actual_hash = sha256(&data);
1800
1801                    if actual_hash == *hash {
1802                        valid += 1;
1803                    } else {
1804                        corrupted += 1;
1805                        let actual_hex = to_hex(&actual_hash);
1806                        println!("  CORRUPTED: key={} actual={} size={}",
1807                            &hash_hex[..16], &actual_hex[..16], data.len());
1808                        corrupted_hashes.push(*hash);
1809                    }
1810                }
1811                Ok(None) => {
1812                    // Hash exists in index but data is missing
1813                    corrupted += 1;
1814                    println!("  MISSING: key={}", &hash_hex[..16]);
1815                    corrupted_hashes.push(*hash);
1816                }
1817                Err(e) => {
1818                    corrupted += 1;
1819                    println!("  ERROR: key={} err={}", &hash_hex[..16], e);
1820                    corrupted_hashes.push(*hash);
1821                }
1822            }
1823        }
1824
1825        // Delete corrupted entries if requested
1826        if delete {
1827            for hash in &corrupted_hashes {
1828                match self.router.delete_sync(hash) {
1829                    Ok(true) => deleted += 1,
1830                    Ok(false) => {} // Already deleted
1831                    Err(e) => {
1832                        let hash_hex = to_hex(hash);
1833                        println!("  Failed to delete {}: {}", &hash_hex[..16], e);
1834                    }
1835                }
1836            }
1837        }
1838
1839        Ok(VerifyResult {
1840            total,
1841            valid,
1842            corrupted,
1843            deleted,
1844        })
1845    }
1846
    /// Verify R2/S3 blob integrity - lists all objects and verifies hash matches filename
    /// Returns verification statistics and optionally deletes corrupted entries
    ///
    /// Assumes object keys have the form `<prefix><64-hex-chars>.bin`; keys
    /// that don't match are counted as corrupted. Every matching object is
    /// downloaded in full and its SHA-256 compared against the hash in the
    /// filename. When `delete` is true, all corrupted keys are removed from
    /// the bucket afterwards.
    ///
    /// # Errors
    /// Fails if config can't be loaded, S3 is not configured, or listing
    /// the bucket fails. Per-object fetch/read errors are counted, not
    /// propagated.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        // Get S3 client from router (we need to access it directly)
        // For now, we'll create a new client from config
        let config = crate::config::Config::load()?;
        let s3_config = config.storage.s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        // Build AWS config
        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        // force_path_style is required for S3-compatible endpoints like R2.
        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                .force_path_style(true)
                .build()
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // List all objects in bucket, paging via the continuation token
        // (ListObjectsV2 returns at most ~1000 keys per call).
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2()
                .bucket(bucket)
                .prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req.send().await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Skip non-.bin files
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // Extract expected hash from filename (remove prefix and .bin)
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                // Validate it's a valid hex hash
                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!("  INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!("  INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download and verify content. NOTE(review): each object is
                // buffered fully in memory; fine for chunk-sized blobs.
                match s3_client.get_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(resp) => {
                        match resp.body.collect().await {
                            Ok(bytes) => {
                                let data = bytes.into_bytes();
                                let actual_hash = sha256(&data);

                                if actual_hash == expected_hash {
                                    valid += 1;
                                } else {
                                    corrupted += 1;
                                    let actual_hex = to_hex(&actual_hash);
                                    println!("  CORRUPTED: key={} actual={} size={}",
                                        &expected_hash_hex[..16], &actual_hex[..16], data.len());
                                    corrupted_keys.push(key.to_string());
                                }
                            }
                            Err(e) => {
                                corrupted += 1;
                                println!("  READ ERROR: {} - {}", key, e);
                                corrupted_keys.push(key.to_string());
                            }
                        }
                    }
                    Err(e) => {
                        corrupted += 1;
                        println!("  FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Progress indicator every 100 objects
                if total % 100 == 0 {
                    println!("  Progress: {} objects checked, {} corrupted so far", total, corrupted);
                }
            }

            // Check if there are more objects
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        // Delete corrupted entries if requested
        if delete {
            for key in &corrupted_keys {
                match s3_client.delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        println!("  Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2003
2004    /// Fallback for non-S3 builds
2005    #[cfg(not(feature = "s3"))]
2006    pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
2007        Err(anyhow::anyhow!("S3 feature not enabled"))
2008    }
2009}
2010
/// Result of blob integrity verification
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of blobs examined
    pub total: usize,
    /// Blobs whose content hashed to their key
    pub valid: usize,
    /// Blobs that were missing, unreadable, or failed the hash check
    pub corrupted: usize,
    /// Corrupted entries actually removed (only when deletion was requested)
    pub deleted: usize,
}
2019
/// Aggregate storage statistics (see `get_storage_stats`)
#[derive(Debug)]
pub struct StorageStats {
    /// Blob count reported by the storage router
    pub total_dags: usize,
    /// Number of entries in the pin database
    pub pinned_dags: usize,
    /// Total stored size in bytes reported by the storage router
    pub total_bytes: u64,
}
2026
/// Storage usage broken down by priority tier
///
/// Byte totals summed from `TreeMeta::total_size`; tiers correspond to the
/// `PRIORITY_*` constants.
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Own/pinned trees (priority 255)
    pub own: u64,
    /// Followed users' trees (priority 128)
    pub followed: u64,
    /// Other trees (priority 64)
    pub other: u64,
}
2037
/// Describes how a file's content is split into content-addressed chunks
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes
    pub total_size: u64,
    /// Hash of each chunk, in file order
    pub chunk_hashes: Vec<Hash>,
    /// Size in bytes of each chunk (parallel to `chunk_hashes`)
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is stored as multiple chunks
    pub is_chunked: bool,
}
2045
/// Owned iterator for async streaming
///
/// Yields the chunk slices of a file that overlap the inclusive byte range
/// `[start, end]`; chunks are fetched lazily from the store as iterated.
pub struct FileRangeChunksOwned {
    // Store used to fetch chunk bodies by hash.
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    // First byte offset of the requested range (inclusive).
    start: u64,
    // Last byte offset of the requested range (inclusive).
    end: u64,
    // Index of the next chunk to consider.
    current_chunk_idx: usize,
    // File offset at which the current chunk begins.
    current_offset: u64,
}
2055
2056impl Iterator for FileRangeChunksOwned {
2057    type Item = Result<Vec<u8>>;
2058
2059    fn next(&mut self) -> Option<Self::Item> {
2060        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2061            return None;
2062        }
2063
2064        if self.current_offset > self.end {
2065            return None;
2066        }
2067
2068        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2069        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2070        let chunk_end = self.current_offset + chunk_size - 1;
2071
2072        self.current_chunk_idx += 1;
2073
2074        if chunk_end < self.start || self.current_offset > self.end {
2075            self.current_offset += chunk_size;
2076            return self.next();
2077        }
2078
2079        let chunk_content = match self.store.get_chunk(chunk_hash) {
2080            Ok(Some(content)) => content,
2081            Ok(None) => {
2082                return Some(Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash))));
2083            }
2084            Err(e) => {
2085                return Some(Err(e));
2086            }
2087        };
2088
2089        let chunk_read_start = if self.current_offset >= self.start {
2090            0
2091        } else {
2092            (self.start - self.current_offset) as usize
2093        };
2094
2095        let chunk_read_end = if chunk_end <= self.end {
2096            chunk_size as usize - 1
2097        } else {
2098            (self.end - self.current_offset) as usize
2099        };
2100
2101        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2102        self.current_offset += chunk_size;
2103
2104        Some(Ok(result))
2105    }
2106}
2107
/// Statistics returned by `gc`
#[derive(Debug)]
pub struct GcStats {
    /// Number of blobs deleted locally
    pub deleted_dags: usize,
    /// Total bytes reclaimed by those deletions
    pub freed_bytes: u64,
}
2113
/// A single entry in a directory listing
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within the directory
    pub name: String,
    /// Content identifier of the entry, as a string
    pub cid: String,
    /// True when the entry is a subdirectory
    pub is_directory: bool,
    /// Entry size in bytes
    pub size: u64,
}
2121
/// A directory name together with its entries
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the listed directory
    pub dir_name: String,
    /// Entries contained in the directory
    pub entries: Vec<DirEntry>,
}
2127
/// A pinned item as surfaced in pin listings
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Content identifier of the pinned root, as a string
    pub cid: String,
    /// Display name of the pin
    pub name: String,
    /// True when the pinned item is a directory
    pub is_directory: bool,
}
2134
/// Blob metadata for Blossom protocol
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// SHA-256 of the blob content, as a string (presumably hex — confirm against writer)
    pub sha256: String,
    /// Blob size in bytes
    pub size: u64,
    /// MIME type of the blob
    pub mime_type: String,
    /// Upload time (presumably Unix seconds — confirm against writer)
    pub uploaded: u64,
}
2143
2144// Implement ContentStore trait for WebRTC data exchange
2145impl crate::webrtc::ContentStore for HashtreeStore {
2146    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2147        let hash = from_hex(hash_hex)
2148            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2149        self.get_chunk(&hash)
2150    }
2151}