// hashtree_cli/storage.rs

1use anyhow::{Context, Result};
2use heed::{Database, EnvOpenOptions};
3use heed::types::*;
4use hashtree_lmdb::LmdbBlobStore;
5use hashtree_core::{
6    HashTree, HashTreeConfig, Cid,
7    sha256, to_hex, from_hex, TreeNode, DirEntry as HashTreeDirEntry,
8    types::Hash,
9};
10use hashtree_core::store::{Store, StoreError};
11use serde::{Deserialize, Serialize};
12use std::path::Path;
13use std::collections::HashSet;
14use std::io::Read;
15use std::sync::Arc;
16use std::time::{SystemTime, UNIX_EPOCH};
17use futures::executor::block_on as sync_block_on;
18
/// Priority levels for tree eviction (higher value = kept longer).
/// Eviction priority for trees with no known relationship to the user.
pub const PRIORITY_OTHER: u8 = 64;
/// Eviction priority for trees owned by followed pubkeys.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Eviction priority for the user's own or pinned trees (highest).
pub const PRIORITY_OWN: u8 = 255;
23
/// Metadata for a synced tree (for eviction tracking).
///
/// Serialized (msgpack) into the `tree_meta` database, keyed by the 32-byte
/// tree root hash.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Pubkey of tree owner
    pub owner: String,
    /// Tree name if known (from nostr key like "npub.../name")
    pub name: Option<String>,
    /// Unix timestamp when this tree was synced
    pub synced_at: u64,
    /// Total size of all blobs in this tree
    pub total_size: u64,
    /// Eviction priority: 255=own/pinned, 128=followed, 64=other
    /// (matches the PRIORITY_* constants above)
    pub priority: u8,
}
38
/// Cached root info from Nostr events (replaces nostrdb caching).
///
/// Serialized (msgpack) into the `cached_roots` database, keyed by
/// "pubkey_hex/tree_name".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex)
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public" or "private"
    pub visibility: String,
}
51
52#[cfg(feature = "s3")]
53use tokio::sync::mpsc;
54
55use crate::config::S3Config;
56
/// Message for background S3 sync.
///
/// Sent over an unbounded channel to the uploader task spawned in
/// `StorageRouter::with_s3`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Mirror a blob to S3. Fired on every `put_sync`, even when the blob
    /// already existed locally, so S3 eventually holds everything.
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove a blob from S3. Fired by `delete_sync` but NOT by
    /// `delete_local_only` (eviction keeps the S3 archive).
    Delete { hash: Hash },
}
63
/// Storage router - LMDB primary with optional S3 backup
///
/// Write path: LMDB first (fast), then queue S3 upload (non-blocking)
/// Read path: LMDB first, fall back to S3 if miss
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LmdbBlobStore>,
    /// Optional S3 client for backup; `None` when running local-only
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target S3 bucket name (set together with `s3_client`)
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to every object key ("" when unset)
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to send uploads/deletes to the background sync task
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
82
impl StorageRouter {
    /// Create router with LMDB only (no S3 mirroring).
    pub fn new(local: Arc<LmdbBlobStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Create router with LMDB + S3 backup.
    ///
    /// Builds an S3 client from the environment + `config`, then spawns a
    /// detached tokio task that drains an unbounded channel of
    /// [`S3SyncMessage`]s, so puts/deletes never block on network I/O.
    /// Upload/delete failures are logged, not retried.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LmdbBlobStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Build AWS config (credentials come from the environment)
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader = aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Build S3 client with custom endpoint; path-style addressing is
        // required by most non-AWS S3-compatible endpoints (e.g. MinIO)
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Create background sync channel (unbounded: producers never block,
        // but a slow S3 link can grow the queue without limit)
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        // Spawn background sync task; it owns clones of the client/bucket/prefix
        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Task ends when every sender is dropped (router destroyed)
            while let Some(msg) = sync_rx.recv().await {
                match msg {
                    S3SyncMessage::Upload { hash, data } => {
                        // Object key layout: "{prefix}{64-char hex}.bin"
                        let key = format!("{}{}.bin", sync_prefix, to_hex(&hash));
                        tracing::info!("S3 uploading {} ({} bytes)", &key, data.len());

                        match sync_client
                            .put_object()
                            .bucket(&sync_bucket)
                            .key(&key)
                            .body(ByteStream::from(data))
                            .send()
                            .await
                        {
                            Ok(_) => tracing::info!("S3 upload succeeded for {}", &key),
                            Err(e) => tracing::error!("S3 upload failed for {}: {}", &key, e),
                        }
                    }
                    S3SyncMessage::Delete { hash } => {
                        let key = format!("{}{}.bin", sync_prefix, to_hex(&hash));
                        tracing::debug!("S3 deleting {}", &key);

                        if let Err(e) = sync_client
                            .delete_object()
                            .bucket(&sync_bucket)
                            .key(&key)
                            .send()
                            .await
                        {
                            tracing::error!("S3 delete failed for {}: {}", &key[..16.min(key.len())], e);
                        }
                    }
                }
            }
        });

        tracing::info!("S3 storage initialized: bucket={}, prefix={}", bucket, prefix);

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Store data - writes to LMDB, queues S3 upload in background.
    ///
    /// Returns `true` when the blob was new locally.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        // Always write to local first
        let is_new = self.local.put_sync(hash, data)?;

        // Queue S3 upload if configured (non-blocking)
        // Always upload to S3 (even if not new locally) to ensure S3 has all blobs
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            tracing::info!("Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(), data.len(), is_new);
            if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data: data.to_vec() }) {
                // Send only fails when the background task has exited
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Get data - tries LMDB first, falls back to S3.
    ///
    /// On an S3 hit the blob is written back into LMDB (read-through cache).
    /// NOTE(review): `sync_block_on` (futures::executor::block_on) drives AWS
    /// SDK futures here; if this is ever called from inside a tokio runtime
    /// thread it can stall — confirm callers invoke it off-runtime.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        // Try local first
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        // Fall back to S3 if configured
        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.get_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Cache locally for future reads (best-effort)
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // A missing key is the normal miss path; only log real errors
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Check if hash exists in local storage or (if configured) in S3.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        // Check local first
        if self.local.exists(hash)? {
            return Ok(true);
        }

        // Check S3 if configured (HEAD request; see block_on caveat on get_sync)
        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Delete data from both local and S3 stores.
    ///
    /// Returns whether the blob existed locally; the S3 delete is queued and
    /// completes asynchronously.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        // Queue S3 delete if configured (failures here only mean the
        // background task is gone; ignore)
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Delete data from local store only (don't propagate to S3)
    /// Used for eviction where we want to keep S3 as archive
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Get stats from local store (S3 usage is not included).
    pub fn stats(&self) -> Result<hashtree_lmdb::LmdbStats, StoreError> {
        self.local.stats()
    }

    /// List all hashes from local store (S3-only blobs are not listed).
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Get the underlying LMDB store for HashTree operations.
    pub fn local_store(&self) -> Arc<LmdbBlobStore> {
        Arc::clone(&self.local)
    }
}
303
/// Top-level store for the CLI: a heed/LMDB environment holding index
/// databases (pins, ownership, tree metadata, cached roots) plus a
/// [`StorageRouter`] that holds the actual blob bytes.
pub struct HashtreeStore {
    /// heed environment containing all index databases below
    env: heed::Env,
    /// Set of pinned hashes (hex strings, prevents garbage collection)
    pins: Database<Str, Unit>,
    /// Maps SHA256 hex -> root hash hex (for blossom compatibility)
    sha256_index: Database<Str, Str>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Str, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles LMDB + optional S3
    router: StorageRouter,
    /// Maximum storage size in bytes (from config); stored here, enforcement
    /// happens elsewhere (eviction logic)
    max_size_bytes: u64,
}
327
328impl HashtreeStore {
329    /// Create a new store with local LMDB storage only (10GB default limit)
330    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
331        Self::with_options(path, None, 10 * 1024 * 1024 * 1024)
332    }
333
334    /// Create a new store with optional S3 backend (10GB default limit)
335    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
336        Self::with_options(path, s3_config, 10 * 1024 * 1024 * 1024)
337    }
338
    /// Create a new store with optional S3 backend and custom size limit.
    ///
    /// Opens (or creates) the heed environment at `path`, creates all index
    /// databases, and wires up the blob [`StorageRouter`] (LMDB, plus S3 when
    /// the `s3` feature is enabled and a config is supplied).
    pub fn with_options<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>, max_size_bytes: u64) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY: heed marks `open` unsafe because the caller must guarantee
        // the environment's memory map is not opened in conflicting ways.
        // NOTE(review): presumably this directory is opened once per process —
        // confirm no second HashtreeStore is created for the same path.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) // 10GB virtual address space
                .max_dbs(9)  // pins, sha256_index, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .open(path)?
        };

        // Create all named databases up front in one write transaction
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let sha256_index = env.create_database(&mut wtxn, Some("sha256_index"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Create local LMDB blob store (separate env under "blobs/")
        let lmdb_store = Arc::new(LmdbBlobStore::new(path.join("blobs"))
            .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?);

        // Create storage router with optional S3
        #[cfg(feature = "s3")]
        let router = if let Some(s3_cfg) = s3_config {
            tracing::info!("Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket, s3_cfg.endpoint);

            sync_block_on(async {
                StorageRouter::with_s3(lmdb_store, s3_cfg).await
            })?
        } else {
            StorageRouter::new(lmdb_store)
        };

        // Without the s3 feature, always fall back to local-only storage
        #[cfg(not(feature = "s3"))]
        let router = {
            if s3_config.is_some() {
                tracing::warn!("S3 config provided but S3 feature not enabled. Using local storage only.");
            }
            StorageRouter::new(lmdb_store)
        };

        Ok(Self {
            env,
            pins,
            sha256_index,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            // Stored for callers; not enforced by this constructor itself
            max_size_bytes,
        })
    }
401
    /// Get the storage router (LMDB + optional S3) for raw blob access.
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }
406
    /// Get the underlying LMDB store (cloned `Arc`) for HashTree operations.
    pub fn blob_store_arc(&self) -> Arc<LmdbBlobStore> {
        self.router.local_store()
    }
411
    /// Upload a file and return its root hash (hex), public/unencrypted.
    /// The result is auto-pinned so it cannot be evicted.
    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, true)
    }
416
    /// Upload a file without pinning (for blossom uploads that can be evicted).
    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, false)
    }
421
422    fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
423        let file_path = file_path.as_ref();
424        let file_content = std::fs::read(file_path)?;
425
426        // Compute SHA256 hash of file content for blossom compatibility
427        let content_sha256 = sha256(&file_content);
428        let sha256_hex = to_hex(&content_sha256);
429
430        // Use hashtree to store the file (public mode - no encryption)
431        let store = self.router.local_store();
432        let tree = HashTree::new(HashTreeConfig::new(store).public());
433
434        let cid = sync_block_on(async {
435            tree.put(&file_content).await
436        }).context("Failed to store file")?;
437
438        let root_hex = to_hex(&cid.hash);
439
440        let mut wtxn = self.env.write_txn()?;
441
442        // Store SHA256 -> root hash mapping for blossom compatibility
443        self.sha256_index.put(&mut wtxn, &sha256_hex, &root_hex)?;
444
445        // Only pin if requested (htree add = pin, blossom upload = no pin)
446        if pin {
447            self.pins.put(&mut wtxn, &root_hex, &())?;
448        }
449
450        wtxn.commit()?;
451
452        Ok(root_hex)
453    }
454
455    /// Upload a file from a stream with progress callbacks
456    pub fn upload_file_stream<R: Read, F>(
457        &self,
458        mut reader: R,
459        _file_name: impl Into<String>,
460        mut callback: F,
461    ) -> Result<String>
462    where
463        F: FnMut(&str),
464    {
465        // Read all data first to compute SHA256
466        let mut data = Vec::new();
467        reader.read_to_end(&mut data)?;
468
469        // Compute SHA256 hash of file content for blossom compatibility
470        let content_sha256 = sha256(&data);
471        let sha256_hex = to_hex(&content_sha256);
472
473        // Use HashTree.put for upload (public mode for blossom)
474        let store = self.router.local_store();
475        let tree = HashTree::new(HashTreeConfig::new(store).public());
476
477        let cid = sync_block_on(async {
478            tree.put(&data).await
479        }).context("Failed to store file")?;
480
481        let root_hex = to_hex(&cid.hash);
482        callback(&root_hex);
483
484        let mut wtxn = self.env.write_txn()?;
485
486        // Store SHA256 -> root hash mapping for blossom compatibility
487        self.sha256_index.put(&mut wtxn, &sha256_hex, &root_hex)?;
488
489        // Auto-pin on upload
490        self.pins.put(&mut wtxn, &root_hex, &())?;
491
492        wtxn.commit()?;
493
494        Ok(root_hex)
495    }
496
    /// Upload a directory and return its root hash (hex), public/unencrypted.
    /// Respects .gitignore by default; the root is auto-pinned.
    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_with_options(dir_path, true)
    }
502
503    /// Upload a directory with options (public mode - no encryption)
504    pub fn upload_dir_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
505        let dir_path = dir_path.as_ref();
506
507        let store = self.router.local_store();
508        let tree = HashTree::new(HashTreeConfig::new(store).public());
509
510        let root_cid = sync_block_on(async {
511            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
512        }).context("Failed to upload directory")?;
513
514        let root_hex = to_hex(&root_cid.hash);
515
516        let mut wtxn = self.env.write_txn()?;
517        self.pins.put(&mut wtxn, &root_hex, &())?;
518        wtxn.commit()?;
519
520        Ok(root_hex)
521    }
522
    /// Walk `current_path` (honoring gitignore settings) and upload every file,
    /// collecting a `relative dir -> [(file name, Cid)]` map, then delegate to
    /// `build_directory_tree` to assemble directory nodes bottom-up.
    /// `_root_path` is currently unused.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Build directory structure from flat file list - store full Cid with key
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new()); // Root keyed by ""

        // hidden(false) disables hidden-file filtering, so dotfiles ARE
        // included; only the git* toggles follow `respect_gitignore`
        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false)
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // Skip the root directory itself
            if path == current_path {
                continue;
            }

            // Keys in dir_contents are paths relative to the upload root
            let relative = path.strip_prefix(current_path)
                .unwrap_or(path);

            if path.is_file() {
                let content = std::fs::read(path)?;
                let cid = tree.put(&content).await
                    .map_err(|e| anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e))?;

                // Get parent directory path and file name
                let parent = relative.parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative.file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Ensure directory entry exists (empty dirs still get a node)
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        // Build directory tree bottom-up
        self.build_directory_tree(tree, &mut dir_contents).await
    }
580
581    async fn build_directory_tree<S: Store>(
582        &self,
583        tree: &HashTree<S>,
584        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
585    ) -> Result<Cid> {
586        // Sort directories by depth (deepest first) to build bottom-up
587        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
588        dirs.sort_by(|a, b| {
589            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
590            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
591            depth_b.cmp(&depth_a) // Deepest first
592        });
593
594        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();
595
596        for dir_path in dirs {
597            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();
598
599            let mut entries: Vec<HashTreeDirEntry> = files.into_iter()
600                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
601                .collect();
602
603            // Add subdirectory entries
604            for (subdir_path, cid) in &dir_cids {
605                let parent = std::path::Path::new(subdir_path)
606                    .parent()
607                    .map(|p| p.to_string_lossy().to_string())
608                    .unwrap_or_default();
609
610                if parent == dir_path {
611                    let name = std::path::Path::new(subdir_path)
612                        .file_name()
613                        .map(|n| n.to_string_lossy().to_string())
614                        .unwrap_or_default();
615                    entries.push(HashTreeDirEntry::from_cid(name, cid));
616                }
617            }
618
619            let cid = tree.put_directory(entries).await
620                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;
621
622            dir_cids.insert(dir_path, cid);
623        }
624
625        // Return root Cid
626        dir_cids.get("")
627            .cloned()
628            .ok_or_else(|| anyhow::anyhow!("No root directory"))
629    }
630
631    /// Upload a file with CHK encryption, returns CID in format "hash:key"
632    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
633        let file_path = file_path.as_ref();
634        let file_content = std::fs::read(file_path)?;
635
636        // Use unified API with encryption enabled (default)
637        let store = self.router.local_store();
638        let tree = HashTree::new(HashTreeConfig::new(store));
639
640        let cid = sync_block_on(async {
641            tree.put(&file_content).await
642        }).map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;
643
644        let cid_str = cid.to_string();
645
646        let mut wtxn = self.env.write_txn()?;
647        self.pins.put(&mut wtxn, &cid_str, &())?;
648        wtxn.commit()?;
649
650        Ok(cid_str)
651    }
652
    /// Upload a directory with CHK encryption, returns CID ("hash:key").
    /// Respects .gitignore by default.
    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_encrypted_with_options(dir_path, true)
    }
658
659    /// Upload a directory with CHK encryption and options
660    /// Returns CID as "hash:key" format for encrypted directories
661    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
662        let dir_path = dir_path.as_ref();
663        let store = self.router.local_store();
664
665        // Use unified API with encryption enabled (default)
666        let tree = HashTree::new(HashTreeConfig::new(store));
667
668        let root_cid = sync_block_on(async {
669            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
670        }).context("Failed to upload encrypted directory")?;
671
672        let cid_str = root_cid.to_string(); // Returns "hash:key" or "hash"
673
674        let mut wtxn = self.env.write_txn()?;
675        // Pin by hash only (the key is for decryption, not identification)
676        self.pins.put(&mut wtxn, &to_hex(&root_cid.hash), &())?;
677        wtxn.commit()?;
678
679        Ok(cid_str)
680    }
681
682    /// Get tree node by hash (hex)
683    pub fn get_tree_node(&self, hash_hex: &str) -> Result<Option<TreeNode>> {
684        let hash = from_hex(hash_hex)
685            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
686
687        let store = self.router.local_store();
688        let tree = HashTree::new(HashTreeConfig::new(store).public());
689
690        sync_block_on(async {
691            tree.get_tree_node(&hash).await
692                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
693        })
694    }
695
696    /// Look up root hash by SHA256 hash (blossom compatibility)
697    pub fn get_cid_by_sha256(&self, sha256_hex: &str) -> Result<Option<String>> {
698        let rtxn = self.env.read_txn()?;
699        Ok(self.sha256_index.get(&rtxn, sha256_hex)?.map(|s| s.to_string()))
700    }
701
702    /// Store a raw blob, returns SHA256 hash as hex.
703    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
704        let hash = sha256(data);
705        self.router.put_sync(hash, data)
706            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
707        Ok(to_hex(&hash))
708    }
709
710    /// Get a raw blob by SHA256 hex hash.
711    pub fn get_blob(&self, sha256_hex: &str) -> Result<Option<Vec<u8>>> {
712        let hash = from_hex(sha256_hex)
713            .map_err(|e| anyhow::anyhow!("invalid hex: {}", e))?;
714        self.router.get_sync(&hash)
715            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
716    }
717
718    /// Check if a blob exists by SHA256 hex hash.
719    pub fn blob_exists(&self, sha256_hex: &str) -> Result<bool> {
720        let hash = from_hex(sha256_hex)
721            .map_err(|e| anyhow::anyhow!("invalid hex: {}", e))?;
722        self.router.exists(&hash)
723            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
724    }
725
726    // === Blossom ownership tracking ===
727    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
728    // This allows efficient multi-owner tracking with O(1) lookups
729
730    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
731    fn blob_owner_key(sha256_hex: &str, pubkey_hex: &str) -> Result<[u8; 64]> {
732        let sha256_bytes = from_hex(sha256_hex)
733            .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
734        let pubkey_bytes = from_hex(pubkey_hex)
735            .map_err(|e| anyhow::anyhow!("invalid pubkey hex: {}", e))?;
736        let mut key = [0u8; 64];
737        key[..32].copy_from_slice(&sha256_bytes);
738        key[32..].copy_from_slice(&pubkey_bytes);
739        Ok(key)
740    }
741
742    /// Add an owner (pubkey) to a blob for Blossom protocol
743    /// Multiple users can own the same blob - it's only deleted when all owners remove it
744    pub fn set_blob_owner(&self, sha256_hex: &str, pubkey: &str) -> Result<()> {
745        use std::time::{SystemTime, UNIX_EPOCH};
746
747        let key = Self::blob_owner_key(sha256_hex, pubkey)?;
748        let mut wtxn = self.env.write_txn()?;
749
750        // Add ownership entry (idempotent - put overwrites)
751        self.blob_owners.put(&mut wtxn, &key[..], &())?;
752
753        // Get existing blobs for this pubkey (for /list endpoint)
754        let mut blobs: Vec<BlobMetadata> = self
755            .pubkey_blobs
756            .get(&wtxn, pubkey)?
757            .and_then(|b| serde_json::from_slice(b).ok())
758            .unwrap_or_default();
759
760        // Check if blob already exists for this pubkey
761        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
762            let now = SystemTime::now()
763                .duration_since(UNIX_EPOCH)
764                .unwrap()
765                .as_secs();
766
767            // Get size from root hash lookup
768            let size = self
769                .get_cid_by_sha256(sha256_hex)?
770                .and_then(|cid| self.get_file_chunk_metadata(&cid).ok().flatten())
771                .map(|m| m.total_size)
772                .unwrap_or(0);
773
774            blobs.push(BlobMetadata {
775                sha256: sha256_hex.to_string(),
776                size,
777                mime_type: "application/octet-stream".to_string(),
778                uploaded: now,
779            });
780
781            let blobs_json = serde_json::to_vec(&blobs)?;
782            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
783        }
784
785        wtxn.commit()?;
786        Ok(())
787    }
788
789    /// Check if a pubkey owns a blob
790    pub fn is_blob_owner(&self, sha256_hex: &str, pubkey: &str) -> Result<bool> {
791        let key = Self::blob_owner_key(sha256_hex, pubkey)?;
792        let rtxn = self.env.read_txn()?;
793        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
794    }
795
796    /// Get all owners (pubkeys) of a blob via prefix scan
797    pub fn get_blob_owners(&self, sha256_hex: &str) -> Result<Vec<String>> {
798        let sha256_bytes = from_hex(sha256_hex)
799            .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
800        let rtxn = self.env.read_txn()?;
801
802        let mut owners = Vec::new();
803        for item in self.blob_owners.prefix_iter(&rtxn, &sha256_bytes[..])? {
804            let (key, _) = item?;
805            if key.len() == 64 {
806                // Extract pubkey from composite key (bytes 32-64)
807                let pubkey_hex = to_hex(&key[32..64].try_into().unwrap());
808                owners.push(pubkey_hex);
809            }
810        }
811        Ok(owners)
812    }
813
814    /// Check if blob has any owners
815    pub fn blob_has_owners(&self, sha256_hex: &str) -> Result<bool> {
816        let sha256_bytes = from_hex(sha256_hex)
817            .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
818        let rtxn = self.env.read_txn()?;
819
820        // Just check if any entry exists with this prefix
821        for item in self.blob_owners.prefix_iter(&rtxn, &sha256_bytes[..])? {
822            if item.is_ok() {
823                return Ok(true);
824            }
825        }
826        Ok(false)
827    }
828
829    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
830    pub fn get_blob_owner(&self, sha256_hex: &str) -> Result<Option<String>> {
831        Ok(self.get_blob_owners(sha256_hex)?.into_iter().next())
832    }
833
834    /// Remove a user's ownership of a blossom blob
835    /// Only deletes the actual blob when no owners remain
836    /// Returns true if the blob was actually deleted (no owners left)
837    pub fn delete_blossom_blob(&self, sha256_hex: &str, pubkey: &str) -> Result<bool> {
838        let key = Self::blob_owner_key(sha256_hex, pubkey)?;
839        let mut wtxn = self.env.write_txn()?;
840
841        // Remove this pubkey's ownership entry
842        self.blob_owners.delete(&mut wtxn, &key[..])?;
843
844        // Remove from pubkey's blob list
845        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
846            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
847                blobs.retain(|b| b.sha256 != sha256_hex);
848                let blobs_json = serde_json::to_vec(&blobs)?;
849                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
850            }
851        }
852
853        // Check if any other owners remain (prefix scan)
854        let sha256_bytes = from_hex(sha256_hex)
855            .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
856        let mut has_other_owners = false;
857        for item in self.blob_owners.prefix_iter(&wtxn, &sha256_bytes[..])? {
858            if item.is_ok() {
859                has_other_owners = true;
860                break;
861            }
862        }
863
864        if has_other_owners {
865            wtxn.commit()?;
866            tracing::debug!(
867                "Removed {} from blob {} owners, other owners remain",
868                &pubkey[..8.min(pubkey.len())],
869                &sha256_hex[..8.min(sha256_hex.len())]
870            );
871            return Ok(false);
872        }
873
874        // No owners left - delete the blob completely
875        tracing::info!(
876            "All owners removed from blob {}, deleting",
877            &sha256_hex[..8.min(sha256_hex.len())]
878        );
879
880        // Delete from sha256_index
881        let root_hex = self.sha256_index.get(&wtxn, sha256_hex)?.map(|s| s.to_string());
882        if let Some(ref root_hex) = root_hex {
883            // Unpin
884            self.pins.delete(&mut wtxn, root_hex)?;
885        }
886        self.sha256_index.delete(&mut wtxn, sha256_hex)?;
887
888        // Delete raw blob (by content hash) - this deletes from S3 too
889        let hash = from_hex(sha256_hex)
890            .map_err(|e| anyhow::anyhow!("invalid hex: {}", e))?;
891        let _ = self.router.delete_sync(&hash);
892
893        wtxn.commit()?;
894        Ok(true)
895    }
896
897    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
898    pub fn list_blobs_by_pubkey(&self, pubkey: &str) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
899        let rtxn = self.env.read_txn()?;
900
901        let blobs: Vec<BlobMetadata> = self
902            .pubkey_blobs
903            .get(&rtxn, pubkey)?
904            .and_then(|b| serde_json::from_slice(b).ok())
905            .unwrap_or_default();
906
907        Ok(blobs
908            .into_iter()
909            .map(|b| crate::server::blossom::BlobDescriptor {
910                url: format!("/{}", b.sha256),
911                sha256: b.sha256,
912                size: b.size,
913                mime_type: b.mime_type,
914                uploaded: b.uploaded,
915            })
916            .collect())
917    }
918
919    /// Get a single chunk/blob by hash (hex)
920    pub fn get_chunk(&self, chunk_hex: &str) -> Result<Option<Vec<u8>>> {
921        let hash = from_hex(chunk_hex)
922            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
923        self.router.get_sync(&hash)
924            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
925    }
926
927    /// Get file content by hash (hex)
928    /// Returns raw bytes (caller handles decryption if needed)
929    pub fn get_file(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
930        let hash = from_hex(hash_hex)
931            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
932
933        let store = self.router.local_store();
934        let tree = HashTree::new(HashTreeConfig::new(store).public());
935
936        sync_block_on(async {
937            tree.read_file(&hash).await
938                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
939        })
940    }
941
    /// Get chunk metadata for a file (chunk list, sizes, total size)
    ///
    /// Returns `Ok(None)` when the hash is not present in the local store or
    /// refers to a directory rather than a file. For unchunked files the
    /// chunk lists are empty and `is_chunked` is false.
    pub fn get_file_chunk_metadata(&self, hash_hex: &str) -> Result<Option<FileChunkMetadata>> {
        let hash = from_hex(hash_hex)
            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;

        let store = self.router.local_store();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        // Storage layer is synchronous; drive the async tree API with block_on
        sync_block_on(async {
            // First check if the hash exists in the store at all
            // (either as a blob or tree node)
            let exists = store.has(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            // Get total size
            let total_size = tree.get_size(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            // Check if it's a tree (chunked) or blob
            let is_tree_node = tree.is_tree(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            if !is_tree_node {
                // Single blob, not chunked
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_cids: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            // Get tree node to extract chunk info
            let node = match tree.get_tree_node(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))? {
                Some(n) => n,
                None => return Ok(None),
            };

            // Check if it's a directory (has named links)
            let is_directory = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None); // Not a file
            }

            // Extract chunk info from links (one link per chunk, in file order)
            let chunk_cids: Vec<String> = node.links.iter().map(|l| to_hex(&l.hash)).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_cids,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1005
1006    /// Get byte range from file
1007    pub fn get_file_range(&self, hash_hex: &str, start: u64, end: Option<u64>) -> Result<Option<(Vec<u8>, u64)>> {
1008        let metadata = match self.get_file_chunk_metadata(hash_hex)? {
1009            Some(m) => m,
1010            None => return Ok(None),
1011        };
1012
1013        if metadata.total_size == 0 {
1014            return Ok(Some((Vec::new(), 0)));
1015        }
1016
1017        if start >= metadata.total_size {
1018            return Ok(None);
1019        }
1020
1021        let end = end.unwrap_or(metadata.total_size - 1).min(metadata.total_size - 1);
1022
1023        // For non-chunked files, load entire file
1024        if !metadata.is_chunked {
1025            let content = self.get_file(hash_hex)?.unwrap_or_default();
1026            let range_content = if start < content.len() as u64 {
1027                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1028            } else {
1029                Vec::new()
1030            };
1031            return Ok(Some((range_content, metadata.total_size)));
1032        }
1033
1034        // For chunked files, load only needed chunks
1035        let mut result = Vec::new();
1036        let mut current_offset = 0u64;
1037
1038        for (i, chunk_cid) in metadata.chunk_cids.iter().enumerate() {
1039            let chunk_size = metadata.chunk_sizes[i];
1040            let chunk_end = current_offset + chunk_size - 1;
1041
1042            // Check if this chunk overlaps with requested range
1043            if chunk_end >= start && current_offset <= end {
1044                let chunk_content = match self.get_chunk(chunk_cid)? {
1045                    Some(content) => content,
1046                    None => {
1047                        return Err(anyhow::anyhow!("Chunk {} not found", chunk_cid));
1048                    }
1049                };
1050
1051                let chunk_read_start = if current_offset >= start {
1052                    0
1053                } else {
1054                    (start - current_offset) as usize
1055                };
1056
1057                let chunk_read_end = if chunk_end <= end {
1058                    chunk_size as usize - 1
1059                } else {
1060                    (end - current_offset) as usize
1061                };
1062
1063                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1064            }
1065
1066            current_offset += chunk_size;
1067
1068            if current_offset > end {
1069                break;
1070            }
1071        }
1072
1073        Ok(Some((result, metadata.total_size)))
1074    }
1075
1076    /// Stream file range as chunks using Arc for async/Send contexts
1077    pub fn stream_file_range_chunks_owned(
1078        self: Arc<Self>,
1079        hash_hex: &str,
1080        start: u64,
1081        end: u64,
1082    ) -> Result<Option<FileRangeChunksOwned>> {
1083        let metadata = match self.get_file_chunk_metadata(hash_hex)? {
1084            Some(m) => m,
1085            None => return Ok(None),
1086        };
1087
1088        if metadata.total_size == 0 || start >= metadata.total_size {
1089            return Ok(None);
1090        }
1091
1092        let end = end.min(metadata.total_size - 1);
1093
1094        Ok(Some(FileRangeChunksOwned {
1095            store: self,
1096            metadata,
1097            start,
1098            end,
1099            current_chunk_idx: 0,
1100            current_offset: 0,
1101        }))
1102    }
1103
1104    /// Get directory structure by hash (hex)
1105    pub fn get_directory_listing(&self, hash_hex: &str) -> Result<Option<DirectoryListing>> {
1106        let hash = from_hex(hash_hex)
1107            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
1108
1109        let store = self.router.local_store();
1110        let tree = HashTree::new(HashTreeConfig::new(store).public());
1111
1112        sync_block_on(async {
1113            // Check if it's a directory
1114            let is_dir = tree.is_directory(&hash).await
1115                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1116
1117            if !is_dir {
1118                return Ok(None);
1119            }
1120
1121            // Get directory entries (public Cid - no encryption key)
1122            let cid = hashtree_core::Cid::public(hash, 0);
1123            let tree_entries = tree.list_directory(&cid).await
1124                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1125
1126            let entries: Vec<DirEntry> = tree_entries.into_iter().map(|e| DirEntry {
1127                name: e.name,
1128                cid: to_hex(&e.hash),
1129                is_directory: e.link_type.is_tree(),
1130                size: e.size,
1131            }).collect();
1132
1133            Ok(Some(DirectoryListing {
1134                dir_name: String::new(),
1135                entries,
1136            }))
1137        })
1138    }
1139
1140    /// Pin a hash (prevent garbage collection)
1141    pub fn pin(&self, hash_hex: &str) -> Result<()> {
1142        let mut wtxn = self.env.write_txn()?;
1143        self.pins.put(&mut wtxn, hash_hex, &())?;
1144        wtxn.commit()?;
1145        Ok(())
1146    }
1147
1148    /// Unpin a hash (allow garbage collection)
1149    pub fn unpin(&self, hash_hex: &str) -> Result<()> {
1150        let mut wtxn = self.env.write_txn()?;
1151        self.pins.delete(&mut wtxn, hash_hex)?;
1152        wtxn.commit()?;
1153        Ok(())
1154    }
1155
1156    /// Check if hash is pinned
1157    pub fn is_pinned(&self, hash_hex: &str) -> Result<bool> {
1158        let rtxn = self.env.read_txn()?;
1159        Ok(self.pins.get(&rtxn, hash_hex)?.is_some())
1160    }
1161
1162    /// List all pinned hashes
1163    pub fn list_pins(&self) -> Result<Vec<String>> {
1164        let rtxn = self.env.read_txn()?;
1165        let mut pins = Vec::new();
1166
1167        for item in self.pins.iter(&rtxn)? {
1168            let (hash_hex, _) = item?;
1169            pins.push(hash_hex.to_string());
1170        }
1171
1172        Ok(pins)
1173    }
1174
1175    /// List all pinned hashes with names
1176    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
1177        let rtxn = self.env.read_txn()?;
1178        let store = self.router.local_store();
1179        let tree = HashTree::new(HashTreeConfig::new(store).public());
1180        let mut pins = Vec::new();
1181
1182        for item in self.pins.iter(&rtxn)? {
1183            let (hash_hex, _) = item?;
1184            let hash_hex_str = hash_hex.to_string();
1185
1186            // Try to determine if it's a directory
1187            let is_directory = if let Ok(hash) = from_hex(&hash_hex_str) {
1188                sync_block_on(async {
1189                    tree.is_directory(&hash).await.unwrap_or(false)
1190                })
1191            } else {
1192                false
1193            };
1194
1195            pins.push(PinnedItem {
1196                cid: hash_hex_str,
1197                name: "Unknown".to_string(),
1198                is_directory,
1199            });
1200        }
1201
1202        Ok(pins)
1203    }
1204
1205    // === Tree indexing for eviction ===
1206
    /// Index a tree after sync - tracks all blobs in the tree for eviction
    ///
    /// If `ref_key` is provided (e.g. "npub.../name"), it will replace any existing
    /// tree with that ref, allowing old versions to be evicted.
    ///
    /// Records, in a single write transaction:
    ///   - one `blob_trees` entry (blob_hash ++ tree_hash) per leaf blob,
    ///   - a `TreeMeta` record (owner/name/synced_at/total_size/priority),
    ///   - the `ref_key -> root_hash` mapping when `ref_key` is given.
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // If ref_key provided, check for and unindex old version
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                // Only replace when the ref points at a DIFFERENT root;
                // re-indexing the same root just refreshes metadata below
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes.try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // Release the read txn before unindex_tree opens its own txns
                    drop(rtxn);
                    // Unindex old tree (will delete orphaned blobs); best-effort,
                    // failures are ignored so indexing of the new tree proceeds
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.router.local_store();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk tree and collect all blob hashes + compute total size
        let (blob_hashes, total_size) = sync_block_on(async {
            self.collect_tree_blobs(&tree, root_hash).await
        })?;

        let mut wtxn = self.env.write_txn()?;

        // Store blob-tree relationships (64-byte key: blob_hash ++ tree_hash)
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        // Store tree metadata
        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta.put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        // Store ref -> hash mapping if ref_key provided
        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1286
1287    /// Collect all blob hashes in a tree and compute total size
1288    async fn collect_tree_blobs<S: Store>(
1289        &self,
1290        tree: &HashTree<S>,
1291        root: &Hash,
1292    ) -> Result<(Vec<Hash>, u64)> {
1293        let mut blobs = Vec::new();
1294        let mut total_size = 0u64;
1295        let mut stack = vec![*root];
1296
1297        while let Some(hash) = stack.pop() {
1298            // Check if it's a tree node
1299            let is_tree = tree.is_tree(&hash).await
1300                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1301
1302            if is_tree {
1303                // Get tree node and add children to stack
1304                if let Some(node) = tree.get_tree_node(&hash).await
1305                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1306                {
1307                    for link in &node.links {
1308                        stack.push(link.hash);
1309                    }
1310                }
1311            } else {
1312                // It's a blob - get its size
1313                if let Some(data) = self.router.get_sync(&hash)
1314                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1315                {
1316                    total_size += data.len() as u64;
1317                    blobs.push(hash);
1318                }
1319            }
1320        }
1321
1322        Ok((blobs, total_size))
1323    }
1324
1325    /// Unindex a tree - removes blob-tree mappings and deletes orphaned blobs
1326    /// Returns the number of bytes freed
1327    pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1328        let root_hex = to_hex(root_hash);
1329
1330        let store = self.router.local_store();
1331        let tree = HashTree::new(HashTreeConfig::new(store).public());
1332
1333        // Walk tree and collect all blob hashes
1334        let (blob_hashes, _) = sync_block_on(async {
1335            self.collect_tree_blobs(&tree, root_hash).await
1336        })?;
1337
1338        let mut wtxn = self.env.write_txn()?;
1339        let mut freed = 0u64;
1340
1341        // For each blob, remove the blob-tree entry and check if orphaned
1342        for blob_hash in &blob_hashes {
1343            // Delete blob-tree entry (64-byte key: blob_hash ++ tree_hash)
1344            let mut key = [0u8; 64];
1345            key[..32].copy_from_slice(blob_hash);
1346            key[32..].copy_from_slice(root_hash);
1347            self.blob_trees.delete(&mut wtxn, &key[..])?;
1348
1349            // Check if blob is in any other tree (prefix scan on first 32 bytes)
1350            let rtxn = self.env.read_txn()?;
1351            let mut has_other_tree = false;
1352
1353            for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1354                if item.is_ok() {
1355                    has_other_tree = true;
1356                    break;
1357                }
1358            }
1359            drop(rtxn);
1360
1361            // If orphaned, delete the blob
1362            if !has_other_tree {
1363                if let Some(data) = self.router.get_sync(blob_hash)
1364                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1365                {
1366                    freed += data.len() as u64;
1367                    // Delete locally only - keep S3 as archive
1368                    self.router.delete_local_only(blob_hash)
1369                        .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1370                }
1371            }
1372        }
1373
1374        // Delete tree node itself if exists
1375        if let Some(data) = self.router.get_sync(root_hash)
1376            .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1377        {
1378            freed += data.len() as u64;
1379            // Delete locally only - keep S3 as archive
1380            self.router.delete_local_only(root_hash)
1381                .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1382        }
1383
1384        // Delete tree metadata
1385        self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1386
1387        wtxn.commit()?;
1388
1389        tracing::debug!(
1390            "Unindexed tree {} ({} bytes freed)",
1391            &root_hex[..8],
1392            freed
1393        );
1394
1395        Ok(freed)
1396    }
1397
1398    /// Get tree metadata
1399    pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1400        let rtxn = self.env.read_txn()?;
1401        if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1402            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1403                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1404            Ok(Some(meta))
1405        } else {
1406            Ok(None)
1407        }
1408    }
1409
1410    /// List all indexed trees
1411    pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1412        let rtxn = self.env.read_txn()?;
1413        let mut trees = Vec::new();
1414
1415        for item in self.tree_meta.iter(&rtxn)? {
1416            let (hash_bytes, meta_bytes) = item?;
1417            let hash: Hash = hash_bytes.try_into()
1418                .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1419            let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1420                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1421            trees.push((hash, meta));
1422        }
1423
1424        Ok(trees)
1425    }
1426
1427    /// Get total tracked storage size (sum of all tree_meta.total_size)
1428    pub fn tracked_size(&self) -> Result<u64> {
1429        let rtxn = self.env.read_txn()?;
1430        let mut total = 0u64;
1431
1432        for item in self.tree_meta.iter(&rtxn)? {
1433            let (_, bytes) = item?;
1434            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1435                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1436            total += meta.total_size;
1437        }
1438
1439        Ok(total)
1440    }
1441
1442    /// Get evictable trees sorted by (priority ASC, synced_at ASC)
1443    fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1444        let mut trees = self.list_indexed_trees()?;
1445
1446        // Sort by priority (lower first), then by synced_at (older first)
1447        trees.sort_by(|a, b| {
1448            match a.1.priority.cmp(&b.1.priority) {
1449                std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1450                other => other,
1451            }
1452        });
1453
1454        Ok(trees)
1455    }
1456
    /// Run eviction if storage is over quota
    /// Returns bytes freed
    ///
    /// Eviction order:
    /// 1. Orphaned blobs (not in any indexed tree and not pinned)
    /// 2. Trees by priority (lowest first) and age (oldest first)
    pub fn evict_if_needed(&self) -> Result<u64> {
        // Get actual storage used
        let stats = self.router.stats()
            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
        let current = stats.total_bytes;

        // Under quota: nothing to do
        if current <= self.max_size_bytes {
            return Ok(0);
        }

        // Target 90% of max to avoid constant eviction
        let target = self.max_size_bytes * 90 / 100;
        let mut freed = 0u64;
        // Running estimate of usage; freed bytes are subtracted instead of
        // re-querying router stats after every deletion
        let mut current_size = current;

        // Phase 1: Evict orphaned blobs (not in any tree and not pinned)
        let orphan_freed = self.evict_orphaned_blobs()?;
        freed += orphan_freed;
        current_size = current_size.saturating_sub(orphan_freed);

        if orphan_freed > 0 {
            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
        }

        // Check if we're now under target
        if current_size <= target {
            if freed > 0 {
                tracing::info!("Eviction complete: {} bytes freed", freed);
            }
            return Ok(freed);
        }

        // Phase 2: Evict trees by priority (lowest first) and age (oldest first)
        // Own trees CAN be evicted (just last), but PINNED trees are never evicted
        let evictable = self.get_evictable_trees()?;

        for (root_hash, meta) in evictable {
            // Stop as soon as the estimated usage drops under target
            if current_size <= target {
                break;
            }

            let root_hex = to_hex(&root_hash);

            // Never evict pinned trees
            if self.is_pinned(&root_hex)? {
                continue;
            }

            let tree_freed = self.unindex_tree(&root_hash)?;
            freed += tree_freed;
            current_size = current_size.saturating_sub(tree_freed);

            tracing::info!(
                "Evicted tree {} (owner={}, priority={}, {} bytes)",
                &root_hex[..8],
                &meta.owner[..8.min(meta.owner.len())],
                meta.priority,
                tree_freed
            );
        }

        if freed > 0 {
            tracing::info!("Eviction complete: {} bytes freed", freed);
        }

        Ok(freed)
    }
1530
    /// Evict blobs that are not part of any indexed tree and not pinned
    ///
    /// Returns the number of bytes freed. Deletion is local-only; the S3
    /// copy (when configured) is kept as an archive.
    ///
    /// NOTE(review): `blob_trees` only records LEAF blobs (see
    /// `collect_tree_blobs`), so interior tree nodes of indexed trees are not
    /// in `blobs_in_trees`. Confirm that `router.list()` cannot return such
    /// nodes here while unpinned, or indexed trees could lose structure nodes.
    fn evict_orphaned_blobs(&self) -> Result<u64> {
        let mut freed = 0u64;

        // Get all blob hashes from store
        let all_hashes = self.router.list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        // Get pinned hashes (hex strings); iteration errors are skipped
        let rtxn = self.env.read_txn()?;
        let pinned: HashSet<String> = self.pins.iter(&rtxn)?
            .filter_map(|item| item.ok())
            .map(|(hash_hex, _)| hash_hex.to_string())
            .collect();

        // Collect all blob hashes that are in at least one tree
        // Key format is blob_hash (32 bytes) ++ tree_hash (32 bytes)
        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
        for item in self.blob_trees.iter(&rtxn)? {
            if let Ok((key_bytes, _)) = item {
                if key_bytes.len() >= 32 {
                    let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
                    blobs_in_trees.insert(blob_hash);
                }
            }
        }
        drop(rtxn);

        // Find and delete orphaned blobs
        for hash in all_hashes {
            let hash_hex = to_hex(&hash);

            // Skip if pinned
            if pinned.contains(&hash_hex) {
                continue;
            }

            // Skip if part of any tree
            if blobs_in_trees.contains(&hash) {
                continue;
            }

            // This blob is orphaned - delete locally (keep S3 as archive).
            // Read-then-delete is best-effort: read/delete failures are ignored.
            if let Ok(Some(data)) = self.router.get_sync(&hash) {
                freed += data.len() as u64;
                let _ = self.router.delete_local_only(&hash);
                tracing::debug!("Deleted orphaned blob {} ({} bytes)", &hash_hex[..8], data.len());
            }
        }

        Ok(freed)
    }
1583
    /// Get the maximum storage size in bytes
    /// (the quota that `evict_if_needed` enforces against).
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1588
1589    /// Get storage usage by priority tier
1590    pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1591        let rtxn = self.env.read_txn()?;
1592        let mut own = 0u64;
1593        let mut followed = 0u64;
1594        let mut other = 0u64;
1595
1596        for item in self.tree_meta.iter(&rtxn)? {
1597            let (_, bytes) = item?;
1598            let meta: TreeMeta = rmp_serde::from_slice(bytes)
1599                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1600
1601            if meta.priority >= PRIORITY_OWN {
1602                own += meta.total_size;
1603            } else if meta.priority >= PRIORITY_FOLLOWED {
1604                followed += meta.total_size;
1605            } else {
1606                other += meta.total_size;
1607            }
1608        }
1609
1610        Ok(StorageByPriority { own, followed, other })
1611    }
1612
1613    /// Get storage statistics
1614    pub fn get_storage_stats(&self) -> Result<StorageStats> {
1615        let rtxn = self.env.read_txn()?;
1616        let total_pins = self.pins.len(&rtxn)? as usize;
1617
1618        let stats = self.router.stats()
1619            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1620
1621        Ok(StorageStats {
1622            total_dags: stats.count,
1623            pinned_dags: total_pins,
1624            total_bytes: stats.total_bytes,
1625        })
1626    }
1627
1628    // === Cached roots (replaces nostrdb event caching) ===
1629
1630    /// Get cached root for a pubkey/tree_name pair
1631    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1632        let key = format!("{}/{}", pubkey_hex, tree_name);
1633        let rtxn = self.env.read_txn()?;
1634        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1635            let root: CachedRoot = rmp_serde::from_slice(bytes)
1636                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1637            Ok(Some(root))
1638        } else {
1639            Ok(None)
1640        }
1641    }
1642
1643    /// Set cached root for a pubkey/tree_name pair
1644    pub fn set_cached_root(
1645        &self,
1646        pubkey_hex: &str,
1647        tree_name: &str,
1648        hash: &str,
1649        key: Option<&str>,
1650        visibility: &str,
1651        updated_at: u64,
1652    ) -> Result<()> {
1653        let db_key = format!("{}/{}", pubkey_hex, tree_name);
1654        let root = CachedRoot {
1655            hash: hash.to_string(),
1656            key: key.map(|k| k.to_string()),
1657            updated_at,
1658            visibility: visibility.to_string(),
1659        };
1660        let bytes = rmp_serde::to_vec(&root)
1661            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1662        let mut wtxn = self.env.write_txn()?;
1663        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1664        wtxn.commit()?;
1665        Ok(())
1666    }
1667
1668    /// List all cached roots for a pubkey
1669    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1670        let prefix = format!("{}/", pubkey_hex);
1671        let rtxn = self.env.read_txn()?;
1672        let mut results = Vec::new();
1673
1674        for item in self.cached_roots.iter(&rtxn)? {
1675            let (key, bytes) = item?;
1676            if key.starts_with(&prefix) {
1677                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1678                let root: CachedRoot = rmp_serde::from_slice(bytes)
1679                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1680                results.push((tree_name.to_string(), root));
1681            }
1682        }
1683
1684        Ok(results)
1685    }
1686
1687    /// Delete a cached root
1688    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1689        let key = format!("{}/{}", pubkey_hex, tree_name);
1690        let mut wtxn = self.env.write_txn()?;
1691        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1692        wtxn.commit()?;
1693        Ok(deleted)
1694    }
1695
1696    /// Garbage collect unpinned content
1697    pub fn gc(&self) -> Result<GcStats> {
1698        let rtxn = self.env.read_txn()?;
1699
1700        // Get all pinned hashes
1701        let pinned: HashSet<String> = self.pins.iter(&rtxn)?
1702            .filter_map(|item| item.ok())
1703            .map(|(hash_hex, _)| hash_hex.to_string())
1704            .collect();
1705
1706        drop(rtxn);
1707
1708        // Get all stored hashes
1709        let all_hashes = self.router.list()
1710            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1711
1712        // Delete unpinned hashes
1713        let mut deleted = 0;
1714        let mut freed_bytes = 0u64;
1715
1716        for hash in all_hashes {
1717            let hash_hex = to_hex(&hash);
1718            if !pinned.contains(&hash_hex) {
1719                if let Ok(Some(data)) = self.router.get_sync(&hash) {
1720                    freed_bytes += data.len() as u64;
1721                    // Delete locally only - keep S3 as archive
1722                    let _ = self.router.delete_local_only(&hash);
1723                    deleted += 1;
1724                }
1725            }
1726        }
1727
1728        Ok(GcStats {
1729            deleted_dags: deleted,
1730            freed_bytes,
1731        })
1732    }
1733}
1734
/// Aggregate storage statistics, as returned by `get_storage_stats`.
#[derive(Debug)]
pub struct StorageStats {
    // Total number of DAGs reported by the blob router.
    pub total_dags: usize,
    // Number of entries in the pins database.
    pub pinned_dags: usize,
    // Total bytes reported by the blob router.
    pub total_bytes: u64,
}
1741
/// Storage usage broken down by priority tier.
///
/// Tiers correspond to the `PRIORITY_OWN`, `PRIORITY_FOLLOWED`, and
/// `PRIORITY_OTHER` constants; see `storage_by_priority` for how trees are
/// bucketed.
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Own/pinned trees (priority 255)
    pub own: u64,
    /// Followed users' trees (priority 128)
    pub followed: u64,
    /// Other trees (priority 64)
    pub other: u64,
}
1752
/// Metadata describing how a file's content is split into chunks.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    // Total size of the file in bytes.
    pub total_size: u64,
    // Chunk CIDs in file order; resolved via `get_chunk` when reading.
    pub chunk_cids: Vec<String>,
    // Size in bytes of each chunk, parallel to `chunk_cids`.
    pub chunk_sizes: Vec<u64>,
    // True when the file is stored as multiple chunks; range iteration
    // (`FileRangeChunksOwned`) yields nothing when this is false.
    pub is_chunked: bool,
}
1760
/// Owned iterator for async streaming.
///
/// Yields the chunks of a file that overlap the byte range
/// `[start, end]` (inclusive), trimmed to the range boundaries.
/// Owns an `Arc` to the store so it can outlive the caller's borrow.
pub struct FileRangeChunksOwned {
    // Store used to resolve chunk CIDs to bytes.
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    // First byte of the requested range (inclusive).
    start: u64,
    // Last byte of the requested range (inclusive).
    end: u64,
    // Index of the next chunk to consider in `metadata.chunk_cids`.
    current_chunk_idx: usize,
    // Absolute file offset of the start of the current chunk.
    current_offset: u64,
}
1770
1771impl Iterator for FileRangeChunksOwned {
1772    type Item = Result<Vec<u8>>;
1773
1774    fn next(&mut self) -> Option<Self::Item> {
1775        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_cids.len() {
1776            return None;
1777        }
1778
1779        if self.current_offset > self.end {
1780            return None;
1781        }
1782
1783        let chunk_cid = &self.metadata.chunk_cids[self.current_chunk_idx];
1784        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
1785        let chunk_end = self.current_offset + chunk_size - 1;
1786
1787        self.current_chunk_idx += 1;
1788
1789        if chunk_end < self.start || self.current_offset > self.end {
1790            self.current_offset += chunk_size;
1791            return self.next();
1792        }
1793
1794        let chunk_content = match self.store.get_chunk(chunk_cid) {
1795            Ok(Some(content)) => content,
1796            Ok(None) => {
1797                return Some(Err(anyhow::anyhow!("Chunk {} not found", chunk_cid)));
1798            }
1799            Err(e) => {
1800                return Some(Err(e));
1801            }
1802        };
1803
1804        let chunk_read_start = if self.current_offset >= self.start {
1805            0
1806        } else {
1807            (self.start - self.current_offset) as usize
1808        };
1809
1810        let chunk_read_end = if chunk_end <= self.end {
1811            chunk_size as usize - 1
1812        } else {
1813            (self.end - self.current_offset) as usize
1814        };
1815
1816        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
1817        self.current_offset += chunk_size;
1818
1819        Some(Ok(result))
1820    }
1821}
1822
/// Result of a garbage-collection pass (see `gc`).
#[derive(Debug)]
pub struct GcStats {
    // Number of unpinned blobs deleted locally.
    pub deleted_dags: usize,
    // Total size in bytes of the deleted blobs.
    pub freed_bytes: u64,
}
1828
/// A single entry in a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    // Entry name within the directory.
    pub name: String,
    // Content identifier of the entry.
    pub cid: String,
    // True when the entry is a subdirectory.
    pub is_directory: bool,
    // Entry size in bytes.
    pub size: u64,
}
1836
/// A directory name together with its entries.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    // Name of the listed directory.
    pub dir_name: String,
    // Entries contained in the directory.
    pub entries: Vec<DirEntry>,
}
1842
/// A pinned piece of content (file or directory).
#[derive(Debug, Clone)]
pub struct PinnedItem {
    // Content identifier of the pinned item.
    pub cid: String,
    // Display name of the pinned item.
    pub name: String,
    // True when the pinned item is a directory.
    pub is_directory: bool,
}
1849
/// Blob metadata for Blossom protocol
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    // SHA-256 hash of the blob, hex-encoded.
    pub sha256: String,
    // Blob size in bytes.
    pub size: u64,
    // MIME type of the blob content.
    pub mime_type: String,
    // Upload timestamp — presumably Unix seconds; confirm against writers.
    pub uploaded: u64,
}
1858
// Implement ContentStore trait for WebRTC data exchange
impl crate::webrtc::ContentStore for HashtreeStore {
    /// Fetch content by hex hash; delegates directly to `get_chunk`.
    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
        self.get_chunk(hash_hex)
    }
}