// hashtree_cli/storage.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use heed::{Database, EnvOpenOptions};
4use heed::types::*;
5use hashtree_lmdb::LmdbBlobStore;
6use hashtree_core::{
7    HashTree, HashTreeConfig, Cid,
8    sha256, to_hex, from_hex, TreeNode, DirEntry as HashTreeDirEntry,
9    types::Hash,
10};
11use hashtree_core::store::{Store, StoreError};
12use serde::{Deserialize, Serialize};
13use std::path::Path;
14use std::collections::HashSet;
15use std::io::Read;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18use futures::executor::block_on as sync_block_on;
19
/// Priority levels for tree eviction
/// (255=own/pinned, 128=followed, 64=other — recorded in `TreeMeta::priority`)
/// Trees synced from other/unknown users.
pub const PRIORITY_OTHER: u8 = 64;
/// Trees from followed users.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// The user's own (or pinned) trees.
pub const PRIORITY_OWN: u8 = 255;
24
/// Metadata for a synced tree (for eviction tracking)
///
/// Stored in the `tree_meta` database, keyed by the tree's root hash
/// (msgpack-encoded per the `tree_meta` field comment on `HashtreeStore`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Pubkey of tree owner
    pub owner: String,
    /// Tree name if known (from nostr key like "npub.../name")
    pub name: Option<String>,
    /// Unix timestamp when this tree was synced
    pub synced_at: u64,
    /// Total size of all blobs in this tree
    pub total_size: u64,
    /// Eviction priority: 255=own/pinned, 128=followed, 64=other
    /// (see the PRIORITY_* constants)
    pub priority: u8,
}
39
/// Cached root info from Nostr events (replaces nostrdb caching)
///
/// Stored in the `cached_roots` database, keyed by "pubkey_hex/tree_name".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex)
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public" or "private"
    pub visibility: String,
}
52
53#[cfg(feature = "s3")]
54use tokio::sync::mpsc;
55
56use crate::config::S3Config;
57
/// Message for background S3 sync
///
/// Sent from `StorageRouter` write paths to the detached task spawned in
/// `with_s3`, which performs the actual S3 calls.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Mirror a blob to S3 under key "{prefix}{hex-hash}.bin".
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove the blob object from S3.
    Delete { hash: Hash },
}
64
/// Storage router - LMDB primary with optional S3 backup
///
/// Write path: LMDB first (fast), then queue S3 upload (non-blocking)
/// Read path: LMDB first, fall back to S3 if miss
///
/// All S3-related fields only exist when built with the "s3" feature.
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LmdbBlobStore>,
    /// Optional S3 client for backup
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Bucket holding the blob objects
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to "{hex-hash}.bin" object names
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to send uploads to background task
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
83
impl StorageRouter {
    /// Create router with LMDB only
    ///
    /// All S3-related fields stay `None`/empty, so reads and writes touch
    /// only the local LMDB blob store.
    pub fn new(local: Arc<LmdbBlobStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Create router with LMDB + S3 backup
    ///
    /// Builds an S3 client (region/endpoint from `config`, credentials from
    /// the environment) and spawns a detached background task that drains the
    /// upload/delete queue with at most 32 concurrent S3 operations.
    ///
    /// NOTE(review): the body contains no `?`, so despite the `Result` return
    /// type this currently always returns `Ok`.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LmdbBlobStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Build AWS config
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader = aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Build S3 client with custom endpoint
        // (path-style addressing for MinIO/self-hosted S3 compatibility)
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Create background sync channel
        // NOTE(review): the channel is unbounded — a burst of writes buffers
        // full blob payloads in memory until uploads drain; consider bounding.
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        // Spawn background sync task with bounded concurrent uploads
        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Limit concurrent uploads to prevent overwhelming the runtime
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                // Spawn each upload with semaphore-bounded concurrency
                tokio::spawn(async move {
                    // Acquire permit before uploading
                    // (errors only if the semaphore is closed, which this task never does)
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            // Object key layout: "{prefix}{hex-hash}.bin"
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            match client
                                .put_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .body(ByteStream::from(data))
                                .send()
                                .await
                            {
                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
                                // Failed uploads are logged but not retried.
                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            if let Err(e) = client
                                .delete_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .send()
                                .await
                            {
                                tracing::error!("S3 delete failed {}: {}", &key, e);
                            }
                        }
                    }
                });
            }
        });

        tracing::info!("S3 storage initialized: bucket={}, prefix={}", bucket, prefix);

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Store data - writes to LMDB, queues S3 upload in background
    ///
    /// Returns whether the blob was new to the *local* store.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        // Always write to local first
        let is_new = self.local.put_sync(hash, data)?;

        // Queue S3 upload if configured (non-blocking)
        // Always upload to S3 (even if not new locally) to ensure S3 has all blobs
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            // NOTE(review): info-level log on every put is chatty; the other
            // per-blob logs in this file use debug.
            tracing::info!("Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(), data.len(), is_new);
            if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data: data.to_vec() }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Get data - tries LMDB first, falls back to S3
    ///
    /// An S3 hit is written back into LMDB so subsequent reads are local.
    ///
    /// NOTE(review): the S3 fallback blocks on SDK futures via futures'
    /// `block_on`; confirm this is never called from a tokio worker thread,
    /// where blocking on tokio-driven I/O can stall.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        // Try local first
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        // Fall back to S3 if configured
        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.get_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Cache locally for future reads
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // A missing key just means the blob isn't in S3; only log
                    // unexpected service errors.
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Check if hash exists
    ///
    /// Checks LMDB, then (if configured) S3 via a HEAD request.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        // Check local first
        if self.local.exists(hash)? {
            return Ok(true);
        }

        // Check S3 if configured
        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    // "not found" just means the blob isn't in S3.
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Delete data from both local and S3 stores
    ///
    /// Returns whether the blob existed locally; the S3 delete is queued
    /// asynchronously and its outcome is not reflected in the return value.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        // Queue S3 delete if configured
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Delete data from local store only (don't propagate to S3)
    /// Used for eviction where we want to keep S3 as archive
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Get stats from local store
    pub fn stats(&self) -> Result<hashtree_lmdb::LmdbStats, StoreError> {
        self.local.stats()
    }

    /// List all hashes from local store
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Get the underlying LMDB store for HashTree operations
    ///
    /// NOTE: writes made directly through this handle bypass the S3 sync queue.
    pub fn local_store(&self) -> Arc<LmdbBlobStore> {
        Arc::clone(&self.local)
    }
}
321
// Implement async Store trait for StorageRouter so it can be used directly with HashTree
// This ensures all writes go through S3 sync
#[async_trait]
impl Store for StorageRouter {
    /// Store a blob: LMDB write plus queued S3 upload (delegates to `put_sync`).
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    /// Fetch a blob: LMDB first, S3 fallback (delegates to `get_sync`).
    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    /// Existence check across LMDB and S3 (delegates to `exists`).
    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    /// Delete locally and queue the S3 delete (delegates to `delete_sync`).
    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
342
/// Top-level CLI store: a heed environment holding metadata databases
/// (pins, blob ownership, tree/eviction tracking, cached Nostr roots)
/// alongside a `StorageRouter` that holds the actual blob bytes.
pub struct HashtreeStore {
    /// heed environment containing all metadata databases below
    env: heed::Env,
    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
    pins: Database<Bytes, Unit>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles LMDB + optional S3 (Arc for sharing with HashTree)
    router: Arc<StorageRouter>,
    /// Maximum storage size in bytes (from config)
    max_size_bytes: u64,
}
364
365impl HashtreeStore {
366    /// Create a new store with local LMDB storage only (10GB default limit)
367    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
368        Self::with_options(path, None, 10 * 1024 * 1024 * 1024)
369    }
370
371    /// Create a new store with optional S3 backend (10GB default limit)
372    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
373        Self::with_options(path, s3_config, 10 * 1024 * 1024 * 1024)
374    }
375
    /// Create a new store with optional S3 backend and custom size limit
    ///
    /// Opens (or creates) the heed metadata environment at `path`, creates all
    /// named databases, then builds the blob `StorageRouter` (LMDB under
    /// `path/blobs`, plus S3 when configured and compiled with the feature).
    ///
    /// `max_size_bytes` is the eviction budget recorded on the store; the LMDB
    /// `map_size` below is a fixed virtual-address reservation, not this limit.
    pub fn with_options<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>, max_size_bytes: u64) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY(review): heed marks `open` unsafe; the caller must uphold its
        // environment-opening invariants — confirm no other component opens the
        // same `path` in this process.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) // 10GB virtual address space
                .max_dbs(8)  // pins, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .open(path)?
        };

        // Create all named databases up front in one transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Create local LMDB blob store
        let lmdb_store = Arc::new(LmdbBlobStore::new(path.join("blobs"))
            .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?);

        // Create storage router with optional S3
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!("Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket, s3_cfg.endpoint);

            sync_block_on(async {
                StorageRouter::with_s3(lmdb_store, s3_cfg).await
            })?
        } else {
            StorageRouter::new(lmdb_store)
        });

        // Without the "s3" feature: warn if config asked for S3, fall back to local.
        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!("S3 config provided but S3 feature not enabled. Using local storage only.");
            }
            StorageRouter::new(lmdb_store)
        });

        Ok(Self {
            env,
            pins,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
436
437    /// Get the storage router
438    pub fn router(&self) -> &StorageRouter {
439        &self.router
440    }
441
442    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
443    /// All writes through this go to both LMDB and S3
444    pub fn store_arc(&self) -> Arc<StorageRouter> {
445        Arc::clone(&self.router)
446    }
447
448    /// Upload a file and return its CID (public/unencrypted), with auto-pin
449    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
450        self.upload_file_internal(file_path, true)
451    }
452
453    /// Upload a file without pinning (for blossom uploads that can be evicted)
454    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
455        self.upload_file_internal(file_path, false)
456    }
457
458    fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
459        let file_path = file_path.as_ref();
460        let file_content = std::fs::read(file_path)?;
461
462        // Use hashtree to store the file (public mode - no encryption)
463        let store = self.store_arc();
464        let tree = HashTree::new(HashTreeConfig::new(store).public());
465
466        let (cid, _size) = sync_block_on(async {
467            tree.put(&file_content).await
468        }).context("Failed to store file")?;
469
470        // Only pin if requested (htree add = pin, blossom upload = no pin)
471        if pin {
472            let mut wtxn = self.env.write_txn()?;
473            self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
474            wtxn.commit()?;
475        }
476
477        Ok(to_hex(&cid.hash))
478    }
479
480    /// Upload a file from a stream with progress callbacks
481    pub fn upload_file_stream<R: Read, F>(
482        &self,
483        mut reader: R,
484        _file_name: impl Into<String>,
485        mut callback: F,
486    ) -> Result<String>
487    where
488        F: FnMut(&str),
489    {
490        let mut data = Vec::new();
491        reader.read_to_end(&mut data)?;
492
493        // Use HashTree.put for upload (public mode)
494        let store = self.store_arc();
495        let tree = HashTree::new(HashTreeConfig::new(store).public());
496
497        let (cid, _size) = sync_block_on(async {
498            tree.put(&data).await
499        }).context("Failed to store file")?;
500
501        let root_hex = to_hex(&cid.hash);
502        callback(&root_hex);
503
504        // Auto-pin on upload
505        let mut wtxn = self.env.write_txn()?;
506        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
507        wtxn.commit()?;
508
509        Ok(root_hex)
510    }
511
512    /// Upload a directory and return its root hash (hex)
513    /// Respects .gitignore by default
514    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
515        self.upload_dir_with_options(dir_path, true)
516    }
517
518    /// Upload a directory with options (public mode - no encryption)
519    pub fn upload_dir_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
520        let dir_path = dir_path.as_ref();
521
522        let store = self.store_arc();
523        let tree = HashTree::new(HashTreeConfig::new(store).public());
524
525        let root_cid = sync_block_on(async {
526            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
527        }).context("Failed to upload directory")?;
528
529        let root_hex = to_hex(&root_cid.hash);
530
531        let mut wtxn = self.env.write_txn()?;
532        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
533        wtxn.commit()?;
534
535        Ok(root_hex)
536    }
537
    /// Walk `current_path` (honoring gitignore rules when requested), upload
    /// every file through `tree`, and assemble the directory hierarchy via
    /// `build_directory_tree`, returning the root `Cid`.
    ///
    /// Despite the name this does not recurse: the `ignore` walker yields a
    /// flat listing which is regrouped by parent directory below.
    /// `_root_path` is currently unused.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Build directory structure from flat file list - store full Cid with key
        // Maps "relative/dir/path" -> [(entry_name, cid)]; "" is the root dir.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new()); // Root

        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false) // include dotfiles; only gitignore rules filter
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // Skip the root directory itself
            if path == current_path {
                continue;
            }

            let relative = path.strip_prefix(current_path)
                .unwrap_or(path);

            if path.is_file() {
                // Upload the file content; the Cid carries the hash (and key
                // when the tree is configured with encryption).
                let content = std::fs::read(path)?;
                let (cid, _size) = tree.put(&content).await
                    .map_err(|e| anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e))?;

                // Get parent directory path and file name
                let parent = relative.parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative.file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Ensure directory entry exists
                // (so directories without files still get a node)
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        // Build directory tree bottom-up
        self.build_directory_tree(tree, &mut dir_contents).await
    }
595
    /// Turn the flat `dir_path -> entries` map into nested directory nodes.
    ///
    /// Directories are processed deepest-first so each subdirectory's Cid
    /// exists before its parent references it; the Cid for "" (the root
    /// directory) is returned.
    async fn build_directory_tree<S: Store>(
        &self,
        tree: &HashTree<S>,
        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
    ) -> Result<Cid> {
        // Sort directories by depth (deepest first) to build bottom-up
        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
        dirs.sort_by(|a, b| {
            // Depth = number of path components ("" is 0, "a" is 1, "a/b" is 2).
            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
            depth_b.cmp(&depth_a) // Deepest first
        });

        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();

        for dir_path in dirs {
            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();

            let mut entries: Vec<HashTreeDirEntry> = files.into_iter()
                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
                .collect();

            // Add subdirectory entries
            // NOTE(review): linear scan of all finished dirs per directory is
            // O(dirs^2); fine for typical trees, revisit for very large ones.
            for (subdir_path, cid) in &dir_cids {
                let parent = std::path::Path::new(subdir_path)
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();

                if parent == dir_path {
                    let name = std::path::Path::new(subdir_path)
                        .file_name()
                        .map(|n| n.to_string_lossy().to_string())
                        .unwrap_or_default();
                    entries.push(HashTreeDirEntry::from_cid(name, cid));
                }
            }

            let cid = tree.put_directory(entries).await
                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;

            dir_cids.insert(dir_path, cid);
        }

        // Return root Cid
        dir_cids.get("")
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("No root directory"))
    }
645
646    /// Upload a file with CHK encryption, returns CID in format "hash:key"
647    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
648        let file_path = file_path.as_ref();
649        let file_content = std::fs::read(file_path)?;
650
651        // Use unified API with encryption enabled (default)
652        let store = self.store_arc();
653        let tree = HashTree::new(HashTreeConfig::new(store));
654
655        let (cid, _size) = sync_block_on(async {
656            tree.put(&file_content).await
657        }).map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;
658
659        let cid_str = cid.to_string();
660
661        let mut wtxn = self.env.write_txn()?;
662        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
663        wtxn.commit()?;
664
665        Ok(cid_str)
666    }
667
668    /// Upload a directory with CHK encryption, returns CID
669    /// Respects .gitignore by default
670    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
671        self.upload_dir_encrypted_with_options(dir_path, true)
672    }
673
674    /// Upload a directory with CHK encryption and options
675    /// Returns CID as "hash:key" format for encrypted directories
676    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
677        let dir_path = dir_path.as_ref();
678        let store = self.store_arc();
679
680        // Use unified API with encryption enabled (default)
681        let tree = HashTree::new(HashTreeConfig::new(store));
682
683        let root_cid = sync_block_on(async {
684            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
685        }).context("Failed to upload encrypted directory")?;
686
687        let cid_str = root_cid.to_string(); // Returns "hash:key" or "hash"
688
689        let mut wtxn = self.env.write_txn()?;
690        // Pin by hash only (the key is for decryption, not identification)
691        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
692        wtxn.commit()?;
693
694        Ok(cid_str)
695    }
696
697    /// Get tree node by hash (raw bytes)
698    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
699        let store = self.store_arc();
700        let tree = HashTree::new(HashTreeConfig::new(store).public());
701
702        sync_block_on(async {
703            tree.get_tree_node(hash).await
704                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
705        })
706    }
707
708    /// Store a raw blob, returns SHA256 hash as hex.
709    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
710        let hash = sha256(data);
711        self.router.put_sync(hash, data)
712            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
713        Ok(to_hex(&hash))
714    }
715
716    /// Get a raw blob by SHA256 hash (raw bytes).
717    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
718        self.router.get_sync(hash)
719            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
720    }
721
722    /// Check if a blob exists by SHA256 hash (raw bytes).
723    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
724        self.router.exists(hash)
725            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
726    }
727
728    // === Blossom ownership tracking ===
729    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
730    // This allows efficient multi-owner tracking with O(1) lookups
731
732    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
733    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
734        let mut key = [0u8; 64];
735        key[..32].copy_from_slice(sha256);
736        key[32..].copy_from_slice(pubkey);
737        key
738    }
739
    /// Add an owner (pubkey) to a blob for Blossom protocol
    /// Multiple users can own the same blob - it's only deleted when all owners remove it
    ///
    /// Also appends the blob to the pubkey's JSON metadata list (used by the
    /// blossom /list endpoint) if it isn't already recorded there.
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Add ownership entry (idempotent - put overwrites)
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        // Convert sha256 to hex for BlobMetadata (which stores sha256 as hex string)
        let sha256_hex = to_hex(sha256);

        // Get existing blobs for this pubkey (for /list endpoint)
        // NOTE(review): an undecodable list is silently replaced with an empty
        // one here, discarding previous entries — confirm that's intended.
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        // Check if blob already exists for this pubkey
        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            // Panics only if the system clock reads before 1970.
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Get size from raw blob
            // (0 if the blob bytes aren't present in the store yet)
            let size = self
                .get_blob(sha256)?
                .map(|data| data.len() as u64)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                // MIME type is not known at this layer; recorded generically.
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
786
787    /// Check if a pubkey owns a blob
788    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
789        let key = Self::blob_owner_key(sha256, pubkey);
790        let rtxn = self.env.read_txn()?;
791        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
792    }
793
794    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
795    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
796        let rtxn = self.env.read_txn()?;
797
798        let mut owners = Vec::new();
799        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
800            let (key, _) = item?;
801            if key.len() == 64 {
802                // Extract pubkey from composite key (bytes 32-64)
803                let mut pubkey = [0u8; 32];
804                pubkey.copy_from_slice(&key[32..64]);
805                owners.push(pubkey);
806            }
807        }
808        Ok(owners)
809    }
810
811    /// Check if blob has any owners
812    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
813        let rtxn = self.env.read_txn()?;
814
815        // Just check if any entry exists with this prefix
816        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
817            if item.is_ok() {
818                return Ok(true);
819            }
820        }
821        Ok(false)
822    }
823
824    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
825    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
826        Ok(self.get_blob_owners(sha256)?.into_iter().next())
827    }
828
829    /// Remove a user's ownership of a blossom blob
830    /// Only deletes the actual blob when no owners remain
831    /// Returns true if the blob was actually deleted (no owners left)
832    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
833        let key = Self::blob_owner_key(sha256, pubkey);
834        let mut wtxn = self.env.write_txn()?;
835
836        // Remove this pubkey's ownership entry
837        self.blob_owners.delete(&mut wtxn, &key[..])?;
838
839        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
840        let sha256_hex = to_hex(sha256);
841
842        // Remove from pubkey's blob list
843        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
844            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
845                blobs.retain(|b| b.sha256 != sha256_hex);
846                let blobs_json = serde_json::to_vec(&blobs)?;
847                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
848            }
849        }
850
851        // Check if any other owners remain (prefix scan)
852        let mut has_other_owners = false;
853        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
854            if item.is_ok() {
855                has_other_owners = true;
856                break;
857            }
858        }
859
860        if has_other_owners {
861            wtxn.commit()?;
862            tracing::debug!(
863                "Removed {} from blob {} owners, other owners remain",
864                &to_hex(pubkey)[..8],
865                &sha256_hex[..8]
866            );
867            return Ok(false);
868        }
869
870        // No owners left - delete the blob completely
871        tracing::info!(
872            "All owners removed from blob {}, deleting",
873            &sha256_hex[..8]
874        );
875
876        // Delete raw blob (by content hash) - this deletes from S3 too
877        let _ = self.router.delete_sync(sha256);
878
879        wtxn.commit()?;
880        Ok(true)
881    }
882
883    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
884    pub fn list_blobs_by_pubkey(&self, pubkey: &[u8; 32]) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
885        let rtxn = self.env.read_txn()?;
886
887        let blobs: Vec<BlobMetadata> = self
888            .pubkey_blobs
889            .get(&rtxn, pubkey)?
890            .and_then(|b| serde_json::from_slice(b).ok())
891            .unwrap_or_default();
892
893        Ok(blobs
894            .into_iter()
895            .map(|b| crate::server::blossom::BlobDescriptor {
896                url: format!("/{}", b.sha256),
897                sha256: b.sha256,
898                size: b.size,
899                mime_type: b.mime_type,
900                uploaded: b.uploaded,
901            })
902            .collect())
903    }
904
905    /// Get a single chunk/blob by hash (raw bytes)
906    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
907        self.router.get_sync(hash)
908            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
909    }
910
911    /// Get file content by hash (raw bytes)
912    /// Returns raw bytes (caller handles decryption if needed)
913    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
914        let store = self.store_arc();
915        let tree = HashTree::new(HashTreeConfig::new(store).public());
916
917        sync_block_on(async {
918            tree.read_file(hash).await
919                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
920        })
921    }
922
923    /// Get file content by Cid (hash + optional decryption key as raw bytes)
924    /// Handles decryption automatically if key is present
925    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
926        let store = self.store_arc();
927        let tree = HashTree::new(HashTreeConfig::new(store).public());
928
929        sync_block_on(async {
930            tree.get(cid).await
931                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
932        })
933    }
934
    /// Get chunk metadata for a file (chunk list, sizes, total size)
    ///
    /// Returns `None` when the hash is unknown to the store or resolves to a
    /// directory. For single-blob (unchunked) files the chunk vectors are
    /// empty and `is_chunked` is false.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        // store is cloned because both the tree and the existence check below need it.
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            // First check if the hash exists in the store at all
            // (either as a blob or tree node)
            let exists = store.has(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            // Get total size
            let total_size = tree.get_size(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            // Check if it's a tree (chunked) or blob
            let is_tree_node = tree.is_tree(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            if !is_tree_node {
                // Single blob, not chunked
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            // Get tree node to extract chunk info
            let node = match tree.get_tree_node(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))? {
                Some(n) => n,
                None => return Ok(None),
            };

            // Check if it's a directory (has named links)
            let is_directory = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None); // Not a file
            }

            // Extract chunk info from links; the two vectors are index-aligned.
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
995
996    /// Get byte range from file
997    pub fn get_file_range(&self, hash: &[u8; 32], start: u64, end: Option<u64>) -> Result<Option<(Vec<u8>, u64)>> {
998        let metadata = match self.get_file_chunk_metadata(hash)? {
999            Some(m) => m,
1000            None => return Ok(None),
1001        };
1002
1003        if metadata.total_size == 0 {
1004            return Ok(Some((Vec::new(), 0)));
1005        }
1006
1007        if start >= metadata.total_size {
1008            return Ok(None);
1009        }
1010
1011        let end = end.unwrap_or(metadata.total_size - 1).min(metadata.total_size - 1);
1012
1013        // For non-chunked files, load entire file
1014        if !metadata.is_chunked {
1015            let content = self.get_file(hash)?.unwrap_or_default();
1016            let range_content = if start < content.len() as u64 {
1017                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1018            } else {
1019                Vec::new()
1020            };
1021            return Ok(Some((range_content, metadata.total_size)));
1022        }
1023
1024        // For chunked files, load only needed chunks
1025        let mut result = Vec::new();
1026        let mut current_offset = 0u64;
1027
1028        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1029            let chunk_size = metadata.chunk_sizes[i];
1030            let chunk_end = current_offset + chunk_size - 1;
1031
1032            // Check if this chunk overlaps with requested range
1033            if chunk_end >= start && current_offset <= end {
1034                let chunk_content = match self.get_chunk(chunk_hash)? {
1035                    Some(content) => content,
1036                    None => {
1037                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1038                    }
1039                };
1040
1041                let chunk_read_start = if current_offset >= start {
1042                    0
1043                } else {
1044                    (start - current_offset) as usize
1045                };
1046
1047                let chunk_read_end = if chunk_end <= end {
1048                    chunk_size as usize - 1
1049                } else {
1050                    (end - current_offset) as usize
1051                };
1052
1053                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1054            }
1055
1056            current_offset += chunk_size;
1057
1058            if current_offset > end {
1059                break;
1060            }
1061        }
1062
1063        Ok(Some((result, metadata.total_size)))
1064    }
1065
1066    /// Stream file range as chunks using Arc for async/Send contexts
1067    pub fn stream_file_range_chunks_owned(
1068        self: Arc<Self>,
1069        hash: &[u8; 32],
1070        start: u64,
1071        end: u64,
1072    ) -> Result<Option<FileRangeChunksOwned>> {
1073        let metadata = match self.get_file_chunk_metadata(hash)? {
1074            Some(m) => m,
1075            None => return Ok(None),
1076        };
1077
1078        if metadata.total_size == 0 || start >= metadata.total_size {
1079            return Ok(None);
1080        }
1081
1082        let end = end.min(metadata.total_size - 1);
1083
1084        Ok(Some(FileRangeChunksOwned {
1085            store: self,
1086            metadata,
1087            start,
1088            end,
1089            current_chunk_idx: 0,
1090            current_offset: 0,
1091        }))
1092    }
1093
1094    /// Get directory structure by hash (raw bytes)
1095    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1096        let store = self.store_arc();
1097        let tree = HashTree::new(HashTreeConfig::new(store).public());
1098
1099        sync_block_on(async {
1100            // Check if it's a directory
1101            let is_dir = tree.is_directory(&hash).await
1102                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1103
1104            if !is_dir {
1105                return Ok(None);
1106            }
1107
1108            // Get directory entries (public Cid - no encryption key)
1109            let cid = hashtree_core::Cid::public(*hash);
1110            let tree_entries = tree.list_directory(&cid).await
1111                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1112
1113            let entries: Vec<DirEntry> = tree_entries.into_iter().map(|e| DirEntry {
1114                name: e.name,
1115                cid: to_hex(&e.hash),
1116                is_directory: e.link_type.is_tree(),
1117                size: e.size,
1118            }).collect();
1119
1120            Ok(Some(DirectoryListing {
1121                dir_name: String::new(),
1122                entries,
1123            }))
1124        })
1125    }
1126
1127    /// Pin a hash (prevent garbage collection)
1128    pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
1129        let mut wtxn = self.env.write_txn()?;
1130        self.pins.put(&mut wtxn, hash.as_slice(), &())?;
1131        wtxn.commit()?;
1132        Ok(())
1133    }
1134
1135    /// Unpin a hash (allow garbage collection)
1136    pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
1137        let mut wtxn = self.env.write_txn()?;
1138        self.pins.delete(&mut wtxn, hash.as_slice())?;
1139        wtxn.commit()?;
1140        Ok(())
1141    }
1142
1143    /// Check if hash is pinned
1144    pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
1145        let rtxn = self.env.read_txn()?;
1146        Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
1147    }
1148
1149    /// List all pinned hashes (raw bytes)
1150    pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
1151        let rtxn = self.env.read_txn()?;
1152        let mut pins = Vec::new();
1153
1154        for item in self.pins.iter(&rtxn)? {
1155            let (hash_bytes, _) = item?;
1156            if hash_bytes.len() == 32 {
1157                let mut hash = [0u8; 32];
1158                hash.copy_from_slice(hash_bytes);
1159                pins.push(hash);
1160            }
1161        }
1162
1163        Ok(pins)
1164    }
1165
1166    /// List all pinned hashes with names
1167    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
1168        let rtxn = self.env.read_txn()?;
1169        let store = self.store_arc();
1170        let tree = HashTree::new(HashTreeConfig::new(store).public());
1171        let mut pins = Vec::new();
1172
1173        for item in self.pins.iter(&rtxn)? {
1174            let (hash_bytes, _) = item?;
1175            if hash_bytes.len() != 32 {
1176                continue;
1177            }
1178            let mut hash = [0u8; 32];
1179            hash.copy_from_slice(hash_bytes);
1180
1181            // Try to determine if it's a directory
1182            let is_directory = sync_block_on(async {
1183                tree.is_directory(&hash).await.unwrap_or(false)
1184            });
1185
1186            pins.push(PinnedItem {
1187                cid: to_hex(&hash),
1188                name: "Unknown".to_string(),
1189                is_directory,
1190            });
1191        }
1192
1193        Ok(pins)
1194    }
1195
1196    // === Tree indexing for eviction ===
1197
    /// Index a tree after sync - tracks all blobs in the tree for eviction
    ///
    /// If `ref_key` is provided (e.g. "npub.../name"), it will replace any existing
    /// tree with that ref, allowing old versions to be evicted.
    ///
    /// `priority` is one of PRIORITY_OWN / PRIORITY_FOLLOWED / PRIORITY_OTHER
    /// and drives eviction order (lower priorities are evicted first).
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // If ref_key provided, check for and unindex old version
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                // Only replace when the ref actually points at a different root.
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes.try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // Release the read txn before unindex_tree, which opens its
                    // own write txn on the same environment.
                    drop(rtxn);
                    // Unindex old tree (will delete orphaned blobs)
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk tree and collect all blob hashes + compute total size
        let (blob_hashes, total_size) = sync_block_on(async {
            self.collect_tree_blobs(&tree, root_hash).await
        })?;

        let mut wtxn = self.env.write_txn()?;

        // Store blob-tree relationships (64-byte key: blob_hash ++ tree_hash)
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        // Store tree metadata
        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta.put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        // Store ref -> hash mapping if ref_key provided
        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1277
1278    /// Collect all blob hashes in a tree and compute total size
1279    async fn collect_tree_blobs<S: Store>(
1280        &self,
1281        tree: &HashTree<S>,
1282        root: &Hash,
1283    ) -> Result<(Vec<Hash>, u64)> {
1284        let mut blobs = Vec::new();
1285        let mut total_size = 0u64;
1286        let mut stack = vec![*root];
1287
1288        while let Some(hash) = stack.pop() {
1289            // Check if it's a tree node
1290            let is_tree = tree.is_tree(&hash).await
1291                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1292
1293            if is_tree {
1294                // Get tree node and add children to stack
1295                if let Some(node) = tree.get_tree_node(&hash).await
1296                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1297                {
1298                    for link in &node.links {
1299                        stack.push(link.hash);
1300                    }
1301                }
1302            } else {
1303                // It's a blob - get its size
1304                if let Some(data) = self.router.get_sync(&hash)
1305                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1306                {
1307                    total_size += data.len() as u64;
1308                    blobs.push(hash);
1309                }
1310            }
1311        }
1312
1313        Ok((blobs, total_size))
1314    }
1315
1316    /// Unindex a tree - removes blob-tree mappings and deletes orphaned blobs
1317    /// Returns the number of bytes freed
1318    pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1319        let root_hex = to_hex(root_hash);
1320
1321        let store = self.store_arc();
1322        let tree = HashTree::new(HashTreeConfig::new(store).public());
1323
1324        // Walk tree and collect all blob hashes
1325        let (blob_hashes, _) = sync_block_on(async {
1326            self.collect_tree_blobs(&tree, root_hash).await
1327        })?;
1328
1329        let mut wtxn = self.env.write_txn()?;
1330        let mut freed = 0u64;
1331
1332        // For each blob, remove the blob-tree entry and check if orphaned
1333        for blob_hash in &blob_hashes {
1334            // Delete blob-tree entry (64-byte key: blob_hash ++ tree_hash)
1335            let mut key = [0u8; 64];
1336            key[..32].copy_from_slice(blob_hash);
1337            key[32..].copy_from_slice(root_hash);
1338            self.blob_trees.delete(&mut wtxn, &key[..])?;
1339
1340            // Check if blob is in any other tree (prefix scan on first 32 bytes)
1341            let rtxn = self.env.read_txn()?;
1342            let mut has_other_tree = false;
1343
1344            for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1345                if item.is_ok() {
1346                    has_other_tree = true;
1347                    break;
1348                }
1349            }
1350            drop(rtxn);
1351
1352            // If orphaned, delete the blob
1353            if !has_other_tree {
1354                if let Some(data) = self.router.get_sync(blob_hash)
1355                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1356                {
1357                    freed += data.len() as u64;
1358                    // Delete locally only - keep S3 as archive
1359                    self.router.delete_local_only(blob_hash)
1360                        .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1361                }
1362            }
1363        }
1364
1365        // Delete tree node itself if exists
1366        if let Some(data) = self.router.get_sync(root_hash)
1367            .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1368        {
1369            freed += data.len() as u64;
1370            // Delete locally only - keep S3 as archive
1371            self.router.delete_local_only(root_hash)
1372                .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1373        }
1374
1375        // Delete tree metadata
1376        self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1377
1378        wtxn.commit()?;
1379
1380        tracing::debug!(
1381            "Unindexed tree {} ({} bytes freed)",
1382            &root_hex[..8],
1383            freed
1384        );
1385
1386        Ok(freed)
1387    }
1388
1389    /// Get tree metadata
1390    pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1391        let rtxn = self.env.read_txn()?;
1392        if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1393            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1394                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1395            Ok(Some(meta))
1396        } else {
1397            Ok(None)
1398        }
1399    }
1400
1401    /// List all indexed trees
1402    pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1403        let rtxn = self.env.read_txn()?;
1404        let mut trees = Vec::new();
1405
1406        for item in self.tree_meta.iter(&rtxn)? {
1407            let (hash_bytes, meta_bytes) = item?;
1408            let hash: Hash = hash_bytes.try_into()
1409                .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1410            let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1411                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1412            trees.push((hash, meta));
1413        }
1414
1415        Ok(trees)
1416    }
1417
1418    /// Get total tracked storage size (sum of all tree_meta.total_size)
1419    pub fn tracked_size(&self) -> Result<u64> {
1420        let rtxn = self.env.read_txn()?;
1421        let mut total = 0u64;
1422
1423        for item in self.tree_meta.iter(&rtxn)? {
1424            let (_, bytes) = item?;
1425            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1426                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1427            total += meta.total_size;
1428        }
1429
1430        Ok(total)
1431    }
1432
1433    /// Get evictable trees sorted by (priority ASC, synced_at ASC)
1434    fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1435        let mut trees = self.list_indexed_trees()?;
1436
1437        // Sort by priority (lower first), then by synced_at (older first)
1438        trees.sort_by(|a, b| {
1439            match a.1.priority.cmp(&b.1.priority) {
1440                std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1441                other => other,
1442            }
1443        });
1444
1445        Ok(trees)
1446    }
1447
    /// Run eviction if storage is over quota
    /// Returns bytes freed
    ///
    /// Eviction order:
    /// 1. Orphaned blobs (not in any indexed tree and not pinned)
    /// 2. Trees by priority (lowest first) and age (oldest first)
    pub fn evict_if_needed(&self) -> Result<u64> {
        // Get actual storage used
        let stats = self.router.stats()
            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
        let current = stats.total_bytes;

        // Under quota: nothing to do
        if current <= self.max_size_bytes {
            return Ok(0);
        }

        // Target 90% of max to avoid constant eviction
        let target = self.max_size_bytes * 90 / 100;
        let mut freed = 0u64;
        // Running estimate of remaining usage, decremented as space is freed
        let mut current_size = current;

        // Phase 1: Evict orphaned blobs (not in any tree and not pinned)
        let orphan_freed = self.evict_orphaned_blobs()?;
        freed += orphan_freed;
        current_size = current_size.saturating_sub(orphan_freed);

        if orphan_freed > 0 {
            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
        }

        // Check if we're now under target
        if current_size <= target {
            if freed > 0 {
                tracing::info!("Eviction complete: {} bytes freed", freed);
            }
            return Ok(freed);
        }

        // Phase 2: Evict trees by priority (lowest first) and age (oldest first)
        // Own trees CAN be evicted (just last), but PINNED trees are never evicted
        let evictable = self.get_evictable_trees()?;

        for (root_hash, meta) in evictable {
            // Stop as soon as we've dropped below the target
            if current_size <= target {
                break;
            }

            let root_hex = to_hex(&root_hash);

            // Never evict pinned trees
            if self.is_pinned(&root_hash)? {
                continue;
            }

            // unindex_tree removes the index entries and deletes newly-orphaned blobs
            let tree_freed = self.unindex_tree(&root_hash)?;
            freed += tree_freed;
            current_size = current_size.saturating_sub(tree_freed);

            tracing::info!(
                "Evicted tree {} (owner={}, priority={}, {} bytes)",
                &root_hex[..8],
                &meta.owner[..8.min(meta.owner.len())],
                meta.priority,
                tree_freed
            );
        }

        if freed > 0 {
            tracing::info!("Eviction complete: {} bytes freed", freed);
        }

        Ok(freed)
    }
1521
1522    /// Evict blobs that are not part of any indexed tree and not pinned
1523    fn evict_orphaned_blobs(&self) -> Result<u64> {
1524        let mut freed = 0u64;
1525
1526        // Get all blob hashes from store
1527        let all_hashes = self.router.list()
1528            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1529
1530        // Get pinned hashes as raw bytes
1531        let rtxn = self.env.read_txn()?;
1532        let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
1533            .filter_map(|item| item.ok())
1534            .filter_map(|(hash_bytes, _)| {
1535                if hash_bytes.len() == 32 {
1536                    let mut hash = [0u8; 32];
1537                    hash.copy_from_slice(hash_bytes);
1538                    Some(hash)
1539                } else {
1540                    None
1541                }
1542            })
1543            .collect();
1544
1545        // Collect all blob hashes that are in at least one tree
1546        // Key format is blob_hash (32 bytes) ++ tree_hash (32 bytes)
1547        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
1548        for item in self.blob_trees.iter(&rtxn)? {
1549            if let Ok((key_bytes, _)) = item {
1550                if key_bytes.len() >= 32 {
1551                    let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
1552                    blobs_in_trees.insert(blob_hash);
1553                }
1554            }
1555        }
1556        drop(rtxn);
1557
1558        // Find and delete orphaned blobs
1559        for hash in all_hashes {
1560            // Skip if pinned
1561            if pinned.contains(&hash) {
1562                continue;
1563            }
1564
1565            // Skip if part of any tree
1566            if blobs_in_trees.contains(&hash) {
1567                continue;
1568            }
1569
1570            // This blob is orphaned - delete locally (keep S3 as archive)
1571            if let Ok(Some(data)) = self.router.get_sync(&hash) {
1572                freed += data.len() as u64;
1573                let _ = self.router.delete_local_only(&hash);
1574                tracing::debug!("Deleted orphaned blob {} ({} bytes)", &to_hex(&hash)[..8], data.len());
1575            }
1576        }
1577
1578        Ok(freed)
1579    }
1580
    /// Get the maximum storage size in bytes
    ///
    /// This is the quota that `evict_if_needed` compares actual usage against.
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1585
1586    /// Get storage usage by priority tier
1587    pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1588        let rtxn = self.env.read_txn()?;
1589        let mut own = 0u64;
1590        let mut followed = 0u64;
1591        let mut other = 0u64;
1592
1593        for item in self.tree_meta.iter(&rtxn)? {
1594            let (_, bytes) = item?;
1595            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1596                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1597
1598            if meta.priority >= PRIORITY_OWN {
1599                own += meta.total_size;
1600            } else if meta.priority >= PRIORITY_FOLLOWED {
1601                followed += meta.total_size;
1602            } else {
1603                other += meta.total_size;
1604            }
1605        }
1606
1607        Ok(StorageByPriority { own, followed, other })
1608    }
1609
1610    /// Get storage statistics
1611    pub fn get_storage_stats(&self) -> Result<StorageStats> {
1612        let rtxn = self.env.read_txn()?;
1613        let total_pins = self.pins.len(&rtxn)? as usize;
1614
1615        let stats = self.router.stats()
1616            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1617
1618        Ok(StorageStats {
1619            total_dags: stats.count,
1620            pinned_dags: total_pins,
1621            total_bytes: stats.total_bytes,
1622        })
1623    }
1624
1625    // === Cached roots (replaces nostrdb event caching) ===
1626
1627    /// Get cached root for a pubkey/tree_name pair
1628    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1629        let key = format!("{}/{}", pubkey_hex, tree_name);
1630        let rtxn = self.env.read_txn()?;
1631        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1632            let root: CachedRoot = rmp_serde::from_slice(bytes)
1633                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1634            Ok(Some(root))
1635        } else {
1636            Ok(None)
1637        }
1638    }
1639
1640    /// Set cached root for a pubkey/tree_name pair
1641    pub fn set_cached_root(
1642        &self,
1643        pubkey_hex: &str,
1644        tree_name: &str,
1645        hash: &str,
1646        key: Option<&str>,
1647        visibility: &str,
1648        updated_at: u64,
1649    ) -> Result<()> {
1650        let db_key = format!("{}/{}", pubkey_hex, tree_name);
1651        let root = CachedRoot {
1652            hash: hash.to_string(),
1653            key: key.map(|k| k.to_string()),
1654            updated_at,
1655            visibility: visibility.to_string(),
1656        };
1657        let bytes = rmp_serde::to_vec(&root)
1658            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1659        let mut wtxn = self.env.write_txn()?;
1660        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1661        wtxn.commit()?;
1662        Ok(())
1663    }
1664
1665    /// List all cached roots for a pubkey
1666    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1667        let prefix = format!("{}/", pubkey_hex);
1668        let rtxn = self.env.read_txn()?;
1669        let mut results = Vec::new();
1670
1671        for item in self.cached_roots.iter(&rtxn)? {
1672            let (key, bytes) = item?;
1673            if key.starts_with(&prefix) {
1674                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1675                let root: CachedRoot = rmp_serde::from_slice(bytes)
1676                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1677                results.push((tree_name.to_string(), root));
1678            }
1679        }
1680
1681        Ok(results)
1682    }
1683
1684    /// Delete a cached root
1685    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1686        let key = format!("{}/{}", pubkey_hex, tree_name);
1687        let mut wtxn = self.env.write_txn()?;
1688        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1689        wtxn.commit()?;
1690        Ok(deleted)
1691    }
1692
1693    /// Garbage collect unpinned content
1694    pub fn gc(&self) -> Result<GcStats> {
1695        let rtxn = self.env.read_txn()?;
1696
1697        // Get all pinned hashes as raw bytes
1698        let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
1699            .filter_map(|item| item.ok())
1700            .filter_map(|(hash_bytes, _)| {
1701                if hash_bytes.len() == 32 {
1702                    let mut hash = [0u8; 32];
1703                    hash.copy_from_slice(hash_bytes);
1704                    Some(hash)
1705                } else {
1706                    None
1707                }
1708            })
1709            .collect();
1710
1711        drop(rtxn);
1712
1713        // Get all stored hashes
1714        let all_hashes = self.router.list()
1715            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1716
1717        // Delete unpinned hashes
1718        let mut deleted = 0;
1719        let mut freed_bytes = 0u64;
1720
1721        for hash in all_hashes {
1722            if !pinned.contains(&hash) {
1723                if let Ok(Some(data)) = self.router.get_sync(&hash) {
1724                    freed_bytes += data.len() as u64;
1725                    // Delete locally only - keep S3 as archive
1726                    let _ = self.router.delete_local_only(&hash);
1727                    deleted += 1;
1728                }
1729            }
1730        }
1731
1732        Ok(GcStats {
1733            deleted_dags: deleted,
1734            freed_bytes,
1735        })
1736    }
1737
1738    /// Verify LMDB blob integrity - checks that stored data matches its key hash
1739    /// Returns verification statistics and optionally deletes corrupted entries
1740    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
1741        let all_hashes = self.router.list()
1742            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1743
1744        let total = all_hashes.len();
1745        let mut valid = 0;
1746        let mut corrupted = 0;
1747        let mut deleted = 0;
1748        let mut corrupted_hashes = Vec::new();
1749
1750        for hash in &all_hashes {
1751            let hash_hex = to_hex(hash);
1752
1753            match self.router.get_sync(hash) {
1754                Ok(Some(data)) => {
1755                    // Compute actual SHA256 of data
1756                    let actual_hash = sha256(&data);
1757
1758                    if actual_hash == *hash {
1759                        valid += 1;
1760                    } else {
1761                        corrupted += 1;
1762                        let actual_hex = to_hex(&actual_hash);
1763                        println!("  CORRUPTED: key={} actual={} size={}",
1764                            &hash_hex[..16], &actual_hex[..16], data.len());
1765                        corrupted_hashes.push(*hash);
1766                    }
1767                }
1768                Ok(None) => {
1769                    // Hash exists in index but data is missing
1770                    corrupted += 1;
1771                    println!("  MISSING: key={}", &hash_hex[..16]);
1772                    corrupted_hashes.push(*hash);
1773                }
1774                Err(e) => {
1775                    corrupted += 1;
1776                    println!("  ERROR: key={} err={}", &hash_hex[..16], e);
1777                    corrupted_hashes.push(*hash);
1778                }
1779            }
1780        }
1781
1782        // Delete corrupted entries if requested
1783        if delete {
1784            for hash in &corrupted_hashes {
1785                match self.router.delete_sync(hash) {
1786                    Ok(true) => deleted += 1,
1787                    Ok(false) => {} // Already deleted
1788                    Err(e) => {
1789                        let hash_hex = to_hex(hash);
1790                        println!("  Failed to delete {}: {}", &hash_hex[..16], e);
1791                    }
1792                }
1793            }
1794        }
1795
1796        Ok(VerifyResult {
1797            total,
1798            valid,
1799            corrupted,
1800            deleted,
1801        })
1802    }
1803
    /// Verify R2/S3 blob integrity - lists all objects and verifies hash matches filename
    /// Returns verification statistics and optionally deletes corrupted entries
    ///
    /// Walks every `.bin` object under the configured prefix, downloads it,
    /// and compares its SHA-256 against the 64-hex-char hash encoded in the
    /// object's filename. With `delete == true`, failing objects are removed.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        // Get S3 client from router (we need to access it directly)
        // For now, we'll create a new client from config
        let config = crate::config::Config::load()?;
        let s3_config = config.storage.s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        // Build AWS config
        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        // Custom endpoint + path-style addressing, as configured for this store.
        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                .force_path_style(true)
                .build()
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // List all objects in bucket, paginated via continuation tokens.
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2()
                .bucket(bucket)
                .prefix(prefix);

            // Resume from where the previous page ended.
            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req.send().await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Skip non-.bin files
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // Extract expected hash from filename (remove prefix and .bin)
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                // Validate it's a valid hex hash (SHA-256 = 64 hex chars);
                // anything else is a malformed key and counted as corrupted.
                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!("  INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!("  INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download and verify content
                match s3_client.get_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(resp) => {
                        // Stream the body into memory before hashing.
                        match resp.body.collect().await {
                            Ok(bytes) => {
                                let data = bytes.into_bytes();
                                let actual_hash = sha256(&data);

                                if actual_hash == expected_hash {
                                    valid += 1;
                                } else {
                                    corrupted += 1;
                                    let actual_hex = to_hex(&actual_hash);
                                    println!("  CORRUPTED: key={} actual={} size={}",
                                        &expected_hash_hex[..16], &actual_hex[..16], data.len());
                                    corrupted_keys.push(key.to_string());
                                }
                            }
                            Err(e) => {
                                corrupted += 1;
                                println!("  READ ERROR: {} - {}", key, e);
                                corrupted_keys.push(key.to_string());
                            }
                        }
                    }
                    Err(e) => {
                        corrupted += 1;
                        println!("  FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Progress indicator every 100 objects
                if total % 100 == 0 {
                    println!("  Progress: {} objects checked, {} corrupted so far", total, corrupted);
                }
            }

            // Check if there are more objects
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        // Delete corrupted entries if requested
        if delete {
            for key in &corrupted_keys {
                match s3_client.delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        // Best-effort: report and keep deleting the rest.
                        println!("  Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
1960
1961    /// Fallback for non-S3 builds
1962    #[cfg(not(feature = "s3"))]
1963    pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
1964        Err(anyhow::anyhow!("S3 feature not enabled"))
1965    }
1966}
1967
/// Result of blob integrity verification
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of blobs/objects examined
    pub total: usize,
    /// Blobs whose content hash matched their key
    pub valid: usize,
    /// Blobs missing, unreadable, malformed, or with mismatched hashes
    pub corrupted: usize,
    /// Corrupted entries actually deleted (only when deletion was requested)
    pub deleted: usize,
}
1976
/// Aggregate storage statistics (see `get_storage_stats`).
#[derive(Debug)]
pub struct StorageStats {
    /// Total number of DAGs known to the blob router
    pub total_dags: usize,
    /// Number of entries in the pins table
    pub pinned_dags: usize,
    /// Total stored bytes as reported by the router
    pub total_bytes: u64,
}
1983
/// Storage usage broken down by priority tier
/// (tiers correspond to the PRIORITY_* constants).
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Own/pinned trees (priority 255)
    pub own: u64,
    /// Followed users' trees (priority 128)
    pub followed: u64,
    /// Other trees (priority 64)
    pub other: u64,
}
1994
/// Chunking metadata for a stored file.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes
    pub total_size: u64,
    /// Content hashes of the chunks, in file order
    pub chunk_hashes: Vec<Hash>,
    /// Size in bytes of each chunk; assumed parallel to `chunk_hashes`
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is split into chunks; when false the range
    /// iterator yields nothing
    pub is_chunked: bool,
}
2002
/// Owned iterator for async streaming
///
/// Yields the byte slices of chunks overlapping the inclusive range
/// `[start, end]`; holds an `Arc` to the store so it can be moved into
/// async/streaming contexts independently of the caller.
pub struct FileRangeChunksOwned {
    store: Arc<HashtreeStore>,
    metadata: FileChunkMetadata,
    // First byte of the requested range (inclusive)
    start: u64,
    // Last byte of the requested range (inclusive)
    end: u64,
    // Index of the next chunk to consider
    current_chunk_idx: usize,
    // Absolute file offset where the current chunk begins
    current_offset: u64,
}
2012
2013impl Iterator for FileRangeChunksOwned {
2014    type Item = Result<Vec<u8>>;
2015
2016    fn next(&mut self) -> Option<Self::Item> {
2017        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2018            return None;
2019        }
2020
2021        if self.current_offset > self.end {
2022            return None;
2023        }
2024
2025        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2026        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2027        let chunk_end = self.current_offset + chunk_size - 1;
2028
2029        self.current_chunk_idx += 1;
2030
2031        if chunk_end < self.start || self.current_offset > self.end {
2032            self.current_offset += chunk_size;
2033            return self.next();
2034        }
2035
2036        let chunk_content = match self.store.get_chunk(chunk_hash) {
2037            Ok(Some(content)) => content,
2038            Ok(None) => {
2039                return Some(Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash))));
2040            }
2041            Err(e) => {
2042                return Some(Err(e));
2043            }
2044        };
2045
2046        let chunk_read_start = if self.current_offset >= self.start {
2047            0
2048        } else {
2049            (self.start - self.current_offset) as usize
2050        };
2051
2052        let chunk_read_end = if chunk_end <= self.end {
2053            chunk_size as usize - 1
2054        } else {
2055            (self.end - self.current_offset) as usize
2056        };
2057
2058        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2059        self.current_offset += chunk_size;
2060
2061        Some(Ok(result))
2062    }
2063}
2064
/// Outcome of a garbage-collection pass (see `gc`).
#[derive(Debug)]
pub struct GcStats {
    /// Number of unpinned blobs deleted
    pub deleted_dags: usize,
    /// Total bytes those blobs occupied
    pub freed_bytes: u64,
}
2070
/// A single entry in a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within its directory
    pub name: String,
    /// Content identifier of the entry
    pub cid: String,
    /// True when the entry is itself a directory
    pub is_directory: bool,
    /// Size in bytes (meaning for directories not established here — TODO confirm)
    pub size: u64,
}
2078
/// The contents of one directory: its name plus its entries.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the listed directory
    pub dir_name: String,
    /// Entries contained in the directory
    pub entries: Vec<DirEntry>,
}
2084
/// A pinned root as presented to callers (e.g. pin listings).
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Content identifier of the pinned root
    pub cid: String,
    /// Display name associated with the pin
    pub name: String,
    /// True when the pinned root is a directory
    pub is_directory: bool,
}
2091
/// Blob metadata for Blossom protocol
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// SHA-256 of the blob content (hex)
    pub sha256: String,
    /// Blob size in bytes
    pub size: u64,
    /// MIME type reported at upload time
    pub mime_type: String,
    /// Unix timestamp of the upload
    pub uploaded: u64,
}
2100
2101// Implement ContentStore trait for WebRTC data exchange
2102impl crate::webrtc::ContentStore for HashtreeStore {
2103    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2104        let hash = from_hex(hash_hex)
2105            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2106        self.get_chunk(&hash)
2107    }
2108}