hashtree_cli/
storage.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use heed::{Database, EnvOpenOptions};
4use heed::types::*;
5use hashtree_lmdb::LmdbBlobStore;
6use hashtree_core::{
7    HashTree, HashTreeConfig, Cid,
8    sha256, to_hex, from_hex, TreeNode, DirEntry as HashTreeDirEntry,
9    types::Hash,
10};
11use hashtree_core::store::{Store, StoreError};
12use serde::{Deserialize, Serialize};
13use std::path::Path;
14use std::collections::HashSet;
15use std::io::Read;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18use futures::executor::block_on as sync_block_on;
19
/// Priority levels for tree eviction (higher value = evicted later).
/// Trees synced from unrelated users — lowest priority, evicted first.
pub const PRIORITY_OTHER: u8 = 64;
/// Trees belonging to followed users.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// The user's own (or pinned) trees — highest priority.
pub const PRIORITY_OWN: u8 = 255;
24
/// Metadata for a synced tree (for eviction tracking).
///
/// Persisted msgpack-encoded in the `tree_meta` database, keyed by the
/// tree's 32-byte root hash.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Hex pubkey of the tree owner
    pub owner: String,
    /// Tree name if known (from nostr key like "npub.../name")
    pub name: Option<String>,
    /// Unix timestamp when this tree was synced
    pub synced_at: u64,
    /// Total size in bytes of all blobs in this tree
    pub total_size: u64,
    /// Eviction priority: 255=own/pinned, 128=followed, 64=other
    /// (see `PRIORITY_OWN` / `PRIORITY_FOLLOWED` / `PRIORITY_OTHER`)
    pub priority: u8,
}
39
/// Cached root info from Nostr events (replaces nostrdb caching).
///
/// Persisted msgpack-encoded in the `cached_roots` database, keyed by
/// "pubkey_hex/tree_name".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex)
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public" or "private"
    pub visibility: String,
}
52
53#[cfg(feature = "s3")]
54use tokio::sync::mpsc;
55
56use crate::config::S3Config;
57
/// Message for the background S3 sync task.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` under the object key derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object whose key is derived from `hash`.
    Delete { hash: Hash },
}
64
/// Storage router - LMDB primary with optional S3 backup
///
/// Write path: LMDB first (fast), then queue S3 upload (non-blocking)
/// Read path: LMDB first, fall back to S3 if miss
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LmdbBlobStore>,
    /// Optional S3 client for backup (None when S3 is not configured)
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target bucket; set together with `s3_client`
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to every object key (empty when unset)
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to send uploads to background task
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
83
impl StorageRouter {
    /// Create router with LMDB only (no S3 fields populated)
    pub fn new(local: Arc<LmdbBlobStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Create router with LMDB + S3 backup
    ///
    /// Builds an S3 client for the configured endpoint (path-style
    /// addressing) and spawns a detached tokio task that drains an
    /// unbounded channel of upload/delete messages. Errors inside the
    /// background task are logged, never surfaced to the writer.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LmdbBlobStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Build AWS config (credentials from the environment, region from ours)
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader = aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Build S3 client with custom endpoint
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Create background sync channel. Unbounded: puts are never
        // back-pressured, so a burst of large uploads grows memory.
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        // Spawn background sync task (owns clones of client/bucket/prefix)
        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Loop ends when every sender is dropped (router dropped)
            while let Some(msg) = sync_rx.recv().await {
                match msg {
                    S3SyncMessage::Upload { hash, data } => {
                        // Object key layout: <prefix><hash-hex>.bin
                        let key = format!("{}{}.bin", sync_prefix, to_hex(&hash));
                        tracing::info!("S3 uploading {} ({} bytes)", &key, data.len());

                        match sync_client
                            .put_object()
                            .bucket(&sync_bucket)
                            .key(&key)
                            .body(ByteStream::from(data))
                            .send()
                            .await
                        {
                            Ok(_) => tracing::info!("S3 upload succeeded for {}", &key),
                            Err(e) => tracing::error!("S3 upload failed for {}: {}", &key, e),
                        }
                    }
                    S3SyncMessage::Delete { hash } => {
                        let key = format!("{}{}.bin", sync_prefix, to_hex(&hash));
                        tracing::debug!("S3 deleting {}", &key);

                        if let Err(e) = sync_client
                            .delete_object()
                            .bucket(&sync_bucket)
                            .key(&key)
                            .send()
                            .await
                        {
                            tracing::error!("S3 delete failed for {}: {}", &key[..16.min(key.len())], e);
                        }
                    }
                }
            }
        });

        tracing::info!("S3 storage initialized: bucket={}, prefix={}", bucket, prefix);

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Store data - writes to LMDB, queues S3 upload in background
    ///
    /// Returns whether the blob was new to the local store. The S3 upload
    /// is fire-and-forget; a failed queue/send is only logged.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        // Always write to local first
        let is_new = self.local.put_sync(hash, data)?;

        // Queue S3 upload if configured (non-blocking)
        // Always upload to S3 (even if not new locally) to ensure S3 has all blobs
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            tracing::info!("Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(), data.len(), is_new);
            if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data: data.to_vec() }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Get data - tries LMDB first, falls back to S3
    ///
    /// On an S3 hit the blob is written back into LMDB so future reads are
    /// local.
    ///
    /// NOTE(review): the S3 fallback drives AWS SDK futures with
    /// `futures::executor::block_on`; the SDK's HTTP stack normally needs a
    /// live tokio reactor — confirm this sync path is only reached where
    /// that holds, otherwise it can panic or hang.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        // Try local first
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        // Fall back to S3 if configured
        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.get_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Cache locally for future reads
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // NoSuchKey is the expected miss; only log other errors
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Check if hash exists
    ///
    /// Local check first, then an S3 HEAD request when configured (same
    /// blocking caveat as `get_sync`). S3 errors other than 404 are logged
    /// and treated as "not found".
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        // Check local first
        if self.local.exists(hash)? {
            return Ok(true);
        }

        // Check S3 if configured
        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Delete data from both local and S3 stores
    ///
    /// Returns whether the blob existed locally; the S3 delete is queued
    /// fire-and-forget.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        // Queue S3 delete if configured
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Delete data from local store only (don't propagate to S3)
    /// Used for eviction where we want to keep S3 as archive
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Get stats from local store (S3 contents are not counted)
    pub fn stats(&self) -> Result<hashtree_lmdb::LmdbStats, StoreError> {
        self.local.stats()
    }

    /// List all hashes from local store (S3-only blobs are not listed)
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Get the underlying LMDB store for HashTree operations
    ///
    /// Note: writes through this handle bypass the S3 sync queue.
    pub fn local_store(&self) -> Arc<LmdbBlobStore> {
        Arc::clone(&self.local)
    }
}
304
// Implement async Store trait for StorageRouter so it can be used directly with HashTree.
// Every method delegates to the synchronous router API, which ensures all writes
// and deletes also flow through the S3 sync queue when S3 is configured.
#[async_trait]
impl Store for StorageRouter {
    /// Store a blob; true if it was new to the local store.
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    /// Fetch a blob: LMDB first, then S3 fallback.
    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    /// Existence check: LMDB first, then S3 HEAD.
    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    /// Delete locally and queue the S3 delete.
    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
325
/// Top-level store: one heed/LMDB environment holding the index databases
/// below, plus a `StorageRouter` that holds the actual blob bytes (stored
/// under a separate `blobs/` path, optionally mirrored to S3).
pub struct HashtreeStore {
    // Index environment (distinct from the router's blob storage)
    env: heed::Env,
    /// Set of pinned hashes (hex strings, prevents garbage collection)
    pins: Database<Str, Unit>,
    /// Maps SHA256 hex -> root hash hex (for blossom compatibility)
    sha256_index: Database<Str, Str>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Str, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles LMDB + optional S3 (Arc for sharing with HashTree)
    router: Arc<StorageRouter>,
    /// Maximum storage size in bytes (from config)
    max_size_bytes: u64,
}
349
350impl HashtreeStore {
351    /// Create a new store with local LMDB storage only (10GB default limit)
352    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
353        Self::with_options(path, None, 10 * 1024 * 1024 * 1024)
354    }
355
356    /// Create a new store with optional S3 backend (10GB default limit)
357    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
358        Self::with_options(path, s3_config, 10 * 1024 * 1024 * 1024)
359    }
360
    /// Create a new store with optional S3 backend and custom size limit
    ///
    /// Opens (or creates) the heed/LMDB index environment at `path`, creates
    /// all named databases up front, then builds the blob `StorageRouter`
    /// (LMDB always; S3 added when the feature is compiled in and a config
    /// is supplied).
    ///
    /// `max_size_bytes` is the logical eviction cap recorded on the struct;
    /// the LMDB `map_size` below is a fixed 10GB of virtual address space,
    /// independent of that cap.
    pub fn with_options<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>, max_size_bytes: u64) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // heed marks `open` unsafe: the same environment must not be opened
        // twice in one process. NOTE(review): confirm callers uphold the
        // single-open invariant.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) // 10GB virtual address space
                .max_dbs(9)  // pins, sha256_index, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .open(path)?
        };

        // Create every named database in a single write transaction
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let sha256_index = env.create_database(&mut wtxn, Some("sha256_index"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Create local LMDB blob store under the `blobs/` subdirectory
        let lmdb_store = Arc::new(LmdbBlobStore::new(path.join("blobs"))
            .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?);

        // Create storage router with optional S3
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!("Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket, s3_cfg.endpoint);

            // NOTE(review): blocks on async S3 init via futures' block_on;
            // confirm this constructor is never called from a tokio runtime
            // thread, where this pattern can panic or stall.
            sync_block_on(async {
                StorageRouter::with_s3(lmdb_store, s3_cfg).await
            })?
        } else {
            StorageRouter::new(lmdb_store)
        });

        // Without the s3 feature: warn if a config was supplied, then fall
        // back to local-only storage.
        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!("S3 config provided but S3 feature not enabled. Using local storage only.");
            }
            StorageRouter::new(lmdb_store)
        });

        Ok(Self {
            env,
            pins,
            sha256_index,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
423
424    /// Get the storage router
425    pub fn router(&self) -> &StorageRouter {
426        &self.router
427    }
428
    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
    /// All writes through this go to both LMDB and S3.
    /// Cheap: only bumps the Arc refcount, no data is copied.
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }
434
435    /// Upload a file and return its CID (public/unencrypted), with auto-pin
436    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
437        self.upload_file_internal(file_path, true)
438    }
439
440    /// Upload a file without pinning (for blossom uploads that can be evicted)
441    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
442        self.upload_file_internal(file_path, false)
443    }
444
    /// Shared implementation for public (unencrypted) file uploads.
    ///
    /// Stores the file through HashTree in public mode, records the
    /// content's SHA256 -> root-hash mapping for Blossom lookups, and
    /// optionally pins the root. Returns the root hash as hex.
    fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
        let file_path = file_path.as_ref();
        let file_content = std::fs::read(file_path)?;

        // Compute SHA256 hash of file content for blossom compatibility
        let content_sha256 = sha256(&file_content);
        let sha256_hex = to_hex(&content_sha256);

        // Use hashtree to store the file (public mode - no encryption)
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let cid = sync_block_on(async {
            tree.put(&file_content).await
        }).context("Failed to store file")?;

        let root_hex = to_hex(&cid.hash);

        // Index + pin updates share one write transaction
        let mut wtxn = self.env.write_txn()?;

        // Store SHA256 -> root hash mapping for blossom compatibility
        self.sha256_index.put(&mut wtxn, &sha256_hex, &root_hex)?;

        // Only pin if requested (htree add = pin, blossom upload = no pin)
        if pin {
            self.pins.put(&mut wtxn, &root_hex, &())?;
        }

        wtxn.commit()?;

        Ok(root_hex)
    }
477
    /// Upload a file from a stream with progress callbacks
    ///
    /// NOTE(review): despite the name, the reader is fully buffered into
    /// memory first (the SHA256 must cover the whole content before it is
    /// stored), so this is not constant-memory streaming.
    ///
    /// `callback` is invoked once with the resulting root hash (hex).
    /// Unlike `upload_file_no_pin`, this path always pins the root.
    pub fn upload_file_stream<R: Read, F>(
        &self,
        mut reader: R,
        _file_name: impl Into<String>,
        mut callback: F,
    ) -> Result<String>
    where
        F: FnMut(&str),
    {
        // Read all data first to compute SHA256
        let mut data = Vec::new();
        reader.read_to_end(&mut data)?;

        // Compute SHA256 hash of file content for blossom compatibility
        let content_sha256 = sha256(&data);
        let sha256_hex = to_hex(&content_sha256);

        // Use HashTree.put for upload (public mode for blossom)
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let cid = sync_block_on(async {
            tree.put(&data).await
        }).context("Failed to store file")?;

        let root_hex = to_hex(&cid.hash);
        // Report the root hash to the caller before committing the index
        callback(&root_hex);

        let mut wtxn = self.env.write_txn()?;

        // Store SHA256 -> root hash mapping for blossom compatibility
        self.sha256_index.put(&mut wtxn, &sha256_hex, &root_hex)?;

        // Auto-pin on upload
        self.pins.put(&mut wtxn, &root_hex, &())?;

        wtxn.commit()?;

        Ok(root_hex)
    }
519
520    /// Upload a directory and return its root hash (hex)
521    /// Respects .gitignore by default
522    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
523        self.upload_dir_with_options(dir_path, true)
524    }
525
    /// Upload a directory with options (public mode - no encryption)
    ///
    /// Walks the tree (honoring gitignore rules when `respect_gitignore`
    /// is true), uploads every file, assembles directory nodes bottom-up,
    /// pins the resulting root, and returns the root hash as hex.
    pub fn upload_dir_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
        let dir_path = dir_path.as_ref();

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
        }).context("Failed to upload directory")?;

        let root_hex = to_hex(&root_cid.hash);

        // Pin the root so the uploaded tree survives eviction
        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, &root_hex, &())?;
        wtxn.commit()?;

        Ok(root_hex)
    }
545
    /// Walk `current_path`, upload every file, and collect a map of
    /// directory path -> (name, Cid) entries, then delegate to
    /// `build_directory_tree` to assemble directory nodes bottom-up.
    ///
    /// Despite the name this is not recursive itself: the `ignore` walker
    /// yields the whole subtree in a single pass.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path, // unused; kept for call-site symmetry
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Build directory structure from flat file list - store full Cid with key
        // The empty-string key "" represents the root directory.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new()); // Root

        // hidden(false) => dot-files ARE included; only gitignore rules
        // (when enabled) filter entries.
        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false)
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // Skip the root directory itself
            if path == current_path {
                continue;
            }

            // Record paths relative to the upload root
            let relative = path.strip_prefix(current_path)
                .unwrap_or(path);

            if path.is_file() {
                let content = std::fs::read(path)?;
                let cid = tree.put(&content).await
                    .map_err(|e| anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e))?;

                // Get parent directory path and file name
                let parent = relative.parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative.file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Ensure directory entry exists (covers empty directories)
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        // Build directory tree bottom-up
        self.build_directory_tree(tree, &mut dir_contents).await
    }
603
    /// Assemble directory nodes from `dir_contents` (path -> file entries),
    /// deepest directories first so every child Cid exists before its parent
    /// node is created. Returns the root directory's Cid (the "" entry).
    async fn build_directory_tree<S: Store>(
        &self,
        tree: &HashTree<S>,
        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
    ) -> Result<Cid> {
        // Sort directories by depth (deepest first) to build bottom-up.
        // Depth = component count: "" -> 0, "a" -> 1, "a/b" -> 2.
        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
        dirs.sort_by(|a, b| {
            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
            depth_b.cmp(&depth_a) // Deepest first
        });

        // Cids of directories already built, keyed by their relative path
        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();

        for dir_path in dirs {
            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();

            let mut entries: Vec<HashTreeDirEntry> = files.into_iter()
                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
                .collect();

            // Add subdirectory entries: scan every finished directory and
            // keep only the direct children of `dir_path` (O(dirs^2) total,
            // acceptable for typical trees).
            for (subdir_path, cid) in &dir_cids {
                let parent = std::path::Path::new(subdir_path)
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();

                if parent == dir_path {
                    let name = std::path::Path::new(subdir_path)
                        .file_name()
                        .map(|n| n.to_string_lossy().to_string())
                        .unwrap_or_default();
                    entries.push(HashTreeDirEntry::from_cid(name, cid));
                }
            }

            let cid = tree.put_directory(entries).await
                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;

            dir_cids.insert(dir_path, cid);
        }

        // Return root Cid (the entry registered under the "" path)
        dir_cids.get("")
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("No root directory"))
    }
653
654    /// Upload a file with CHK encryption, returns CID in format "hash:key"
655    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
656        let file_path = file_path.as_ref();
657        let file_content = std::fs::read(file_path)?;
658
659        // Use unified API with encryption enabled (default)
660        let store = self.store_arc();
661        let tree = HashTree::new(HashTreeConfig::new(store));
662
663        let cid = sync_block_on(async {
664            tree.put(&file_content).await
665        }).map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;
666
667        let cid_str = cid.to_string();
668
669        let mut wtxn = self.env.write_txn()?;
670        self.pins.put(&mut wtxn, &cid_str, &())?;
671        wtxn.commit()?;
672
673        Ok(cid_str)
674    }
675
676    /// Upload a directory with CHK encryption, returns CID
677    /// Respects .gitignore by default
678    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
679        self.upload_dir_encrypted_with_options(dir_path, true)
680    }
681
    /// Upload a directory with CHK encryption and options
    /// Returns CID as "hash:key" format for encrypted directories
    ///
    /// Walks the tree (honoring gitignore rules when `respect_gitignore`
    /// is true) and pins the resulting root by hash only.
    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
        let dir_path = dir_path.as_ref();
        let store = self.store_arc();

        // Use unified API with encryption enabled (default)
        let tree = HashTree::new(HashTreeConfig::new(store));

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
        }).context("Failed to upload encrypted directory")?;

        let cid_str = root_cid.to_string(); // Returns "hash:key" or "hash"

        let mut wtxn = self.env.write_txn()?;
        // Pin by hash only (the key is for decryption, not identification)
        self.pins.put(&mut wtxn, &to_hex(&root_cid.hash), &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
704
705    /// Get tree node by hash (hex)
706    pub fn get_tree_node(&self, hash_hex: &str) -> Result<Option<TreeNode>> {
707        let hash = from_hex(hash_hex)
708            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
709
710        let store = self.store_arc();
711        let tree = HashTree::new(HashTreeConfig::new(store).public());
712
713        sync_block_on(async {
714            tree.get_tree_node(&hash).await
715                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
716        })
717    }
718
719    /// Look up root hash by SHA256 hash (blossom compatibility)
720    pub fn get_cid_by_sha256(&self, sha256_hex: &str) -> Result<Option<String>> {
721        let rtxn = self.env.read_txn()?;
722        Ok(self.sha256_index.get(&rtxn, sha256_hex)?.map(|s| s.to_string()))
723    }
724
725    /// Store a raw blob, returns SHA256 hash as hex.
726    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
727        let hash = sha256(data);
728        self.router.put_sync(hash, data)
729            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
730        Ok(to_hex(&hash))
731    }
732
733    /// Get a raw blob by SHA256 hex hash.
734    pub fn get_blob(&self, sha256_hex: &str) -> Result<Option<Vec<u8>>> {
735        let hash = from_hex(sha256_hex)
736            .map_err(|e| anyhow::anyhow!("invalid hex: {}", e))?;
737        self.router.get_sync(&hash)
738            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
739    }
740
741    /// Check if a blob exists by SHA256 hex hash.
742    pub fn blob_exists(&self, sha256_hex: &str) -> Result<bool> {
743        let hash = from_hex(sha256_hex)
744            .map_err(|e| anyhow::anyhow!("invalid hex: {}", e))?;
745        self.router.exists(&hash)
746            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
747    }
748
749    // === Blossom ownership tracking ===
750    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
751    // This allows efficient multi-owner tracking with O(1) lookups
752
753    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
754    fn blob_owner_key(sha256_hex: &str, pubkey_hex: &str) -> Result<[u8; 64]> {
755        let sha256_bytes = from_hex(sha256_hex)
756            .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
757        let pubkey_bytes = from_hex(pubkey_hex)
758            .map_err(|e| anyhow::anyhow!("invalid pubkey hex: {}", e))?;
759        let mut key = [0u8; 64];
760        key[..32].copy_from_slice(&sha256_bytes);
761        key[32..].copy_from_slice(&pubkey_bytes);
762        Ok(key)
763    }
764
765    /// Add an owner (pubkey) to a blob for Blossom protocol
766    /// Multiple users can own the same blob - it's only deleted when all owners remove it
767    pub fn set_blob_owner(&self, sha256_hex: &str, pubkey: &str) -> Result<()> {
768        use std::time::{SystemTime, UNIX_EPOCH};
769
770        let key = Self::blob_owner_key(sha256_hex, pubkey)?;
771        let mut wtxn = self.env.write_txn()?;
772
773        // Add ownership entry (idempotent - put overwrites)
774        self.blob_owners.put(&mut wtxn, &key[..], &())?;
775
776        // Get existing blobs for this pubkey (for /list endpoint)
777        let mut blobs: Vec<BlobMetadata> = self
778            .pubkey_blobs
779            .get(&wtxn, pubkey)?
780            .and_then(|b| serde_json::from_slice(b).ok())
781            .unwrap_or_default();
782
783        // Check if blob already exists for this pubkey
784        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
785            let now = SystemTime::now()
786                .duration_since(UNIX_EPOCH)
787                .unwrap()
788                .as_secs();
789
790            // Get size from root hash lookup
791            let size = self
792                .get_cid_by_sha256(sha256_hex)?
793                .and_then(|cid| self.get_file_chunk_metadata(&cid).ok().flatten())
794                .map(|m| m.total_size)
795                .unwrap_or(0);
796
797            blobs.push(BlobMetadata {
798                sha256: sha256_hex.to_string(),
799                size,
800                mime_type: "application/octet-stream".to_string(),
801                uploaded: now,
802            });
803
804            let blobs_json = serde_json::to_vec(&blobs)?;
805            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
806        }
807
808        wtxn.commit()?;
809        Ok(())
810    }
811
812    /// Check if a pubkey owns a blob
813    pub fn is_blob_owner(&self, sha256_hex: &str, pubkey: &str) -> Result<bool> {
814        let key = Self::blob_owner_key(sha256_hex, pubkey)?;
815        let rtxn = self.env.read_txn()?;
816        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
817    }
818
819    /// Get all owners (pubkeys) of a blob via prefix scan
820    pub fn get_blob_owners(&self, sha256_hex: &str) -> Result<Vec<String>> {
821        let sha256_bytes = from_hex(sha256_hex)
822            .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
823        let rtxn = self.env.read_txn()?;
824
825        let mut owners = Vec::new();
826        for item in self.blob_owners.prefix_iter(&rtxn, &sha256_bytes[..])? {
827            let (key, _) = item?;
828            if key.len() == 64 {
829                // Extract pubkey from composite key (bytes 32-64)
830                let pubkey_hex = to_hex(&key[32..64].try_into().unwrap());
831                owners.push(pubkey_hex);
832            }
833        }
834        Ok(owners)
835    }
836
837    /// Check if blob has any owners
838    pub fn blob_has_owners(&self, sha256_hex: &str) -> Result<bool> {
839        let sha256_bytes = from_hex(sha256_hex)
840            .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
841        let rtxn = self.env.read_txn()?;
842
843        // Just check if any entry exists with this prefix
844        for item in self.blob_owners.prefix_iter(&rtxn, &sha256_bytes[..])? {
845            if item.is_ok() {
846                return Ok(true);
847            }
848        }
849        Ok(false)
850    }
851
852    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
853    pub fn get_blob_owner(&self, sha256_hex: &str) -> Result<Option<String>> {
854        Ok(self.get_blob_owners(sha256_hex)?.into_iter().next())
855    }
856
    /// Remove a user's ownership of a blossom blob
    /// Only deletes the actual blob when no owners remain
    /// Returns true if the blob was actually deleted (no owners left)
    pub fn delete_blossom_blob(&self, sha256_hex: &str, pubkey: &str) -> Result<bool> {
        let key = Self::blob_owner_key(sha256_hex, pubkey)?;
        let mut wtxn = self.env.write_txn()?;

        // Remove this pubkey's ownership entry
        self.blob_owners.delete(&mut wtxn, &key[..])?;

        // Remove from pubkey's blob list
        // (if the stored JSON fails to decode, the list is left untouched)
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Check if any other owners remain (prefix scan)
        // Scanning through &wtxn, so the deletion above is already visible.
        let sha256_bytes = from_hex(sha256_hex)
            .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256_bytes[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &pubkey[..8.min(pubkey.len())],
                &sha256_hex[..8.min(sha256_hex.len())]
            );
            return Ok(false);
        }

        // No owners left - delete the blob completely
        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8.min(sha256_hex.len())]
        );

        // Delete from sha256_index
        let root_hex = self.sha256_index.get(&wtxn, sha256_hex)?.map(|s| s.to_string());
        if let Some(ref root_hex) = root_hex {
            // Unpin
            self.pins.delete(&mut wtxn, root_hex)?;
        }
        self.sha256_index.delete(&mut wtxn, sha256_hex)?;

        // Delete raw blob (by content hash) - this deletes from S3 too
        // NOTE(review): this runs before the commit below; if the commit fails,
        // the blob bytes are gone while the index entries survive - confirm intended.
        let hash = from_hex(sha256_hex)
            .map_err(|e| anyhow::anyhow!("invalid hex: {}", e))?;
        let _ = self.router.delete_sync(&hash);

        wtxn.commit()?;
        Ok(true)
    }
919
920    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
921    pub fn list_blobs_by_pubkey(&self, pubkey: &str) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
922        let rtxn = self.env.read_txn()?;
923
924        let blobs: Vec<BlobMetadata> = self
925            .pubkey_blobs
926            .get(&rtxn, pubkey)?
927            .and_then(|b| serde_json::from_slice(b).ok())
928            .unwrap_or_default();
929
930        Ok(blobs
931            .into_iter()
932            .map(|b| crate::server::blossom::BlobDescriptor {
933                url: format!("/{}", b.sha256),
934                sha256: b.sha256,
935                size: b.size,
936                mime_type: b.mime_type,
937                uploaded: b.uploaded,
938            })
939            .collect())
940    }
941
942    /// Get a single chunk/blob by hash (hex)
943    pub fn get_chunk(&self, chunk_hex: &str) -> Result<Option<Vec<u8>>> {
944        let hash = from_hex(chunk_hex)
945            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
946        self.router.get_sync(&hash)
947            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
948    }
949
950    /// Get file content by hash (hex)
951    /// Returns raw bytes (caller handles decryption if needed)
952    pub fn get_file(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
953        let hash = from_hex(hash_hex)
954            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
955
956        let store = self.store_arc();
957        let tree = HashTree::new(HashTreeConfig::new(store).public());
958
959        sync_block_on(async {
960            tree.read_file(&hash).await
961                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
962        })
963    }
964
    /// Get chunk metadata for a file (chunk list, sizes, total size)
    ///
    /// Returns `Ok(None)` when the hash is not in the store, when the tree
    /// node cannot be loaded, or when the hash refers to a directory rather
    /// than a file.
    pub fn get_file_chunk_metadata(&self, hash_hex: &str) -> Result<Option<FileChunkMetadata>> {
        let hash = from_hex(hash_hex)
            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            // First check if the hash exists in the store at all
            // (either as a blob or tree node)
            let exists = store.has(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            // Get total size
            let total_size = tree.get_size(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            // Check if it's a tree (chunked) or blob
            let is_tree_node = tree.is_tree(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            if !is_tree_node {
                // Single blob, not chunked
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_cids: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            // Get tree node to extract chunk info
            let node = match tree.get_tree_node(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))? {
                Some(n) => n,
                None => return Ok(None),
            };

            // Check if it's a directory (has named links)
            let is_directory = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None); // Not a file
            }

            // Extract chunk info from links
            let chunk_cids: Vec<String> = node.links.iter().map(|l| to_hex(&l.hash)).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_cids,
                chunk_sizes,
                // A tree node with no links reports is_chunked = false.
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1028
1029    /// Get byte range from file
1030    pub fn get_file_range(&self, hash_hex: &str, start: u64, end: Option<u64>) -> Result<Option<(Vec<u8>, u64)>> {
1031        let metadata = match self.get_file_chunk_metadata(hash_hex)? {
1032            Some(m) => m,
1033            None => return Ok(None),
1034        };
1035
1036        if metadata.total_size == 0 {
1037            return Ok(Some((Vec::new(), 0)));
1038        }
1039
1040        if start >= metadata.total_size {
1041            return Ok(None);
1042        }
1043
1044        let end = end.unwrap_or(metadata.total_size - 1).min(metadata.total_size - 1);
1045
1046        // For non-chunked files, load entire file
1047        if !metadata.is_chunked {
1048            let content = self.get_file(hash_hex)?.unwrap_or_default();
1049            let range_content = if start < content.len() as u64 {
1050                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1051            } else {
1052                Vec::new()
1053            };
1054            return Ok(Some((range_content, metadata.total_size)));
1055        }
1056
1057        // For chunked files, load only needed chunks
1058        let mut result = Vec::new();
1059        let mut current_offset = 0u64;
1060
1061        for (i, chunk_cid) in metadata.chunk_cids.iter().enumerate() {
1062            let chunk_size = metadata.chunk_sizes[i];
1063            let chunk_end = current_offset + chunk_size - 1;
1064
1065            // Check if this chunk overlaps with requested range
1066            if chunk_end >= start && current_offset <= end {
1067                let chunk_content = match self.get_chunk(chunk_cid)? {
1068                    Some(content) => content,
1069                    None => {
1070                        return Err(anyhow::anyhow!("Chunk {} not found", chunk_cid));
1071                    }
1072                };
1073
1074                let chunk_read_start = if current_offset >= start {
1075                    0
1076                } else {
1077                    (start - current_offset) as usize
1078                };
1079
1080                let chunk_read_end = if chunk_end <= end {
1081                    chunk_size as usize - 1
1082                } else {
1083                    (end - current_offset) as usize
1084                };
1085
1086                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1087            }
1088
1089            current_offset += chunk_size;
1090
1091            if current_offset > end {
1092                break;
1093            }
1094        }
1095
1096        Ok(Some((result, metadata.total_size)))
1097    }
1098
1099    /// Stream file range as chunks using Arc for async/Send contexts
1100    pub fn stream_file_range_chunks_owned(
1101        self: Arc<Self>,
1102        hash_hex: &str,
1103        start: u64,
1104        end: u64,
1105    ) -> Result<Option<FileRangeChunksOwned>> {
1106        let metadata = match self.get_file_chunk_metadata(hash_hex)? {
1107            Some(m) => m,
1108            None => return Ok(None),
1109        };
1110
1111        if metadata.total_size == 0 || start >= metadata.total_size {
1112            return Ok(None);
1113        }
1114
1115        let end = end.min(metadata.total_size - 1);
1116
1117        Ok(Some(FileRangeChunksOwned {
1118            store: self,
1119            metadata,
1120            start,
1121            end,
1122            current_chunk_idx: 0,
1123            current_offset: 0,
1124        }))
1125    }
1126
1127    /// Get directory structure by hash (hex)
1128    pub fn get_directory_listing(&self, hash_hex: &str) -> Result<Option<DirectoryListing>> {
1129        let hash = from_hex(hash_hex)
1130            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
1131
1132        let store = self.store_arc();
1133        let tree = HashTree::new(HashTreeConfig::new(store).public());
1134
1135        sync_block_on(async {
1136            // Check if it's a directory
1137            let is_dir = tree.is_directory(&hash).await
1138                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1139
1140            if !is_dir {
1141                return Ok(None);
1142            }
1143
1144            // Get directory entries (public Cid - no encryption key)
1145            let cid = hashtree_core::Cid::public(hash, 0);
1146            let tree_entries = tree.list_directory(&cid).await
1147                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1148
1149            let entries: Vec<DirEntry> = tree_entries.into_iter().map(|e| DirEntry {
1150                name: e.name,
1151                cid: to_hex(&e.hash),
1152                is_directory: e.link_type.is_tree(),
1153                size: e.size,
1154            }).collect();
1155
1156            Ok(Some(DirectoryListing {
1157                dir_name: String::new(),
1158                entries,
1159            }))
1160        })
1161    }
1162
1163    /// Pin a hash (prevent garbage collection)
1164    pub fn pin(&self, hash_hex: &str) -> Result<()> {
1165        let mut wtxn = self.env.write_txn()?;
1166        self.pins.put(&mut wtxn, hash_hex, &())?;
1167        wtxn.commit()?;
1168        Ok(())
1169    }
1170
1171    /// Unpin a hash (allow garbage collection)
1172    pub fn unpin(&self, hash_hex: &str) -> Result<()> {
1173        let mut wtxn = self.env.write_txn()?;
1174        self.pins.delete(&mut wtxn, hash_hex)?;
1175        wtxn.commit()?;
1176        Ok(())
1177    }
1178
1179    /// Check if hash is pinned
1180    pub fn is_pinned(&self, hash_hex: &str) -> Result<bool> {
1181        let rtxn = self.env.read_txn()?;
1182        Ok(self.pins.get(&rtxn, hash_hex)?.is_some())
1183    }
1184
1185    /// List all pinned hashes
1186    pub fn list_pins(&self) -> Result<Vec<String>> {
1187        let rtxn = self.env.read_txn()?;
1188        let mut pins = Vec::new();
1189
1190        for item in self.pins.iter(&rtxn)? {
1191            let (hash_hex, _) = item?;
1192            pins.push(hash_hex.to_string());
1193        }
1194
1195        Ok(pins)
1196    }
1197
1198    /// List all pinned hashes with names
1199    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
1200        let rtxn = self.env.read_txn()?;
1201        let store = self.store_arc();
1202        let tree = HashTree::new(HashTreeConfig::new(store).public());
1203        let mut pins = Vec::new();
1204
1205        for item in self.pins.iter(&rtxn)? {
1206            let (hash_hex, _) = item?;
1207            let hash_hex_str = hash_hex.to_string();
1208
1209            // Try to determine if it's a directory
1210            let is_directory = if let Ok(hash) = from_hex(&hash_hex_str) {
1211                sync_block_on(async {
1212                    tree.is_directory(&hash).await.unwrap_or(false)
1213                })
1214            } else {
1215                false
1216            };
1217
1218            pins.push(PinnedItem {
1219                cid: hash_hex_str,
1220                name: "Unknown".to_string(),
1221                is_directory,
1222            });
1223        }
1224
1225        Ok(pins)
1226    }
1227
1228    // === Tree indexing for eviction ===
1229
    /// Index a tree after sync - tracks all blobs in the tree for eviction
    ///
    /// If `ref_key` is provided (e.g. "npub.../name"), it will replace any existing
    /// tree with that ref, allowing old versions to be evicted.
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // If ref_key provided, check for and unindex old version
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes.try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // Close the read txn before unindexing, which opens its own txns.
                    drop(rtxn);
                    // Unindex old tree (will delete orphaned blobs).
                    // Failure is deliberately ignored: a stale old tree is
                    // preferable to aborting indexing of the new one.
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk tree and collect all blob hashes + compute total size
        let (blob_hashes, total_size) = sync_block_on(async {
            self.collect_tree_blobs(&tree, root_hash).await
        })?;

        let mut wtxn = self.env.write_txn()?;

        // Store blob-tree relationships (64-byte key: blob_hash ++ tree_hash)
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        // Store tree metadata
        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta.put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        // Store ref -> hash mapping if ref_key provided
        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        // root_hex is the hex of a 32-byte hash (64 chars), so [..8] is safe.
        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1309
1310    /// Collect all blob hashes in a tree and compute total size
1311    async fn collect_tree_blobs<S: Store>(
1312        &self,
1313        tree: &HashTree<S>,
1314        root: &Hash,
1315    ) -> Result<(Vec<Hash>, u64)> {
1316        let mut blobs = Vec::new();
1317        let mut total_size = 0u64;
1318        let mut stack = vec![*root];
1319
1320        while let Some(hash) = stack.pop() {
1321            // Check if it's a tree node
1322            let is_tree = tree.is_tree(&hash).await
1323                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1324
1325            if is_tree {
1326                // Get tree node and add children to stack
1327                if let Some(node) = tree.get_tree_node(&hash).await
1328                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1329                {
1330                    for link in &node.links {
1331                        stack.push(link.hash);
1332                    }
1333                }
1334            } else {
1335                // It's a blob - get its size
1336                if let Some(data) = self.router.get_sync(&hash)
1337                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1338                {
1339                    total_size += data.len() as u64;
1340                    blobs.push(hash);
1341                }
1342            }
1343        }
1344
1345        Ok((blobs, total_size))
1346    }
1347
1348    /// Unindex a tree - removes blob-tree mappings and deletes orphaned blobs
1349    /// Returns the number of bytes freed
1350    pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1351        let root_hex = to_hex(root_hash);
1352
1353        let store = self.store_arc();
1354        let tree = HashTree::new(HashTreeConfig::new(store).public());
1355
1356        // Walk tree and collect all blob hashes
1357        let (blob_hashes, _) = sync_block_on(async {
1358            self.collect_tree_blobs(&tree, root_hash).await
1359        })?;
1360
1361        let mut wtxn = self.env.write_txn()?;
1362        let mut freed = 0u64;
1363
1364        // For each blob, remove the blob-tree entry and check if orphaned
1365        for blob_hash in &blob_hashes {
1366            // Delete blob-tree entry (64-byte key: blob_hash ++ tree_hash)
1367            let mut key = [0u8; 64];
1368            key[..32].copy_from_slice(blob_hash);
1369            key[32..].copy_from_slice(root_hash);
1370            self.blob_trees.delete(&mut wtxn, &key[..])?;
1371
1372            // Check if blob is in any other tree (prefix scan on first 32 bytes)
1373            let rtxn = self.env.read_txn()?;
1374            let mut has_other_tree = false;
1375
1376            for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1377                if item.is_ok() {
1378                    has_other_tree = true;
1379                    break;
1380                }
1381            }
1382            drop(rtxn);
1383
1384            // If orphaned, delete the blob
1385            if !has_other_tree {
1386                if let Some(data) = self.router.get_sync(blob_hash)
1387                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1388                {
1389                    freed += data.len() as u64;
1390                    // Delete locally only - keep S3 as archive
1391                    self.router.delete_local_only(blob_hash)
1392                        .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1393                }
1394            }
1395        }
1396
1397        // Delete tree node itself if exists
1398        if let Some(data) = self.router.get_sync(root_hash)
1399            .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1400        {
1401            freed += data.len() as u64;
1402            // Delete locally only - keep S3 as archive
1403            self.router.delete_local_only(root_hash)
1404                .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1405        }
1406
1407        // Delete tree metadata
1408        self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1409
1410        wtxn.commit()?;
1411
1412        tracing::debug!(
1413            "Unindexed tree {} ({} bytes freed)",
1414            &root_hex[..8],
1415            freed
1416        );
1417
1418        Ok(freed)
1419    }
1420
1421    /// Get tree metadata
1422    pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1423        let rtxn = self.env.read_txn()?;
1424        if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1425            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1426                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1427            Ok(Some(meta))
1428        } else {
1429            Ok(None)
1430        }
1431    }
1432
1433    /// List all indexed trees
1434    pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1435        let rtxn = self.env.read_txn()?;
1436        let mut trees = Vec::new();
1437
1438        for item in self.tree_meta.iter(&rtxn)? {
1439            let (hash_bytes, meta_bytes) = item?;
1440            let hash: Hash = hash_bytes.try_into()
1441                .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1442            let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1443                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1444            trees.push((hash, meta));
1445        }
1446
1447        Ok(trees)
1448    }
1449
1450    /// Get total tracked storage size (sum of all tree_meta.total_size)
1451    pub fn tracked_size(&self) -> Result<u64> {
1452        let rtxn = self.env.read_txn()?;
1453        let mut total = 0u64;
1454
1455        for item in self.tree_meta.iter(&rtxn)? {
1456            let (_, bytes) = item?;
1457            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1458                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1459            total += meta.total_size;
1460        }
1461
1462        Ok(total)
1463    }
1464
1465    /// Get evictable trees sorted by (priority ASC, synced_at ASC)
1466    fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1467        let mut trees = self.list_indexed_trees()?;
1468
1469        // Sort by priority (lower first), then by synced_at (older first)
1470        trees.sort_by(|a, b| {
1471            match a.1.priority.cmp(&b.1.priority) {
1472                std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1473                other => other,
1474            }
1475        });
1476
1477        Ok(trees)
1478    }
1479
    /// Run eviction if storage is over quota
    /// Returns bytes freed
    ///
    /// Eviction order:
    /// 1. Orphaned blobs (not in any indexed tree and not pinned)
    /// 2. Trees by priority (lowest first) and age (oldest first)
    pub fn evict_if_needed(&self) -> Result<u64> {
        // Get actual storage used
        let stats = self.router.stats()
            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
        let current = stats.total_bytes;

        // Nothing to do while under the configured quota.
        if current <= self.max_size_bytes {
            return Ok(0);
        }

        // Target 90% of max to avoid constant eviction
        let target = self.max_size_bytes * 90 / 100;
        let mut freed = 0u64;
        // Track the estimated remaining usage locally instead of re-querying
        // router stats after every deletion.
        let mut current_size = current;

        // Phase 1: Evict orphaned blobs (not in any tree and not pinned)
        let orphan_freed = self.evict_orphaned_blobs()?;
        freed += orphan_freed;
        current_size = current_size.saturating_sub(orphan_freed);

        if orphan_freed > 0 {
            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
        }

        // Check if we're now under target
        if current_size <= target {
            if freed > 0 {
                tracing::info!("Eviction complete: {} bytes freed", freed);
            }
            return Ok(freed);
        }

        // Phase 2: Evict trees by priority (lowest first) and age (oldest first)
        // Own trees CAN be evicted (just last), but PINNED trees are never evicted
        let evictable = self.get_evictable_trees()?;

        for (root_hash, meta) in evictable {
            if current_size <= target {
                break;
            }

            let root_hex = to_hex(&root_hash);

            // Never evict pinned trees
            if self.is_pinned(&root_hex)? {
                continue;
            }

            let tree_freed = self.unindex_tree(&root_hash)?;
            freed += tree_freed;
            current_size = current_size.saturating_sub(tree_freed);

            tracing::info!(
                "Evicted tree {} (owner={}, priority={}, {} bytes)",
                &root_hex[..8],
                &meta.owner[..8.min(meta.owner.len())],
                meta.priority,
                tree_freed
            );
        }

        if freed > 0 {
            tracing::info!("Eviction complete: {} bytes freed", freed);
        }

        Ok(freed)
    }
1553
1554    /// Evict blobs that are not part of any indexed tree and not pinned
1555    fn evict_orphaned_blobs(&self) -> Result<u64> {
1556        let mut freed = 0u64;
1557
1558        // Get all blob hashes from store
1559        let all_hashes = self.router.list()
1560            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1561
1562        // Get pinned hashes
1563        let rtxn = self.env.read_txn()?;
1564        let pinned: HashSet<String> = self.pins.iter(&rtxn)?
1565            .filter_map(|item| item.ok())
1566            .map(|(hash_hex, _)| hash_hex.to_string())
1567            .collect();
1568
1569        // Collect all blob hashes that are in at least one tree
1570        // Key format is blob_hash (32 bytes) ++ tree_hash (32 bytes)
1571        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
1572        for item in self.blob_trees.iter(&rtxn)? {
1573            if let Ok((key_bytes, _)) = item {
1574                if key_bytes.len() >= 32 {
1575                    let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
1576                    blobs_in_trees.insert(blob_hash);
1577                }
1578            }
1579        }
1580        drop(rtxn);
1581
1582        // Find and delete orphaned blobs
1583        for hash in all_hashes {
1584            let hash_hex = to_hex(&hash);
1585
1586            // Skip if pinned
1587            if pinned.contains(&hash_hex) {
1588                continue;
1589            }
1590
1591            // Skip if part of any tree
1592            if blobs_in_trees.contains(&hash) {
1593                continue;
1594            }
1595
1596            // This blob is orphaned - delete locally (keep S3 as archive)
1597            if let Ok(Some(data)) = self.router.get_sync(&hash) {
1598                freed += data.len() as u64;
1599                let _ = self.router.delete_local_only(&hash);
1600                tracing::debug!("Deleted orphaned blob {} ({} bytes)", &hash_hex[..8], data.len());
1601            }
1602        }
1603
1604        Ok(freed)
1605    }
1606
    /// Get the maximum storage size in bytes
    ///
    /// This is the quota that `evict_if_needed` checks against.
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1611
1612    /// Get storage usage by priority tier
1613    pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1614        let rtxn = self.env.read_txn()?;
1615        let mut own = 0u64;
1616        let mut followed = 0u64;
1617        let mut other = 0u64;
1618
1619        for item in self.tree_meta.iter(&rtxn)? {
1620            let (_, bytes) = item?;
1621            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1622                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1623
1624            if meta.priority >= PRIORITY_OWN {
1625                own += meta.total_size;
1626            } else if meta.priority >= PRIORITY_FOLLOWED {
1627                followed += meta.total_size;
1628            } else {
1629                other += meta.total_size;
1630            }
1631        }
1632
1633        Ok(StorageByPriority { own, followed, other })
1634    }
1635
1636    /// Get storage statistics
1637    pub fn get_storage_stats(&self) -> Result<StorageStats> {
1638        let rtxn = self.env.read_txn()?;
1639        let total_pins = self.pins.len(&rtxn)? as usize;
1640
1641        let stats = self.router.stats()
1642            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1643
1644        Ok(StorageStats {
1645            total_dags: stats.count,
1646            pinned_dags: total_pins,
1647            total_bytes: stats.total_bytes,
1648        })
1649    }
1650
1651    // === Cached roots (replaces nostrdb event caching) ===
1652
1653    /// Get cached root for a pubkey/tree_name pair
1654    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1655        let key = format!("{}/{}", pubkey_hex, tree_name);
1656        let rtxn = self.env.read_txn()?;
1657        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1658            let root: CachedRoot = rmp_serde::from_slice(bytes)
1659                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1660            Ok(Some(root))
1661        } else {
1662            Ok(None)
1663        }
1664    }
1665
1666    /// Set cached root for a pubkey/tree_name pair
1667    pub fn set_cached_root(
1668        &self,
1669        pubkey_hex: &str,
1670        tree_name: &str,
1671        hash: &str,
1672        key: Option<&str>,
1673        visibility: &str,
1674        updated_at: u64,
1675    ) -> Result<()> {
1676        let db_key = format!("{}/{}", pubkey_hex, tree_name);
1677        let root = CachedRoot {
1678            hash: hash.to_string(),
1679            key: key.map(|k| k.to_string()),
1680            updated_at,
1681            visibility: visibility.to_string(),
1682        };
1683        let bytes = rmp_serde::to_vec(&root)
1684            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1685        let mut wtxn = self.env.write_txn()?;
1686        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1687        wtxn.commit()?;
1688        Ok(())
1689    }
1690
1691    /// List all cached roots for a pubkey
1692    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1693        let prefix = format!("{}/", pubkey_hex);
1694        let rtxn = self.env.read_txn()?;
1695        let mut results = Vec::new();
1696
1697        for item in self.cached_roots.iter(&rtxn)? {
1698            let (key, bytes) = item?;
1699            if key.starts_with(&prefix) {
1700                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1701                let root: CachedRoot = rmp_serde::from_slice(bytes)
1702                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1703                results.push((tree_name.to_string(), root));
1704            }
1705        }
1706
1707        Ok(results)
1708    }
1709
1710    /// Delete a cached root
1711    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1712        let key = format!("{}/{}", pubkey_hex, tree_name);
1713        let mut wtxn = self.env.write_txn()?;
1714        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1715        wtxn.commit()?;
1716        Ok(deleted)
1717    }
1718
1719    /// Garbage collect unpinned content
1720    pub fn gc(&self) -> Result<GcStats> {
1721        let rtxn = self.env.read_txn()?;
1722
1723        // Get all pinned hashes
1724        let pinned: HashSet<String> = self.pins.iter(&rtxn)?
1725            .filter_map(|item| item.ok())
1726            .map(|(hash_hex, _)| hash_hex.to_string())
1727            .collect();
1728
1729        drop(rtxn);
1730
1731        // Get all stored hashes
1732        let all_hashes = self.router.list()
1733            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1734
1735        // Delete unpinned hashes
1736        let mut deleted = 0;
1737        let mut freed_bytes = 0u64;
1738
1739        for hash in all_hashes {
1740            let hash_hex = to_hex(&hash);
1741            if !pinned.contains(&hash_hex) {
1742                if let Ok(Some(data)) = self.router.get_sync(&hash) {
1743                    freed_bytes += data.len() as u64;
1744                    // Delete locally only - keep S3 as archive
1745                    let _ = self.router.delete_local_only(&hash);
1746                    deleted += 1;
1747                }
1748            }
1749        }
1750
1751        Ok(GcStats {
1752            deleted_dags: deleted,
1753            freed_bytes,
1754        })
1755    }
1756
1757    /// Verify LMDB blob integrity - checks that stored data matches its key hash
1758    /// Returns verification statistics and optionally deletes corrupted entries
1759    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
1760        let all_hashes = self.router.list()
1761            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1762
1763        let total = all_hashes.len();
1764        let mut valid = 0;
1765        let mut corrupted = 0;
1766        let mut deleted = 0;
1767        let mut corrupted_hashes = Vec::new();
1768
1769        for hash in &all_hashes {
1770            let hash_hex = to_hex(hash);
1771
1772            match self.router.get_sync(hash) {
1773                Ok(Some(data)) => {
1774                    // Compute actual SHA256 of data
1775                    let actual_hash = sha256(&data);
1776
1777                    if actual_hash == *hash {
1778                        valid += 1;
1779                    } else {
1780                        corrupted += 1;
1781                        let actual_hex = to_hex(&actual_hash);
1782                        println!("  CORRUPTED: key={} actual={} size={}",
1783                            &hash_hex[..16], &actual_hex[..16], data.len());
1784                        corrupted_hashes.push(*hash);
1785                    }
1786                }
1787                Ok(None) => {
1788                    // Hash exists in index but data is missing
1789                    corrupted += 1;
1790                    println!("  MISSING: key={}", &hash_hex[..16]);
1791                    corrupted_hashes.push(*hash);
1792                }
1793                Err(e) => {
1794                    corrupted += 1;
1795                    println!("  ERROR: key={} err={}", &hash_hex[..16], e);
1796                    corrupted_hashes.push(*hash);
1797                }
1798            }
1799        }
1800
1801        // Delete corrupted entries if requested
1802        if delete {
1803            for hash in &corrupted_hashes {
1804                match self.router.delete_sync(hash) {
1805                    Ok(true) => deleted += 1,
1806                    Ok(false) => {} // Already deleted
1807                    Err(e) => {
1808                        let hash_hex = to_hex(hash);
1809                        println!("  Failed to delete {}: {}", &hash_hex[..16], e);
1810                    }
1811                }
1812            }
1813        }
1814
1815        Ok(VerifyResult {
1816            total,
1817            valid,
1818            corrupted,
1819            deleted,
1820        })
1821    }
1822
    /// Verify R2/S3 blob integrity - lists all objects and verifies hash matches filename
    /// Returns verification statistics and optionally deletes corrupted entries
    ///
    /// Pages through every object in the configured bucket, downloads each
    /// `.bin` object, recomputes its SHA-256, and compares it against the hash
    /// encoded in the object key. Per-object fetch/read failures are counted
    /// as corrupted rather than aborting the whole scan.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        // Get S3 client from router (we need to access it directly)
        // For now, we'll create a new client from config
        let config = crate::config::Config::load()?;
        let s3_config = config.storage.s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        // Build AWS config
        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        // Custom endpoint + path-style addressing (R2/MinIO-compatible setup).
        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                .force_path_style(true)
                .build()
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        // Keys collected for the optional delete pass after the scan.
        let mut corrupted_keys = Vec::new();

        // List all objects in bucket
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2()
                .bucket(bucket)
                .prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req.send().await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Skip non-.bin files
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // Extract expected hash from filename (remove prefix and .bin)
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                // Validate it's a valid hex hash
                // (64 hex chars = one 32-byte SHA-256 digest)
                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!("  INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!("  INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download and verify content
                match s3_client.get_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(resp) => {
                        // Buffer the full body before hashing.
                        match resp.body.collect().await {
                            Ok(bytes) => {
                                let data = bytes.into_bytes();
                                let actual_hash = sha256(&data);

                                if actual_hash == expected_hash {
                                    valid += 1;
                                } else {
                                    corrupted += 1;
                                    let actual_hex = to_hex(&actual_hash);
                                    println!("  CORRUPTED: key={} actual={} size={}",
                                        &expected_hash_hex[..16], &actual_hex[..16], data.len());
                                    corrupted_keys.push(key.to_string());
                                }
                            }
                            Err(e) => {
                                corrupted += 1;
                                println!("  READ ERROR: {} - {}", key, e);
                                corrupted_keys.push(key.to_string());
                            }
                        }
                    }
                    Err(e) => {
                        corrupted += 1;
                        println!("  FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Progress indicator every 100 objects
                if total % 100 == 0 {
                    println!("  Progress: {} objects checked, {} corrupted so far", total, corrupted);
                }
            }

            // Check if there are more objects
            // NOTE(review): assumes the service always supplies a continuation
            // token when is_truncated is true; a truncated response without a
            // token would re-list the same page — confirm against the SDK docs.
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        // Delete corrupted entries if requested
        if delete {
            for key in &corrupted_keys {
                match s3_client.delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        println!("  Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
1979
1980    /// Fallback for non-S3 builds
1981    #[cfg(not(feature = "s3"))]
1982    pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
1983        Err(anyhow::anyhow!("S3 feature not enabled"))
1984    }
1985}
1986
/// Result of blob integrity verification
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of blobs examined
    pub total: usize,
    /// Blobs whose content hashed to their key
    pub valid: usize,
    /// Blobs that were missing, unreadable, or hash-mismatched
    pub corrupted: usize,
    /// Corrupted blobs actually deleted (non-zero only when deletion was requested)
    pub deleted: usize,
}
1995
/// Overall storage statistics as reported by `get_storage_stats`.
// `Clone` added for consistency with the sibling stat structs
// (`VerifyResult`, `StorageByPriority`) which already derive it.
#[derive(Debug, Clone)]
pub struct StorageStats {
    /// Total number of stored DAGs/blobs
    pub total_dags: usize,
    /// Number of pinned DAGs
    pub pinned_dags: usize,
    /// Total stored bytes
    pub total_bytes: u64,
}
2002
/// Storage usage broken down by priority tier
/// (each field is a sum of `TreeMeta.total_size` values for that tier).
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Own/pinned trees (priority 255)
    pub own: u64,
    /// Followed users' trees (priority 128)
    pub followed: u64,
    /// Other trees (priority 64)
    pub other: u64,
}
2013
/// Chunking layout of a stored file, used for ranged reads.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes
    pub total_size: u64,
    /// CIDs of the file's chunks, in order
    pub chunk_cids: Vec<String>,
    /// Size of each chunk, parallel to `chunk_cids`
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is split into chunks at all
    pub is_chunked: bool,
}
2021
/// Owned iterator for async streaming
/// (yields byte slices of a file restricted to the `[start, end]` range).
pub struct FileRangeChunksOwned {
    // Store used to fetch chunk contents on demand
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read
    metadata: FileChunkMetadata,
    // First byte (inclusive) of the requested range
    start: u64,
    // Last byte (inclusive) of the requested range
    end: u64,
    // Index of the next chunk to consider
    current_chunk_idx: usize,
    // File offset of the start of the current chunk
    current_offset: u64,
}
2031
2032impl Iterator for FileRangeChunksOwned {
2033    type Item = Result<Vec<u8>>;
2034
2035    fn next(&mut self) -> Option<Self::Item> {
2036        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_cids.len() {
2037            return None;
2038        }
2039
2040        if self.current_offset > self.end {
2041            return None;
2042        }
2043
2044        let chunk_cid = &self.metadata.chunk_cids[self.current_chunk_idx];
2045        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2046        let chunk_end = self.current_offset + chunk_size - 1;
2047
2048        self.current_chunk_idx += 1;
2049
2050        if chunk_end < self.start || self.current_offset > self.end {
2051            self.current_offset += chunk_size;
2052            return self.next();
2053        }
2054
2055        let chunk_content = match self.store.get_chunk(chunk_cid) {
2056            Ok(Some(content)) => content,
2057            Ok(None) => {
2058                return Some(Err(anyhow::anyhow!("Chunk {} not found", chunk_cid)));
2059            }
2060            Err(e) => {
2061                return Some(Err(e));
2062            }
2063        };
2064
2065        let chunk_read_start = if self.current_offset >= self.start {
2066            0
2067        } else {
2068            (self.start - self.current_offset) as usize
2069        };
2070
2071        let chunk_read_end = if chunk_end <= self.end {
2072            chunk_size as usize - 1
2073        } else {
2074            (self.end - self.current_offset) as usize
2075        };
2076
2077        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2078        self.current_offset += chunk_size;
2079
2080        Some(Ok(result))
2081    }
2082}
2083
/// Outcome of a garbage-collection pass.
#[derive(Debug)]
pub struct GcStats {
    /// Number of blobs deleted locally
    pub deleted_dags: usize,
    /// Bytes reclaimed by those deletions
    pub freed_bytes: u64,
}
2089
/// A single entry in a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within its parent directory
    pub name: String,
    /// Content identifier of the entry
    pub cid: String,
    /// True if the entry is a subdirectory
    pub is_directory: bool,
    /// Entry size (presumably bytes for files — TODO confirm semantics for directories)
    pub size: u64,
}
2097
/// Contents of a directory: its name plus all child entries.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the listed directory
    pub dir_name: String,
    /// Child entries of the directory
    pub entries: Vec<DirEntry>,
}
2103
/// A pinned item as reported to callers listing pins.
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Content identifier of the pinned item
    pub cid: String,
    /// Display name for the pin
    pub name: String,
    /// True if the pinned item is a directory
    pub is_directory: bool,
}
2110
/// Blob metadata for Blossom protocol
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 of the blob content
    pub sha256: String,
    /// Blob size in bytes
    pub size: u64,
    /// MIME type of the blob
    pub mime_type: String,
    /// Upload timestamp (presumably Unix seconds — TODO confirm against writer)
    pub uploaded: u64,
}
2119
// Implement ContentStore trait for WebRTC data exchange
impl crate::webrtc::ContentStore for HashtreeStore {
    /// Fetch a blob by hex-encoded hash; delegates directly to `get_chunk`.
    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
        self.get_chunk(hash_hex)
    }
}
2125}