// hashtree_cli/storage.rs
1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use hashtree_config::StorageBackend;
5use hashtree_core::store::{Store, StoreError};
6use hashtree_core::{
7    from_hex, sha256, to_hex, types::Hash, Cid, DirEntry as HashTreeDirEntry, HashTree,
8    HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::collections::HashSet;
17use std::io::Read;
18use std::path::Path;
19use std::sync::Arc;
20use std::time::{SystemTime, UNIX_EPOCH};
21
/// Priority levels for tree eviction (higher value = higher priority to keep).
/// These are the values stored in `TreeMeta::priority`:
/// 255 = our own / pinned trees, 128 = trees of followed users, 64 = everything else.
pub const PRIORITY_OTHER: u8 = 64;
pub const PRIORITY_FOLLOWED: u8 = 128;
pub const PRIORITY_OWN: u8 = 255;
26
/// Metadata for a synced tree (for eviction tracking)
///
/// Stored msgpack-encoded in the `tree_meta` LMDB database, keyed by the
/// tree's 32-byte root hash.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Pubkey of tree owner
    pub owner: String,
    /// Tree name if known (from nostr key like "npub.../name")
    pub name: Option<String>,
    /// Unix timestamp when this tree was synced
    pub synced_at: u64,
    /// Total size of all blobs in this tree, in bytes
    pub total_size: u64,
    /// Eviction priority: 255=own/pinned, 128=followed, 64=other
    /// (see the PRIORITY_* constants above)
    pub priority: u8,
}
41
/// Cached root info from Nostr events (replaces nostrdb caching)
///
/// Stored msgpack-encoded in the `cached_roots` LMDB database, keyed by
/// "pubkey_hex/tree_name".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex); None for unencrypted trees
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public", "link-visible", or "private"
    pub visibility: String,
}
54
/// Storage statistics reported by the local blob store
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of blobs currently stored
    pub count: usize,
    /// Total size of all stored blobs, in bytes
    pub total_bytes: u64,
}
61
/// Local blob store - wraps either FsBlobStore or LmdbBlobStore
/// (the LMDB variant only exists when the `lmdb` feature is enabled)
pub enum LocalStore {
    /// Filesystem-backed blob storage
    Fs(FsBlobStore),
    /// LMDB-backed blob storage
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
68
69impl LocalStore {
70    /// Create a new local store based on config
71    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
72        match backend {
73            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
74            #[cfg(feature = "lmdb")]
75            StorageBackend::Lmdb => Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?)),
76            #[cfg(not(feature = "lmdb"))]
77            StorageBackend::Lmdb => {
78                tracing::warn!(
79                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
80                );
81                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
82            }
83        }
84    }
85
86    /// Sync put operation
87    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
88        match self {
89            LocalStore::Fs(store) => store.put_sync(hash, data),
90            #[cfg(feature = "lmdb")]
91            LocalStore::Lmdb(store) => store.put_sync(hash, data),
92        }
93    }
94
95    /// Sync get operation
96    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
97        match self {
98            LocalStore::Fs(store) => store.get_sync(hash),
99            #[cfg(feature = "lmdb")]
100            LocalStore::Lmdb(store) => store.get_sync(hash),
101        }
102    }
103
104    /// Check if hash exists
105    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
106        match self {
107            LocalStore::Fs(store) => Ok(store.exists(hash)),
108            #[cfg(feature = "lmdb")]
109            LocalStore::Lmdb(store) => store.exists(hash),
110        }
111    }
112
113    /// Sync delete operation
114    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
115        match self {
116            LocalStore::Fs(store) => store.delete_sync(hash),
117            #[cfg(feature = "lmdb")]
118            LocalStore::Lmdb(store) => store.delete_sync(hash),
119        }
120    }
121
122    /// Get storage statistics
123    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
124        match self {
125            LocalStore::Fs(store) => {
126                let stats = store.stats()?;
127                Ok(LocalStoreStats {
128                    count: stats.count,
129                    total_bytes: stats.total_bytes,
130                })
131            }
132            #[cfg(feature = "lmdb")]
133            LocalStore::Lmdb(store) => {
134                let stats = store.stats()?;
135                Ok(LocalStoreStats {
136                    count: stats.count,
137                    total_bytes: stats.total_bytes,
138                })
139            }
140        }
141    }
142
143    /// List all hashes in the store
144    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
145        match self {
146            LocalStore::Fs(store) => store.list(),
147            #[cfg(feature = "lmdb")]
148            LocalStore::Lmdb(store) => store.list(),
149        }
150    }
151}
152
153#[async_trait]
154impl Store for LocalStore {
155    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
156        self.put_sync(hash, &data)
157    }
158
159    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
160        self.get_sync(hash)
161    }
162
163    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
164        self.exists(hash)
165    }
166
167    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
168        self.delete_sync(hash)
169    }
170}
171
172#[cfg(feature = "s3")]
173use tokio::sync::mpsc;
174
175use crate::config::S3Config;
176
/// Message for background S3 sync
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Mirror a blob to S3 (`data` is an owned copy of the local write)
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove a blob from S3
    Delete { hash: Hash },
}
183
/// Storage router - local store primary with optional S3 backup
///
/// Write path: local first (fast), then queue S3 upload (non-blocking)
/// Read path: local first, fall back to S3 if miss
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LocalStore>,
    /// Optional S3 client for backup (None when S3 is not configured)
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target bucket name (set together with `s3_client`)
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to every object key ("" when unset)
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to send uploads to background task (None when S3 disabled)
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
202
impl StorageRouter {
    /// Create router with local storage only (no S3 mirroring).
    pub fn new(local: Arc<LocalStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Create router with local storage + S3 backup
    ///
    /// Spawns a detached background task that drains an unbounded channel of
    /// upload/delete messages and performs the S3 calls, with at most 32
    /// requests in flight (semaphore-bounded). Background errors are logged,
    /// never propagated to callers.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Build AWS config from the environment, overriding the region.
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader =
            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Build S3 client with custom endpoint; path-style addressing is
        // needed by many S3-compatible services (e.g. MinIO).
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Create background sync channel.
        // NOTE(review): unbounded — a burst of large uploads is buffered
        // entirely in memory; confirm this is acceptable for expected loads.
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        // Spawn background sync task with bounded concurrent uploads
        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Limit concurrent uploads to prevent overwhelming the runtime
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                // Spawn each upload with semaphore-bounded concurrency
                tokio::spawn(async move {
                    // Acquire permit before uploading. The binding keeps the
                    // Ok(permit) Result alive for the whole task, so the
                    // permit is released only when the task finishes.
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            // Object key layout: "<prefix><hex-hash>.bin"
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            match client
                                .put_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .body(ByteStream::from(data))
                                .send()
                                .await
                            {
                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            if let Err(e) = client
                                .delete_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .send()
                                .await
                            {
                                tracing::error!("S3 delete failed {}: {}", &key, e);
                            }
                        }
                    }
                });
            }
        });

        tracing::info!(
            "S3 storage initialized: bucket={}, prefix={}",
            bucket,
            prefix
        );

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Store data - writes to the local store first, then queues an S3
    /// upload in the background. Returns whether the blob was new locally.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        // Always write to local first
        let is_new = self.local.put_sync(hash, data)?;

        // Queue S3 upload if configured (non-blocking)
        // Always upload to S3 (even if not new locally) to ensure S3 has all blobs
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            // NOTE(review): an info-level log per stored blob is noisy for
            // bulk uploads — consider demoting to debug.
            tracing::info!(
                "Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(),
                data.len(),
                is_new
            );
            if let Err(e) = tx.send(S3SyncMessage::Upload {
                hash,
                data: data.to_vec(),
            }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Get data - tries the local store first, falls back to S3 on a miss.
    /// A successful S3 read is written back to the local store as a cache.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        // Try local first
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        // Fall back to S3 if configured.
        // NOTE(review): futures::executor::block_on on an AWS SDK future can
        // deadlock/panic when invoked from inside a tokio runtime thread —
        // verify all call sites are synchronous contexts.
        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async { client.get_object().bucket(bucket).key(&key).send().await })
            {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Cache locally for future reads
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // A missing key just means the blob isn't in S3; only
                    // warn on other service errors.
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Check if hash exists locally or — if configured — in S3 (HEAD request).
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        // Check local first
        if self.local.exists(hash)? {
            return Ok(true);
        }

        // Check S3 if configured (same block_on caveat as get_sync)
        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object().bucket(bucket).key(&key).send().await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    // "not found" is the expected miss case; stay quiet.
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Delete data from both local and S3 stores. The return value reflects
    /// the local delete only; the S3 delete is queued and best-effort.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        // Queue S3 delete if configured
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Delete data from local store only (don't propagate to S3)
    /// Used for eviction where we want to keep S3 as archive
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Get stats from local store (S3-only blobs are not counted)
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        self.local.stats()
    }

    /// List all hashes from local store
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Get the underlying local store for HashTree operations.
    /// Note: writes through this handle bypass S3 mirroring.
    pub fn local_store(&self) -> Arc<LocalStore> {
        Arc::clone(&self.local)
    }
}
443
444// Implement async Store trait for StorageRouter so it can be used directly with HashTree
445// This ensures all writes go through S3 sync
446#[async_trait]
447impl Store for StorageRouter {
448    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
449        self.put_sync(hash, &data)
450    }
451
452    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
453        self.get_sync(hash)
454    }
455
456    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
457        self.exists(hash)
458    }
459
460    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
461        self.delete_sync(hash)
462    }
463}
464
/// Top-level storage facade: LMDB metadata databases plus the blob
/// [`StorageRouter`] (local store with optional S3 mirror).
pub struct HashtreeStore {
    /// LMDB environment holding all the metadata databases below
    env: heed::Env,
    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
    pins: Database<Bytes, Unit>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles LMDB + optional S3 (Arc for sharing with HashTree)
    router: Arc<StorageRouter>,
    /// Maximum storage size in bytes (from config)
    max_size_bytes: u64,
}
486
487impl HashtreeStore {
488    /// Create a new store with local LMDB storage only (10GB default limit)
489    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
490        Self::with_options(path, None, 10 * 1024 * 1024 * 1024)
491    }
492
493    /// Create a new store with optional S3 backend (10GB default limit)
494    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
495        Self::with_options(path, s3_config, 10 * 1024 * 1024 * 1024)
496    }
497
    /// Create a new store with optional S3 backend and custom size limit
    ///
    /// Opens (or creates) the LMDB metadata environment at `path`, creates
    /// all named databases, then builds the blob router. Blobs live under
    /// `path/blobs` using the backend chosen by the global config file.
    pub fn with_options<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY: heed marks env opening unsafe because the same path must
        // not be opened in ways that break LMDB's locking assumptions.
        // NOTE(review): confirm only one process opens this environment.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) // 10GB virtual address space
                .max_dbs(8) // pins, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .open(path)?
        };

        // Create all named databases up front in one write transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Get storage backend from config
        let config = hashtree_config::Config::load_or_default();
        let backend = &config.storage.backend;

        // Create local blob store based on configured backend
        let local_store = Arc::new(
            LocalStore::new(path.join("blobs"), backend)
                .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
        );

        // Create storage router with optional S3
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            // NOTE(review): block_on assumes we are not already inside an
            // async runtime when constructing the store — verify callers.
            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        // Without the s3 feature, an S3 config is acknowledged but ignored.
        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            env,
            pins,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
571
    /// Get the storage router (borrowed; use [`Self::store_arc`] when an
    /// owning handle is needed)
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }
576
    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
    /// All writes through this go to both LMDB and S3
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }
582
    /// Upload a file and return its CID (public/unencrypted), with auto-pin
    /// (thin wrapper over `upload_file_internal` with `pin = true`)
    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, true)
    }
587
    /// Upload a file without pinning (for blossom uploads that can be evicted)
    /// (thin wrapper over `upload_file_internal` with `pin = false`)
    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, false)
    }
592
593    fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
594        let file_path = file_path.as_ref();
595        let file_content = std::fs::read(file_path)?;
596
597        // Use hashtree to store the file (public mode - no encryption)
598        let store = self.store_arc();
599        let tree = HashTree::new(HashTreeConfig::new(store).public());
600
601        let (cid, _size) = sync_block_on(async { tree.put(&file_content).await })
602            .context("Failed to store file")?;
603
604        // Only pin if requested (htree add = pin, blossom upload = no pin)
605        if pin {
606            let mut wtxn = self.env.write_txn()?;
607            self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
608            wtxn.commit()?;
609        }
610
611        Ok(to_hex(&cid.hash))
612    }
613
614    /// Upload a file from a stream with progress callbacks
615    pub fn upload_file_stream<R: Read, F>(
616        &self,
617        mut reader: R,
618        _file_name: impl Into<String>,
619        mut callback: F,
620    ) -> Result<String>
621    where
622        F: FnMut(&str),
623    {
624        let mut data = Vec::new();
625        reader.read_to_end(&mut data)?;
626
627        // Use HashTree.put for upload (public mode)
628        let store = self.store_arc();
629        let tree = HashTree::new(HashTreeConfig::new(store).public());
630
631        let (cid, _size) =
632            sync_block_on(async { tree.put(&data).await }).context("Failed to store file")?;
633
634        let root_hex = to_hex(&cid.hash);
635        callback(&root_hex);
636
637        // Auto-pin on upload
638        let mut wtxn = self.env.write_txn()?;
639        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
640        wtxn.commit()?;
641
642        Ok(root_hex)
643    }
644
    /// Upload a directory and return its root hash (hex)
    /// Respects .gitignore by default
    /// (wrapper over `upload_dir_with_options` with gitignore enabled)
    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_with_options(dir_path, true)
    }
650
651    /// Upload a directory with options (public mode - no encryption)
652    pub fn upload_dir_with_options<P: AsRef<Path>>(
653        &self,
654        dir_path: P,
655        respect_gitignore: bool,
656    ) -> Result<String> {
657        let dir_path = dir_path.as_ref();
658
659        let store = self.store_arc();
660        let tree = HashTree::new(HashTreeConfig::new(store).public());
661
662        let root_cid = sync_block_on(async {
663            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore)
664                .await
665        })
666        .context("Failed to upload directory")?;
667
668        let root_hex = to_hex(&root_cid.hash);
669
670        let mut wtxn = self.env.write_txn()?;
671        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
672        wtxn.commit()?;
673
674        Ok(root_hex)
675    }
676
    /// Walk `current_path` (honouring gitignore rules when requested),
    /// upload every regular file, and assemble directory nodes bottom-up.
    ///
    /// Returns the root directory's Cid. `_root_path` is currently unused;
    /// all paths are made relative to `current_path`.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Build directory structure from flat file list - store full Cid with key
        // Maps relative dir path ("" = root) -> (file name, Cid) entries.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new()); // Root

        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false) // include dotfiles; only gitignore rules filter
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // Skip the root directory itself
            if path == current_path {
                continue;
            }

            let relative = path.strip_prefix(current_path).unwrap_or(path);

            if path.is_file() {
                let content = std::fs::read(path)?;
                let (cid, _size) = tree.put(&content).await.map_err(|e| {
                    anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e)
                })?;

                // Get parent directory path and file name
                let parent = relative
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative
                    .file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Ensure directory entry exists (so empty dirs get a node too)
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        // Build directory tree bottom-up
        self.build_directory_tree(tree, &mut dir_contents).await
    }
736
    /// Create a directory node for every path in `dir_contents`, deepest
    /// directories first so each parent can reference its children's Cids.
    /// Returns the Cid of the root ("") directory.
    async fn build_directory_tree<S: Store>(
        &self,
        tree: &HashTree<S>,
        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
    ) -> Result<Cid> {
        // Sort directories by depth (deepest first) to build bottom-up
        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
        dirs.sort_by(|a, b| {
            // Depth = number of path components; "" (root) is depth 0.
            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
            depth_b.cmp(&depth_a) // Deepest first
        });

        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();

        for dir_path in dirs {
            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();

            let mut entries: Vec<HashTreeDirEntry> = files
                .into_iter()
                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
                .collect();

            // Add subdirectory entries; children are guaranteed to be in
            // dir_cids already thanks to the deepest-first ordering.
            // NOTE(review): iterating dir_cids is HashMap order, so entry
            // order is nondeterministic — confirm put_directory canonicalizes
            // entry order, else identical dirs may hash differently per run.
            for (subdir_path, cid) in &dir_cids {
                let parent = std::path::Path::new(subdir_path)
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();

                if parent == dir_path {
                    let name = std::path::Path::new(subdir_path)
                        .file_name()
                        .map(|n| n.to_string_lossy().to_string())
                        .unwrap_or_default();
                    entries.push(HashTreeDirEntry::from_cid(name, cid));
                }
            }

            let cid = tree
                .put_directory(entries)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;

            dir_cids.insert(dir_path, cid);
        }

        // Return root Cid
        dir_cids
            .get("")
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("No root directory"))
    }
790
791    /// Upload a file with CHK encryption, returns CID in format "hash:key"
792    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
793        let file_path = file_path.as_ref();
794        let file_content = std::fs::read(file_path)?;
795
796        // Use unified API with encryption enabled (default)
797        let store = self.store_arc();
798        let tree = HashTree::new(HashTreeConfig::new(store));
799
800        let (cid, _size) = sync_block_on(async { tree.put(&file_content).await })
801            .map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;
802
803        let cid_str = cid.to_string();
804
805        let mut wtxn = self.env.write_txn()?;
806        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
807        wtxn.commit()?;
808
809        Ok(cid_str)
810    }
811
    /// Upload a directory with CHK encryption, returns CID
    /// Respects .gitignore by default
    /// (wrapper over `upload_dir_encrypted_with_options` with gitignore on)
    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_encrypted_with_options(dir_path, true)
    }
817
818    /// Upload a directory with CHK encryption and options
819    /// Returns CID as "hash:key" format for encrypted directories
820    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(
821        &self,
822        dir_path: P,
823        respect_gitignore: bool,
824    ) -> Result<String> {
825        let dir_path = dir_path.as_ref();
826        let store = self.store_arc();
827
828        // Use unified API with encryption enabled (default)
829        let tree = HashTree::new(HashTreeConfig::new(store));
830
831        let root_cid = sync_block_on(async {
832            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore)
833                .await
834        })
835        .context("Failed to upload encrypted directory")?;
836
837        let cid_str = root_cid.to_string(); // Returns "hash:key" or "hash"
838
839        let mut wtxn = self.env.write_txn()?;
840        // Pin by hash only (the key is for decryption, not identification)
841        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
842        wtxn.commit()?;
843
844        Ok(cid_str)
845    }
846
847    /// Get tree node by hash (raw bytes)
848    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
849        let store = self.store_arc();
850        let tree = HashTree::new(HashTreeConfig::new(store).public());
851
852        sync_block_on(async {
853            tree.get_tree_node(hash)
854                .await
855                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
856        })
857    }
858
859    /// Store a raw blob, returns SHA256 hash as hex.
860    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
861        let hash = sha256(data);
862        self.router
863            .put_sync(hash, data)
864            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
865        Ok(to_hex(&hash))
866    }
867
868    /// Get a raw blob by SHA256 hash (raw bytes).
869    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
870        self.router
871            .get_sync(hash)
872            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
873    }
874
875    /// Check if a blob exists by SHA256 hash (raw bytes).
876    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
877        self.router
878            .exists(hash)
879            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
880    }
881
882    // === Blossom ownership tracking ===
883    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
884    // This allows efficient multi-owner tracking with O(1) lookups
885
886    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
887    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
888        let mut key = [0u8; 64];
889        key[..32].copy_from_slice(sha256);
890        key[32..].copy_from_slice(pubkey);
891        key
892    }
893
    /// Add an owner (pubkey) to a blob for Blossom protocol
    /// Multiple users can own the same blob - it's only deleted when all owners remove it
    ///
    /// Also appends a `BlobMetadata` record to the owner's `pubkey_blobs`
    /// JSON list (used by the Blossom `/list` endpoint) when not already present.
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Add ownership entry (idempotent - put overwrites)
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        // Convert sha256 to hex for BlobMetadata (which stores sha256 as hex string)
        let sha256_hex = to_hex(sha256);

        // Get existing blobs for this pubkey (for /list endpoint)
        // NOTE(review): an undecodable JSON entry silently resets the list to
        // empty here — confirm that is acceptable for corrupted records.
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        // Check if blob already exists for this pubkey
        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            // System clock before UNIX_EPOCH is effectively impossible.
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Get size from raw blob
            // (0 when the blob content itself has not been stored yet)
            let size = self
                .get_blob(sha256)?
                .map(|data| data.len() as u64)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
940
941    /// Check if a pubkey owns a blob
942    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
943        let key = Self::blob_owner_key(sha256, pubkey);
944        let rtxn = self.env.read_txn()?;
945        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
946    }
947
948    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
949    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
950        let rtxn = self.env.read_txn()?;
951
952        let mut owners = Vec::new();
953        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
954            let (key, _) = item?;
955            if key.len() == 64 {
956                // Extract pubkey from composite key (bytes 32-64)
957                let mut pubkey = [0u8; 32];
958                pubkey.copy_from_slice(&key[32..64]);
959                owners.push(pubkey);
960            }
961        }
962        Ok(owners)
963    }
964
965    /// Check if blob has any owners
966    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
967        let rtxn = self.env.read_txn()?;
968
969        // Just check if any entry exists with this prefix
970        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
971            if item.is_ok() {
972                return Ok(true);
973            }
974        }
975        Ok(false)
976    }
977
978    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
979    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
980        Ok(self.get_blob_owners(sha256)?.into_iter().next())
981    }
982
    /// Remove a user's ownership of a blossom blob
    /// Only deletes the actual blob when no owners remain
    /// Returns true if the blob was actually deleted (no owners left)
    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Remove this pubkey's ownership entry
        self.blob_owners.delete(&mut wtxn, &key[..])?;

        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
        let sha256_hex = to_hex(sha256);

        // Remove from pubkey's blob list
        // (a list that fails to decode is left untouched)
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Check if any other owners remain (prefix scan)
        // Scanning through the write txn makes the deletion above visible.
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &to_hex(pubkey)[..8],
                &sha256_hex[..8]
            );
            return Ok(false);
        }

        // No owners left - delete the blob completely
        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8]
        );

        // Delete raw blob (by content hash) - this deletes from S3 too
        // Best effort: a failed store delete still commits the ownership removal.
        let _ = self.router.delete_sync(sha256);

        wtxn.commit()?;
        Ok(true)
    }
1036
1037    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
1038    pub fn list_blobs_by_pubkey(
1039        &self,
1040        pubkey: &[u8; 32],
1041    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1042        let rtxn = self.env.read_txn()?;
1043
1044        let blobs: Vec<BlobMetadata> = self
1045            .pubkey_blobs
1046            .get(&rtxn, pubkey)?
1047            .and_then(|b| serde_json::from_slice(b).ok())
1048            .unwrap_or_default();
1049
1050        Ok(blobs
1051            .into_iter()
1052            .map(|b| crate::server::blossom::BlobDescriptor {
1053                url: format!("/{}", b.sha256),
1054                sha256: b.sha256,
1055                size: b.size,
1056                mime_type: b.mime_type,
1057                uploaded: b.uploaded,
1058            })
1059            .collect())
1060    }
1061
1062    /// Get a single chunk/blob by hash (raw bytes)
1063    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1064        self.router
1065            .get_sync(hash)
1066            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1067    }
1068
1069    /// Get file content by hash (raw bytes)
1070    /// Returns raw bytes (caller handles decryption if needed)
1071    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1072        let store = self.store_arc();
1073        let tree = HashTree::new(HashTreeConfig::new(store).public());
1074
1075        sync_block_on(async {
1076            tree.read_file(hash)
1077                .await
1078                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1079        })
1080    }
1081
1082    /// Get file content by Cid (hash + optional decryption key as raw bytes)
1083    /// Handles decryption automatically if key is present
1084    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1085        let store = self.store_arc();
1086        let tree = HashTree::new(HashTreeConfig::new(store).public());
1087
1088        sync_block_on(async {
1089            tree.get(cid)
1090                .await
1091                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1092        })
1093    }
1094
1095    /// Resolve a path within a tree (returns Cid with key if encrypted)
1096    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1097        let store = self.store_arc();
1098        let tree = HashTree::new(HashTreeConfig::new(store).public());
1099
1100        sync_block_on(async {
1101            tree.resolve_path(cid, path)
1102                .await
1103                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1104        })
1105    }
1106
    /// Get chunk metadata for a file (chunk list, sizes, total size)
    ///
    /// Returns `Ok(None)` when the hash is unknown, when the tree node cannot
    /// be decoded, or when the hash refers to a directory rather than a file.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            // First check if the hash exists in the store at all
            // (either as a blob or tree node)
            let exists = store
                .has(&hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            // Get total size
            let total_size = tree
                .get_size(&hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            // Check if it's a tree (chunked) or blob
            let is_tree_node = tree
                .is_tree(&hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            if !is_tree_node {
                // Single blob, not chunked
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            // Get tree node to extract chunk info
            let node = match tree
                .get_tree_node(&hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
            {
                Some(n) => n,
                None => return Ok(None),
            };

            // Check if it's a directory (has named links)
            let is_directory = tree
                .is_directory(&hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None); // Not a file
            }

            // Extract chunk info from links
            // (chunk order follows the link order in the tree node)
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1178
1179    /// Get byte range from file
1180    pub fn get_file_range(
1181        &self,
1182        hash: &[u8; 32],
1183        start: u64,
1184        end: Option<u64>,
1185    ) -> Result<Option<(Vec<u8>, u64)>> {
1186        let metadata = match self.get_file_chunk_metadata(hash)? {
1187            Some(m) => m,
1188            None => return Ok(None),
1189        };
1190
1191        if metadata.total_size == 0 {
1192            return Ok(Some((Vec::new(), 0)));
1193        }
1194
1195        if start >= metadata.total_size {
1196            return Ok(None);
1197        }
1198
1199        let end = end
1200            .unwrap_or(metadata.total_size - 1)
1201            .min(metadata.total_size - 1);
1202
1203        // For non-chunked files, load entire file
1204        if !metadata.is_chunked {
1205            let content = self.get_file(hash)?.unwrap_or_default();
1206            let range_content = if start < content.len() as u64 {
1207                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1208            } else {
1209                Vec::new()
1210            };
1211            return Ok(Some((range_content, metadata.total_size)));
1212        }
1213
1214        // For chunked files, load only needed chunks
1215        let mut result = Vec::new();
1216        let mut current_offset = 0u64;
1217
1218        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1219            let chunk_size = metadata.chunk_sizes[i];
1220            let chunk_end = current_offset + chunk_size - 1;
1221
1222            // Check if this chunk overlaps with requested range
1223            if chunk_end >= start && current_offset <= end {
1224                let chunk_content = match self.get_chunk(chunk_hash)? {
1225                    Some(content) => content,
1226                    None => {
1227                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1228                    }
1229                };
1230
1231                let chunk_read_start = if current_offset >= start {
1232                    0
1233                } else {
1234                    (start - current_offset) as usize
1235                };
1236
1237                let chunk_read_end = if chunk_end <= end {
1238                    chunk_size as usize - 1
1239                } else {
1240                    (end - current_offset) as usize
1241                };
1242
1243                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1244            }
1245
1246            current_offset += chunk_size;
1247
1248            if current_offset > end {
1249                break;
1250            }
1251        }
1252
1253        Ok(Some((result, metadata.total_size)))
1254    }
1255
1256    /// Stream file range as chunks using Arc for async/Send contexts
1257    pub fn stream_file_range_chunks_owned(
1258        self: Arc<Self>,
1259        hash: &[u8; 32],
1260        start: u64,
1261        end: u64,
1262    ) -> Result<Option<FileRangeChunksOwned>> {
1263        let metadata = match self.get_file_chunk_metadata(hash)? {
1264            Some(m) => m,
1265            None => return Ok(None),
1266        };
1267
1268        if metadata.total_size == 0 || start >= metadata.total_size {
1269            return Ok(None);
1270        }
1271
1272        let end = end.min(metadata.total_size - 1);
1273
1274        Ok(Some(FileRangeChunksOwned {
1275            store: self,
1276            metadata,
1277            start,
1278            end,
1279            current_chunk_idx: 0,
1280            current_offset: 0,
1281        }))
1282    }
1283
1284    /// Get directory structure by hash (raw bytes)
1285    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1286        let store = self.store_arc();
1287        let tree = HashTree::new(HashTreeConfig::new(store).public());
1288
1289        sync_block_on(async {
1290            // Check if it's a directory
1291            let is_dir = tree
1292                .is_directory(&hash)
1293                .await
1294                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1295
1296            if !is_dir {
1297                return Ok(None);
1298            }
1299
1300            // Get directory entries (public Cid - no encryption key)
1301            let cid = hashtree_core::Cid::public(*hash);
1302            let tree_entries = tree
1303                .list_directory(&cid)
1304                .await
1305                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1306
1307            let entries: Vec<DirEntry> = tree_entries
1308                .into_iter()
1309                .map(|e| DirEntry {
1310                    name: e.name,
1311                    cid: to_hex(&e.hash),
1312                    is_directory: e.link_type.is_tree(),
1313                    size: e.size,
1314                })
1315                .collect();
1316
1317            Ok(Some(DirectoryListing {
1318                dir_name: String::new(),
1319                entries,
1320            }))
1321        })
1322    }
1323
1324    /// Pin a hash (prevent garbage collection)
1325    pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
1326        let mut wtxn = self.env.write_txn()?;
1327        self.pins.put(&mut wtxn, hash.as_slice(), &())?;
1328        wtxn.commit()?;
1329        Ok(())
1330    }
1331
1332    /// Unpin a hash (allow garbage collection)
1333    pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
1334        let mut wtxn = self.env.write_txn()?;
1335        self.pins.delete(&mut wtxn, hash.as_slice())?;
1336        wtxn.commit()?;
1337        Ok(())
1338    }
1339
1340    /// Check if hash is pinned
1341    pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
1342        let rtxn = self.env.read_txn()?;
1343        Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
1344    }
1345
1346    /// List all pinned hashes (raw bytes)
1347    pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
1348        let rtxn = self.env.read_txn()?;
1349        let mut pins = Vec::new();
1350
1351        for item in self.pins.iter(&rtxn)? {
1352            let (hash_bytes, _) = item?;
1353            if hash_bytes.len() == 32 {
1354                let mut hash = [0u8; 32];
1355                hash.copy_from_slice(hash_bytes);
1356                pins.push(hash);
1357            }
1358        }
1359
1360        Ok(pins)
1361    }
1362
1363    /// List all pinned hashes with names
1364    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
1365        let rtxn = self.env.read_txn()?;
1366        let store = self.store_arc();
1367        let tree = HashTree::new(HashTreeConfig::new(store).public());
1368        let mut pins = Vec::new();
1369
1370        for item in self.pins.iter(&rtxn)? {
1371            let (hash_bytes, _) = item?;
1372            if hash_bytes.len() != 32 {
1373                continue;
1374            }
1375            let mut hash = [0u8; 32];
1376            hash.copy_from_slice(hash_bytes);
1377
1378            // Try to determine if it's a directory
1379            let is_directory =
1380                sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
1381
1382            pins.push(PinnedItem {
1383                cid: to_hex(&hash),
1384                name: "Unknown".to_string(),
1385                is_directory,
1386            });
1387        }
1388
1389        Ok(pins)
1390    }
1391
1392    // === Tree indexing for eviction ===
1393
    /// Index a tree after sync - tracks all blobs in the tree for eviction
    ///
    /// If `ref_key` is provided (e.g. "npub.../name"), it will replace any existing
    /// tree with that ref, allowing old versions to be evicted.
    ///
    /// Records a `blob_trees` entry per referenced blob, a `TreeMeta` record
    /// under the root hash, and (optionally) a `tree_refs` name mapping.
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // If ref_key provided, check for and unindex old version
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                // Only replace when the ref points at a *different* root.
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes
                        .try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // Release the read txn before unindex_tree opens its own txns.
                    drop(rtxn);
                    // Unindex old tree (will delete orphaned blobs)
                    // Best effort: a failed unindex does not abort the re-index.
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk tree and collect all blob hashes + compute total size
        let (blob_hashes, total_size) =
            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;

        let mut wtxn = self.env.write_txn()?;

        // Store blob-tree relationships (64-byte key: blob_hash ++ tree_hash)
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        // Store tree metadata
        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            // System clock before UNIX_EPOCH is effectively impossible.
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta
            .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        // Store ref -> hash mapping if ref_key provided
        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1474
1475    /// Collect all blob hashes in a tree and compute total size
1476    async fn collect_tree_blobs<S: Store>(
1477        &self,
1478        tree: &HashTree<S>,
1479        root: &Hash,
1480    ) -> Result<(Vec<Hash>, u64)> {
1481        let mut blobs = Vec::new();
1482        let mut total_size = 0u64;
1483        let mut stack = vec![*root];
1484
1485        while let Some(hash) = stack.pop() {
1486            // Check if it's a tree node
1487            let is_tree = tree
1488                .is_tree(&hash)
1489                .await
1490                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1491
1492            if is_tree {
1493                // Get tree node and add children to stack
1494                if let Some(node) = tree
1495                    .get_tree_node(&hash)
1496                    .await
1497                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1498                {
1499                    for link in &node.links {
1500                        stack.push(link.hash);
1501                    }
1502                }
1503            } else {
1504                // It's a blob - get its size
1505                if let Some(data) = self
1506                    .router
1507                    .get_sync(&hash)
1508                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1509                {
1510                    total_size += data.len() as u64;
1511                    blobs.push(hash);
1512                }
1513            }
1514        }
1515
1516        Ok((blobs, total_size))
1517    }
1518
1519    /// Unindex a tree - removes blob-tree mappings and deletes orphaned blobs
1520    /// Returns the number of bytes freed
1521    pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1522        let root_hex = to_hex(root_hash);
1523
1524        let store = self.store_arc();
1525        let tree = HashTree::new(HashTreeConfig::new(store).public());
1526
1527        // Walk tree and collect all blob hashes
1528        let (blob_hashes, _) =
1529            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
1530
1531        let mut wtxn = self.env.write_txn()?;
1532        let mut freed = 0u64;
1533
1534        // For each blob, remove the blob-tree entry and check if orphaned
1535        for blob_hash in &blob_hashes {
1536            // Delete blob-tree entry (64-byte key: blob_hash ++ tree_hash)
1537            let mut key = [0u8; 64];
1538            key[..32].copy_from_slice(blob_hash);
1539            key[32..].copy_from_slice(root_hash);
1540            self.blob_trees.delete(&mut wtxn, &key[..])?;
1541
1542            // Check if blob is in any other tree (prefix scan on first 32 bytes)
1543            let rtxn = self.env.read_txn()?;
1544            let mut has_other_tree = false;
1545
1546            for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1547                if item.is_ok() {
1548                    has_other_tree = true;
1549                    break;
1550                }
1551            }
1552            drop(rtxn);
1553
1554            // If orphaned, delete the blob
1555            if !has_other_tree {
1556                if let Some(data) = self
1557                    .router
1558                    .get_sync(blob_hash)
1559                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1560                {
1561                    freed += data.len() as u64;
1562                    // Delete locally only - keep S3 as archive
1563                    self.router
1564                        .delete_local_only(blob_hash)
1565                        .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1566                }
1567            }
1568        }
1569
1570        // Delete tree node itself if exists
1571        if let Some(data) = self
1572            .router
1573            .get_sync(root_hash)
1574            .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1575        {
1576            freed += data.len() as u64;
1577            // Delete locally only - keep S3 as archive
1578            self.router
1579                .delete_local_only(root_hash)
1580                .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1581        }
1582
1583        // Delete tree metadata
1584        self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1585
1586        wtxn.commit()?;
1587
1588        tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
1589
1590        Ok(freed)
1591    }
1592
1593    /// Get tree metadata
1594    pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1595        let rtxn = self.env.read_txn()?;
1596        if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1597            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1598                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1599            Ok(Some(meta))
1600        } else {
1601            Ok(None)
1602        }
1603    }
1604
1605    /// List all indexed trees
1606    pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1607        let rtxn = self.env.read_txn()?;
1608        let mut trees = Vec::new();
1609
1610        for item in self.tree_meta.iter(&rtxn)? {
1611            let (hash_bytes, meta_bytes) = item?;
1612            let hash: Hash = hash_bytes
1613                .try_into()
1614                .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1615            let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1616                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1617            trees.push((hash, meta));
1618        }
1619
1620        Ok(trees)
1621    }
1622
1623    /// Get total tracked storage size (sum of all tree_meta.total_size)
1624    pub fn tracked_size(&self) -> Result<u64> {
1625        let rtxn = self.env.read_txn()?;
1626        let mut total = 0u64;
1627
1628        for item in self.tree_meta.iter(&rtxn)? {
1629            let (_, bytes) = item?;
1630            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1631                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1632            total += meta.total_size;
1633        }
1634
1635        Ok(total)
1636    }
1637
1638    /// Get evictable trees sorted by (priority ASC, synced_at ASC)
1639    fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1640        let mut trees = self.list_indexed_trees()?;
1641
1642        // Sort by priority (lower first), then by synced_at (older first)
1643        trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
1644            std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1645            other => other,
1646        });
1647
1648        Ok(trees)
1649    }
1650
    /// Run eviction if storage is over quota
    /// Returns bytes freed
    ///
    /// Eviction order:
    /// 1. Orphaned blobs (not in any indexed tree and not pinned)
    /// 2. Trees by priority (lowest first) and age (oldest first)
    ///
    /// Eviction stops once usage drops to 90% of the quota to avoid
    /// re-triggering on every subsequent write.
    pub fn evict_if_needed(&self) -> Result<u64> {
        // Get actual storage used
        let stats = self
            .router
            .stats()
            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
        let current = stats.total_bytes;

        // Under quota: nothing to do.
        if current <= self.max_size_bytes {
            return Ok(0);
        }

        // Target 90% of max to avoid constant eviction
        let target = self.max_size_bytes * 90 / 100;
        let mut freed = 0u64;
        let mut current_size = current;

        // Phase 1: Evict orphaned blobs (not in any tree and not pinned)
        let orphan_freed = self.evict_orphaned_blobs()?;
        freed += orphan_freed;
        current_size = current_size.saturating_sub(orphan_freed);

        if orphan_freed > 0 {
            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
        }

        // Check if we're now under target
        if current_size <= target {
            if freed > 0 {
                tracing::info!("Eviction complete: {} bytes freed", freed);
            }
            return Ok(freed);
        }

        // Phase 2: Evict trees by priority (lowest first) and age (oldest first)
        // Own trees CAN be evicted (just last), but PINNED trees are never evicted
        let evictable = self.get_evictable_trees()?;

        for (root_hash, meta) in evictable {
            // Stop as soon as we are back under the 90% target.
            if current_size <= target {
                break;
            }

            let root_hex = to_hex(&root_hash);

            // Never evict pinned trees
            if self.is_pinned(&root_hash)? {
                continue;
            }

            let tree_freed = self.unindex_tree(&root_hash)?;
            freed += tree_freed;
            current_size = current_size.saturating_sub(tree_freed);

            tracing::info!(
                "Evicted tree {} (owner={}, priority={}, {} bytes)",
                &root_hex[..8],
                // Owner may be shorter than 8 chars; clamp the log slice.
                &meta.owner[..8.min(meta.owner.len())],
                meta.priority,
                tree_freed
            );
        }

        if freed > 0 {
            tracing::info!("Eviction complete: {} bytes freed", freed);
        }

        Ok(freed)
    }
1726
    /// Evict blobs that are not part of any indexed tree and not pinned
    ///
    /// Orphans are removed from local storage only (any S3 copy is kept as an
    /// archive). Returns the number of bytes freed.
    fn evict_orphaned_blobs(&self) -> Result<u64> {
        let mut freed = 0u64;

        // Get all blob hashes from store
        let all_hashes = self
            .router
            .list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        // Get pinned hashes as raw bytes; keys in `pins` are expected to be
        // 32-byte hashes, anything else is silently ignored
        let rtxn = self.env.read_txn()?;
        let pinned: HashSet<Hash> = self
            .pins
            .iter(&rtxn)?
            .filter_map(|item| item.ok())
            .filter_map(|(hash_bytes, _)| {
                if hash_bytes.len() == 32 {
                    let mut hash = [0u8; 32];
                    hash.copy_from_slice(hash_bytes);
                    Some(hash)
                } else {
                    None
                }
            })
            .collect();

        // Collect all blob hashes that are in at least one tree
        // Key format is blob_hash (32 bytes) ++ tree_hash (32 bytes)
        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
        for item in self.blob_trees.iter(&rtxn)? {
            if let Ok((key_bytes, _)) = item {
                if key_bytes.len() >= 32 {
                    let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
                    blobs_in_trees.insert(blob_hash);
                }
            }
        }
        // Release the read txn before doing (potentially slow) blob I/O below
        drop(rtxn);

        // Find and delete orphaned blobs
        for hash in all_hashes {
            // Skip if pinned
            if pinned.contains(&hash) {
                continue;
            }

            // Skip if part of any tree
            if blobs_in_trees.contains(&hash) {
                continue;
            }

            // This blob is orphaned - delete locally (keep S3 as archive).
            // The blob is read first only to learn its size for accounting.
            if let Ok(Some(data)) = self.router.get_sync(&hash) {
                freed += data.len() as u64;
                let _ = self.router.delete_local_only(&hash);
                tracing::debug!(
                    "Deleted orphaned blob {} ({} bytes)",
                    &to_hex(&hash)[..8],
                    data.len()
                );
            }
        }

        Ok(freed)
    }
1793
    /// Get the maximum storage size in bytes (the quota `evict_if_needed` enforces)
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1798
1799    /// Get storage usage by priority tier
1800    pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1801        let rtxn = self.env.read_txn()?;
1802        let mut own = 0u64;
1803        let mut followed = 0u64;
1804        let mut other = 0u64;
1805
1806        for item in self.tree_meta.iter(&rtxn)? {
1807            let (_, bytes) = item?;
1808            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1809                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1810
1811            if meta.priority >= PRIORITY_OWN {
1812                own += meta.total_size;
1813            } else if meta.priority >= PRIORITY_FOLLOWED {
1814                followed += meta.total_size;
1815            } else {
1816                other += meta.total_size;
1817            }
1818        }
1819
1820        Ok(StorageByPriority {
1821            own,
1822            followed,
1823            other,
1824        })
1825    }
1826
1827    /// Get storage statistics
1828    pub fn get_storage_stats(&self) -> Result<StorageStats> {
1829        let rtxn = self.env.read_txn()?;
1830        let total_pins = self.pins.len(&rtxn)? as usize;
1831
1832        let stats = self
1833            .router
1834            .stats()
1835            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1836
1837        Ok(StorageStats {
1838            total_dags: stats.count,
1839            pinned_dags: total_pins,
1840            total_bytes: stats.total_bytes,
1841        })
1842    }
1843
1844    // === Cached roots (replaces nostrdb event caching) ===
1845
1846    /// Get cached root for a pubkey/tree_name pair
1847    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1848        let key = format!("{}/{}", pubkey_hex, tree_name);
1849        let rtxn = self.env.read_txn()?;
1850        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1851            let root: CachedRoot = rmp_serde::from_slice(bytes)
1852                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1853            Ok(Some(root))
1854        } else {
1855            Ok(None)
1856        }
1857    }
1858
1859    /// Set cached root for a pubkey/tree_name pair
1860    pub fn set_cached_root(
1861        &self,
1862        pubkey_hex: &str,
1863        tree_name: &str,
1864        hash: &str,
1865        key: Option<&str>,
1866        visibility: &str,
1867        updated_at: u64,
1868    ) -> Result<()> {
1869        let db_key = format!("{}/{}", pubkey_hex, tree_name);
1870        let root = CachedRoot {
1871            hash: hash.to_string(),
1872            key: key.map(|k| k.to_string()),
1873            updated_at,
1874            visibility: visibility.to_string(),
1875        };
1876        let bytes = rmp_serde::to_vec(&root)
1877            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1878        let mut wtxn = self.env.write_txn()?;
1879        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1880        wtxn.commit()?;
1881        Ok(())
1882    }
1883
1884    /// List all cached roots for a pubkey
1885    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1886        let prefix = format!("{}/", pubkey_hex);
1887        let rtxn = self.env.read_txn()?;
1888        let mut results = Vec::new();
1889
1890        for item in self.cached_roots.iter(&rtxn)? {
1891            let (key, bytes) = item?;
1892            if key.starts_with(&prefix) {
1893                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1894                let root: CachedRoot = rmp_serde::from_slice(bytes)
1895                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1896                results.push((tree_name.to_string(), root));
1897            }
1898        }
1899
1900        Ok(results)
1901    }
1902
1903    /// Delete a cached root
1904    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1905        let key = format!("{}/{}", pubkey_hex, tree_name);
1906        let mut wtxn = self.env.write_txn()?;
1907        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1908        wtxn.commit()?;
1909        Ok(deleted)
1910    }
1911
1912    /// Garbage collect unpinned content
1913    pub fn gc(&self) -> Result<GcStats> {
1914        let rtxn = self.env.read_txn()?;
1915
1916        // Get all pinned hashes as raw bytes
1917        let pinned: HashSet<Hash> = self
1918            .pins
1919            .iter(&rtxn)?
1920            .filter_map(|item| item.ok())
1921            .filter_map(|(hash_bytes, _)| {
1922                if hash_bytes.len() == 32 {
1923                    let mut hash = [0u8; 32];
1924                    hash.copy_from_slice(hash_bytes);
1925                    Some(hash)
1926                } else {
1927                    None
1928                }
1929            })
1930            .collect();
1931
1932        drop(rtxn);
1933
1934        // Get all stored hashes
1935        let all_hashes = self
1936            .router
1937            .list()
1938            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1939
1940        // Delete unpinned hashes
1941        let mut deleted = 0;
1942        let mut freed_bytes = 0u64;
1943
1944        for hash in all_hashes {
1945            if !pinned.contains(&hash) {
1946                if let Ok(Some(data)) = self.router.get_sync(&hash) {
1947                    freed_bytes += data.len() as u64;
1948                    // Delete locally only - keep S3 as archive
1949                    let _ = self.router.delete_local_only(&hash);
1950                    deleted += 1;
1951                }
1952            }
1953        }
1954
1955        Ok(GcStats {
1956            deleted_dags: deleted,
1957            freed_bytes,
1958        })
1959    }
1960
    /// Verify LMDB blob integrity - checks that stored data matches its key hash
    /// Returns verification statistics and optionally deletes corrupted entries
    ///
    /// Every listed blob is read back and re-hashed with SHA-256; a blob counts
    /// as corrupted if its content hash differs from its key, if the data is
    /// missing, or if the read itself errors. Findings are printed to stdout.
    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
        let all_hashes = self
            .router
            .list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        let total = all_hashes.len();
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_hashes = Vec::new();

        for hash in &all_hashes {
            let hash_hex = to_hex(hash);

            match self.router.get_sync(hash) {
                Ok(Some(data)) => {
                    // Compute actual SHA256 of data
                    let actual_hash = sha256(&data);

                    if actual_hash == *hash {
                        valid += 1;
                    } else {
                        // Content does not hash to its key
                        corrupted += 1;
                        let actual_hex = to_hex(&actual_hash);
                        println!(
                            "  CORRUPTED: key={} actual={} size={}",
                            &hash_hex[..16],
                            &actual_hex[..16],
                            data.len()
                        );
                        corrupted_hashes.push(*hash);
                    }
                }
                Ok(None) => {
                    // Hash exists in index but data is missing
                    corrupted += 1;
                    println!("  MISSING: key={}", &hash_hex[..16]);
                    corrupted_hashes.push(*hash);
                }
                Err(e) => {
                    // Read failed; treat as corrupted so it can be cleaned up
                    corrupted += 1;
                    println!("  ERROR: key={} err={}", &hash_hex[..16], e);
                    corrupted_hashes.push(*hash);
                }
            }
        }

        // Delete corrupted entries if requested
        if delete {
            for hash in &corrupted_hashes {
                match self.router.delete_sync(hash) {
                    Ok(true) => deleted += 1,
                    Ok(false) => {} // Already deleted
                    Err(e) => {
                        // Deletion failures are reported but do not abort the pass
                        let hash_hex = to_hex(hash);
                        println!("  Failed to delete {}: {}", &hash_hex[..16], e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2032
    /// Verify R2/S3 blob integrity - lists all objects and verifies hash matches filename
    /// Returns verification statistics and optionally deletes corrupted entries
    ///
    /// Objects are expected to be named `<64-hex-chars>.bin` under the
    /// configured prefix; each one is downloaded and re-hashed with SHA-256.
    /// Findings are printed to stdout.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        // Get S3 client from router (we need to access it directly)
        // For now, we'll create a new client from config
        let config = crate::config::Config::load()?;
        let s3_config = config
            .storage
            .s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        // Build AWS config
        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        // force_path_style is needed for R2-style custom endpoints
        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                .force_path_style(true)
                .build(),
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // List all objects in bucket, paginating via continuation tokens
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2().bucket(bucket).prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Skip non-.bin files
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // Extract expected hash from filename (remove prefix and .bin)
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                // Validate it's a valid hex hash
                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!("  INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!("  INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download and verify content
                match s3_client.get_object().bucket(bucket).key(key).send().await {
                    Ok(resp) => match resp.body.collect().await {
                        Ok(bytes) => {
                            let data = bytes.into_bytes();
                            let actual_hash = sha256(&data);

                            if actual_hash == expected_hash {
                                valid += 1;
                            } else {
                                corrupted += 1;
                                let actual_hex = to_hex(&actual_hash);
                                println!(
                                    "  CORRUPTED: key={} actual={} size={}",
                                    &expected_hash_hex[..16],
                                    &actual_hex[..16],
                                    data.len()
                                );
                                corrupted_keys.push(key.to_string());
                            }
                        }
                        Err(e) => {
                            corrupted += 1;
                            println!("  READ ERROR: {} - {}", key, e);
                            corrupted_keys.push(key.to_string());
                        }
                    },
                    Err(e) => {
                        corrupted += 1;
                        println!("  FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Progress indicator every 100 objects
                if total % 100 == 0 {
                    println!(
                        "  Progress: {} objects checked, {} corrupted so far",
                        total, corrupted
                    );
                }
            }

            // Check if there are more objects
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        // Delete corrupted entries if requested
        if delete {
            for key in &corrupted_keys {
                match s3_client
                    .delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        // Report and continue; a failed delete is not fatal
                        println!("  Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2192
2193    /// Fallback for non-S3 builds
2194    #[cfg(not(feature = "s3"))]
2195    pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
2196        Err(anyhow::anyhow!("S3 feature not enabled"))
2197    }
2198}
2199
/// Result of blob integrity verification
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of blobs/objects examined
    pub total: usize,
    /// Entries whose content hashed to their key/name
    pub valid: usize,
    /// Entries that were missing, unreadable, or hash-mismatched
    pub corrupted: usize,
    /// Corrupted entries actually deleted (only when deletion was requested)
    pub deleted: usize,
}
2208
/// Aggregate storage statistics (blob counts from the router, pins from LMDB)
#[derive(Debug)]
pub struct StorageStats {
    /// Total number of stored items
    pub total_dags: usize,
    /// Number of pin entries
    pub pinned_dags: usize,
    /// Total bytes stored
    pub total_bytes: u64,
}
2215
/// Storage usage broken down by priority tier
///
/// Byte totals are summed from each indexed tree's `TreeMeta::total_size`.
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Own/pinned trees (priority 255, `PRIORITY_OWN`)
    pub own: u64,
    /// Followed users' trees (priority 128, `PRIORITY_FOLLOWED`)
    pub followed: u64,
    /// Other trees (priority 64, `PRIORITY_OTHER`)
    pub other: u64,
}
2226
/// Chunking metadata for a stored file
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes
    pub total_size: u64,
    /// Hash of each chunk, in file order
    pub chunk_hashes: Vec<Hash>,
    /// Size in bytes of each chunk (parallel to `chunk_hashes`)
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is split into chunks at all
    pub is_chunked: bool,
}
2234
/// Owned iterator for async streaming
///
/// Yields the byte sub-slices of a chunked file that overlap the inclusive
/// byte range [`start`, `end`].
pub struct FileRangeChunksOwned {
    // Backing store used to fetch chunk contents
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read
    metadata: FileChunkMetadata,
    // Inclusive range start (absolute file offset)
    start: u64,
    // Inclusive range end (absolute file offset)
    end: u64,
    // Index of the next chunk to consider
    current_chunk_idx: usize,
    // Absolute file offset of the next chunk's first byte
    current_offset: u64,
}
2244
2245impl Iterator for FileRangeChunksOwned {
2246    type Item = Result<Vec<u8>>;
2247
2248    fn next(&mut self) -> Option<Self::Item> {
2249        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2250            return None;
2251        }
2252
2253        if self.current_offset > self.end {
2254            return None;
2255        }
2256
2257        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2258        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2259        let chunk_end = self.current_offset + chunk_size - 1;
2260
2261        self.current_chunk_idx += 1;
2262
2263        if chunk_end < self.start || self.current_offset > self.end {
2264            self.current_offset += chunk_size;
2265            return self.next();
2266        }
2267
2268        let chunk_content = match self.store.get_chunk(chunk_hash) {
2269            Ok(Some(content)) => content,
2270            Ok(None) => {
2271                return Some(Err(anyhow::anyhow!(
2272                    "Chunk {} not found",
2273                    to_hex(chunk_hash)
2274                )));
2275            }
2276            Err(e) => {
2277                return Some(Err(e));
2278            }
2279        };
2280
2281        let chunk_read_start = if self.current_offset >= self.start {
2282            0
2283        } else {
2284            (self.start - self.current_offset) as usize
2285        };
2286
2287        let chunk_read_end = if chunk_end <= self.end {
2288            chunk_size as usize - 1
2289        } else {
2290            (self.end - self.current_offset) as usize
2291        };
2292
2293        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2294        self.current_offset += chunk_size;
2295
2296        Some(Ok(result))
2297    }
2298}
2299
/// Result of a garbage-collection pass
#[derive(Debug)]
pub struct GcStats {
    /// Number of items deleted from local storage
    pub deleted_dags: usize,
    /// Total bytes freed
    pub freed_bytes: u64,
}
2305
/// A single entry in a directory listing
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name
    pub name: String,
    /// Content identifier (string form)
    pub cid: String,
    /// True if this entry is a directory
    pub is_directory: bool,
    /// Size in bytes
    pub size: u64,
}
2313
/// Listing of a directory's contents
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the listed directory
    pub dir_name: String,
    /// Entries contained in the directory
    pub entries: Vec<DirEntry>,
}
2319
/// A pinned item and its display information
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Content identifier (string form)
    pub cid: String,
    /// Display name
    pub name: String,
    /// True if the pinned item is a directory
    pub is_directory: bool,
}
2326
/// Blob metadata for Blossom protocol
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 of the blob
    pub sha256: String,
    /// Blob size in bytes
    pub size: u64,
    /// MIME type of the content
    pub mime_type: String,
    /// Upload timestamp (presumably unix seconds — verify against writers)
    pub uploaded: u64,
}
2335
2336// Implement ContentStore trait for WebRTC data exchange
2337impl crate::webrtc::ContentStore for HashtreeStore {
2338    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2339        let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2340        self.get_chunk(&hash)
2341    }
2342}