Skip to main content

hashtree_cli/
storage.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8    from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16#[cfg(feature = "s3")]
17use std::future::Future;
18use std::io::Write;
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::time::{SystemTime, UNIX_EPOCH};
22
23mod upload;
24
25mod maintenance;
26mod retention;
27
28pub use maintenance::{compact_lmdb_environments_under, CompactResult, VerifyResult};
29pub use retention::{PinnedItem, StorageByPriority, StorageStats, TreeMeta};
30
// Priority levels for tree eviction. The three levels are strictly ordered
// `PRIORITY_OTHER < PRIORITY_FOLLOWED < PRIORITY_OWN`; how the ordering is used
// to pick eviction victims lives in the `retention` module (not visible here).

/// Lowest eviction priority level ("other" trees, judging by the name).
pub const PRIORITY_OTHER: u8 = 64;
/// Middle eviction priority level (trees from followed authors, judging by the name).
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Highest eviction priority level (own trees, judging by the name); the maximum `u8`.
pub const PRIORITY_OWN: u8 = u8::MAX;
/// Maximum number of concurrent LMDB readers configured for the metadata environment.
const LMDB_MAX_READERS: u32 = 1024;
/// Lower bound (10 GiB) for the blob LMDB map size; the configured storage limit
/// is raised to at least this value when sizing the blob environment.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 10 * 1024 * 1024 * 1024;
38
/// Cached root info from Nostr events.
///
/// Stored in `HashtreeStore::cached_roots`, keyed by `"pubkey_hex/tree_name"`.
/// NOTE(review): if this is serialized positionally (e.g. `rmp_serde::to_vec`),
/// field order is part of the stored format — do not reorder fields without a
/// migration. TODO confirm against the (de)serialization call sites.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex); `None` when no key is known for this root.
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public", "link-visible", or "private"
    pub visibility: String,
}
51
/// Storage statistics
///
/// Snapshot of blob counts and sizes as returned by `LocalStore::stats`
/// (and by `StorageRouter::stats`, which reports the local store only).
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of blobs reported by the backend.
    pub count: usize,
    /// Total byte count reported by the backend. Whether this is payload size or
    /// on-disk size depends on the backend — not visible here.
    pub total_bytes: u64,
}
58
/// Local blob store - wraps either FsBlobStore or LmdbBlobStore
///
/// The LMDB variant only exists when the `lmdb` feature is enabled; without it,
/// constructors fall back to the filesystem store when LMDB is requested.
pub enum LocalStore {
    /// Blobs kept in the filesystem via `FsBlobStore`.
    Fs(FsBlobStore),
    /// Blobs kept in an LMDB environment via `LmdbBlobStore`.
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
65
/// Returns `true` when the final component of `path` looks like a filesystem
/// blob shard directory: exactly two ASCII hex digits (either case).
///
/// Paths with no final component, or with a non-UTF-8 one, never match.
#[cfg(feature = "lmdb")]
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    match path.file_name().and_then(|os_name| os_name.to_str()) {
        Some(name) => name.len() == 2 && name.bytes().all(|b| b.is_ascii_hexdigit()),
        None => false,
    }
}
73
/// Deletes leftover filesystem blob shard directories directly under `path`.
///
/// Used when an LMDB blob store takes over a directory that previously held a
/// filesystem store. Only immediate children that are directories (symlinks are
/// followed for the check) and whose name is two hex digits are removed; each
/// removal is logged.
///
/// # Errors
///
/// Returns `StoreError::Io` if `path` cannot be listed, an entry cannot be read,
/// or a shard directory cannot be removed. Processing stops at the first error.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for dir_entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let candidate = dir_entry.map_err(StoreError::Io)?.path();
        let is_stale_shard = candidate.is_dir() && is_fs_blob_shard_dir(&candidate);
        if !is_stale_shard {
            continue;
        }
        std::fs::remove_dir_all(&candidate).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            candidate.display()
        );
    }
    Ok(())
}
90
91impl LocalStore {
92    /// Create a new unbounded local store.
93    ///
94    /// Higher-level stores that need quota enforcement should manage eviction
95    /// above this layer so tree metadata, pins, and archival policies stay
96    /// coherent.
97    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
98        Self::new_unbounded(path, backend)
99    }
100
101    /// Create a new local store with an explicit LMDB logical size cap when using the LMDB backend.
102    ///
103    /// The requested size is used for both the LMDB map and the store's built-in
104    /// byte quota, so standalone blob envs can evict instead of growing forever.
105    pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
106        path: P,
107        backend: &StorageBackend,
108        _map_size_bytes: Option<u64>,
109    ) -> Result<Self, StoreError> {
110        match backend {
111            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
112            #[cfg(feature = "lmdb")]
113            StorageBackend::Lmdb => match _map_size_bytes {
114                Some(map_size_bytes) => {
115                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
116                    remove_stale_fs_blob_shards(path.as_ref())?;
117                    Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
118                        path,
119                        map_size_bytes,
120                    )?))
121                }
122                None => {
123                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
124                    remove_stale_fs_blob_shards(path.as_ref())?;
125                    Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
126                }
127            },
128            #[cfg(not(feature = "lmdb"))]
129            StorageBackend::Lmdb => {
130                tracing::warn!(
131                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
132                );
133                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
134            }
135        }
136    }
137
138    /// Create a new unbounded local store for a specific backend.
139    pub fn new_unbounded<P: AsRef<Path>>(
140        path: P,
141        backend: &StorageBackend,
142    ) -> Result<Self, StoreError> {
143        Self::new_with_lmdb_map_size(path, backend, None)
144    }
145
146    pub fn backend(&self) -> StorageBackend {
147        match self {
148            LocalStore::Fs(_) => StorageBackend::Fs,
149            #[cfg(feature = "lmdb")]
150            LocalStore::Lmdb(_) => StorageBackend::Lmdb,
151        }
152    }
153
154    /// Sync put operation
155    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
156        match self {
157            LocalStore::Fs(store) => store.put_sync(hash, data),
158            #[cfg(feature = "lmdb")]
159            LocalStore::Lmdb(store) => store.put_sync(hash, data),
160        }
161    }
162
163    /// Sync batch put operation.
164    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
165        match self {
166            LocalStore::Fs(store) => {
167                let mut inserted = 0usize;
168                for (hash, data) in items {
169                    if store.put_sync(*hash, data.as_slice())? {
170                        inserted += 1;
171                    }
172                }
173                Ok(inserted)
174            }
175            #[cfg(feature = "lmdb")]
176            LocalStore::Lmdb(store) => store.put_many_sync(items),
177        }
178    }
179
180    /// Sync get operation
181    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
182        match self {
183            LocalStore::Fs(store) => store.get_sync(hash),
184            #[cfg(feature = "lmdb")]
185            LocalStore::Lmdb(store) => store.get_sync(hash),
186        }
187    }
188
189    /// Check if hash exists
190    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
191        match self {
192            LocalStore::Fs(store) => Ok(store.exists(hash)),
193            #[cfg(feature = "lmdb")]
194            LocalStore::Lmdb(store) => store.exists(hash),
195        }
196    }
197
198    /// Sync delete operation
199    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
200        match self {
201            LocalStore::Fs(store) => store.delete_sync(hash),
202            #[cfg(feature = "lmdb")]
203            LocalStore::Lmdb(store) => store.delete_sync(hash),
204        }
205    }
206
207    /// Get storage statistics
208    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
209        match self {
210            LocalStore::Fs(store) => {
211                let stats = store.stats()?;
212                Ok(LocalStoreStats {
213                    count: stats.count,
214                    total_bytes: stats.total_bytes,
215                })
216            }
217            #[cfg(feature = "lmdb")]
218            LocalStore::Lmdb(store) => {
219                let stats = store.stats()?;
220                Ok(LocalStoreStats {
221                    count: stats.count,
222                    total_bytes: stats.total_bytes,
223                })
224            }
225        }
226    }
227
228    /// List all hashes in the store
229    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
230        match self {
231            LocalStore::Fs(store) => store.list(),
232            #[cfg(feature = "lmdb")]
233            LocalStore::Lmdb(store) => store.list(),
234        }
235    }
236}
237
238#[async_trait]
239impl Store for LocalStore {
240    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
241        self.put_sync(hash, &data)
242    }
243
244    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
245        self.put_many_sync(&items)
246    }
247
248    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
249        self.get_sync(hash)
250    }
251
252    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
253        self.exists(hash)
254    }
255
256    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
257        self.delete_sync(hash)
258    }
259}
260
261#[cfg(feature = "s3")]
262use tokio::sync::mpsc;
263
264use crate::config::S3Config;
265
/// Message for background S3 sync
///
/// Sent over the unbounded channel created in `StorageRouter::with_s3` and
/// handled by the background task spawned there. Objects are addressed by the
/// key `<prefix><hex hash>.bin`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` as the object for `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object for `hash`.
    Delete { hash: Hash },
}
272
/// Storage router - local store primary with optional S3 backup
///
/// Write path: local first (fast), then queue S3 upload (non-blocking)
/// Read path: local first, fall back to S3 if miss
///
/// All S3 fields are only present with the `s3` feature, and are `None`/empty
/// for routers built with `StorageRouter::new`.
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LocalStore>,
    /// Optional S3 client for backup
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Bucket holding the backup objects.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to `<hex hash>.bin` object names.
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to the background S3 sync task (carries both uploads and deletes)
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
291
292impl StorageRouter {
293    #[cfg(feature = "s3")]
294    fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
295    where
296        F: Future<Output = T> + Send + 'static,
297        T: Send + 'static,
298    {
299        if tokio::runtime::Handle::try_current().is_ok() {
300            return std::thread::Builder::new()
301                .name("storage-s3-sync".to_string())
302                .spawn(move || {
303                    tokio::runtime::Builder::new_current_thread()
304                        .enable_all()
305                        .build()
306                        .expect("build storage s3 sync runtime")
307                        .block_on(future)
308                })
309                .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
310                .join()
311                .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()));
312        }
313
314        let runtime = tokio::runtime::Builder::new_current_thread()
315            .enable_all()
316            .build()
317            .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
318        Ok(runtime.block_on(future))
319    }
320
321    /// Create router with local storage only
322    pub fn new(local: Arc<LocalStore>) -> Self {
323        Self {
324            local,
325            #[cfg(feature = "s3")]
326            s3_client: None,
327            #[cfg(feature = "s3")]
328            s3_bucket: None,
329            #[cfg(feature = "s3")]
330            s3_prefix: String::new(),
331            #[cfg(feature = "s3")]
332            sync_tx: None,
333        }
334    }
335
336    /// Create router with local storage + S3 backup
337    #[cfg(feature = "s3")]
338    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
339        use aws_sdk_s3::Client as S3Client;
340
341        // Build AWS config
342        let mut aws_config_loader = aws_config::from_env();
343        aws_config_loader =
344            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
345        let aws_config = aws_config_loader.load().await;
346
347        // Build S3 client with custom endpoint
348        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
349        s3_config_builder = s3_config_builder
350            .endpoint_url(&config.endpoint)
351            .force_path_style(true);
352
353        let s3_client = S3Client::from_conf(s3_config_builder.build());
354        let bucket = config.bucket.clone();
355        let prefix = config.prefix.clone().unwrap_or_default();
356
357        // Create background sync channel
358        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
359
360        // Spawn background sync task with bounded concurrent uploads
361        let sync_client = s3_client.clone();
362        let sync_bucket = bucket.clone();
363        let sync_prefix = prefix.clone();
364
365        tokio::spawn(async move {
366            use aws_sdk_s3::primitives::ByteStream;
367
368            tracing::info!("S3 background sync task started");
369
370            // Limit concurrent uploads to prevent overwhelming the runtime
371            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
372            let client = std::sync::Arc::new(sync_client);
373            let bucket = std::sync::Arc::new(sync_bucket);
374            let prefix = std::sync::Arc::new(sync_prefix);
375
376            while let Some(msg) = sync_rx.recv().await {
377                let client = client.clone();
378                let bucket = bucket.clone();
379                let prefix = prefix.clone();
380                let semaphore = semaphore.clone();
381
382                // Spawn each upload with semaphore-bounded concurrency
383                tokio::spawn(async move {
384                    // Acquire permit before uploading
385                    let _permit = semaphore.acquire().await;
386
387                    match msg {
388                        S3SyncMessage::Upload { hash, data } => {
389                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
390                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
391
392                            match client
393                                .put_object()
394                                .bucket(bucket.as_str())
395                                .key(&key)
396                                .body(ByteStream::from(data))
397                                .send()
398                                .await
399                            {
400                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
401                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
402                            }
403                        }
404                        S3SyncMessage::Delete { hash } => {
405                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
406                            tracing::debug!("S3 deleting {}", &key);
407
408                            if let Err(e) = client
409                                .delete_object()
410                                .bucket(bucket.as_str())
411                                .key(&key)
412                                .send()
413                                .await
414                            {
415                                tracing::error!("S3 delete failed {}: {}", &key, e);
416                            }
417                        }
418                    }
419                });
420            }
421        });
422
423        tracing::info!(
424            "S3 storage initialized: bucket={}, prefix={}",
425            bucket,
426            prefix
427        );
428
429        Ok(Self {
430            local,
431            s3_client: Some(s3_client),
432            s3_bucket: Some(bucket),
433            s3_prefix: prefix,
434            sync_tx: Some(sync_tx),
435        })
436    }
437
438    /// Store data - writes to LMDB, queues S3 upload in background
439    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
440        // Always write to local first
441        let is_new = self.local.put_sync(hash, data)?;
442
443        // Queue S3 upload only for newly inserted blobs.
444        // Existing local blobs were already persisted or are handled by explicit repair/push flows.
445        #[cfg(feature = "s3")]
446        if is_new {
447            if let Some(ref tx) = self.sync_tx {
448                tracing::debug!(
449                    "Queueing S3 upload for {} ({} bytes)",
450                    crate::storage::to_hex(&hash)[..16].to_string(),
451                    data.len(),
452                );
453                if let Err(e) = tx.send(S3SyncMessage::Upload {
454                    hash,
455                    data: data.to_vec(),
456                }) {
457                    tracing::error!("Failed to queue S3 upload: {}", e);
458                }
459            }
460        }
461
462        Ok(is_new)
463    }
464
465    /// Store multiple blobs with a single local batch write when supported.
466    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
467        #[cfg(feature = "s3")]
468        let pending_uploads = if self.sync_tx.is_some() {
469            let mut pending = Vec::new();
470            for (hash, data) in items {
471                if !self.local.exists(hash)? {
472                    pending.push((*hash, data.clone()));
473                }
474            }
475            pending
476        } else {
477            Vec::new()
478        };
479
480        let inserted = self.local.put_many_sync(items)?;
481
482        #[cfg(feature = "s3")]
483        if let Some(ref tx) = self.sync_tx {
484            for (hash, data) in pending_uploads {
485                if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
486                    tracing::error!("Failed to queue S3 upload: {}", e);
487                }
488            }
489        }
490
491        Ok(inserted)
492    }
493
494    /// Get data - tries LMDB first, falls back to S3
495    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
496        // Try local first
497        if let Some(data) = self.local.get_sync(hash)? {
498            return Ok(Some(data));
499        }
500
501        // Fall back to S3 if configured
502        #[cfg(feature = "s3")]
503        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
504            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
505            let client = client.clone();
506            let bucket = bucket.clone();
507
508            match Self::run_s3_future_sync(async move {
509                client.get_object().bucket(bucket).key(key).send().await
510            }) {
511                Ok(Ok(output)) => {
512                    match Self::run_s3_future_sync(async move { output.body.collect().await }) {
513                        Ok(Ok(body)) => {
514                            let data = body.into_bytes().to_vec();
515                            // Cache locally for future reads
516                            let _ = self.local.put_sync(*hash, &data);
517                            return Ok(Some(data));
518                        }
519                        Ok(Err(err)) => {
520                            tracing::warn!("S3 body collect failed: {}", err);
521                        }
522                        Err(err) => {
523                            tracing::warn!("S3 body collect runtime failed: {}", err);
524                        }
525                    }
526                }
527                Ok(Err(err)) => {
528                    let service_err = err.into_service_error();
529                    if !service_err.is_no_such_key() {
530                        tracing::warn!("S3 get failed: {}", service_err);
531                    }
532                }
533                Err(err) => {
534                    tracing::warn!("S3 get runtime failed: {}", err);
535                }
536            }
537        }
538
539        Ok(None)
540    }
541
542    /// Check if hash exists
543    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
544        // Check local first
545        if self.local.exists(hash)? {
546            return Ok(true);
547        }
548
549        // Check S3 if configured
550        #[cfg(feature = "s3")]
551        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
552            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
553            let client = client.clone();
554            let bucket = bucket.clone();
555
556            match Self::run_s3_future_sync(async move {
557                client.head_object().bucket(bucket).key(&key).send().await
558            }) {
559                Ok(Ok(_)) => return Ok(true),
560                Ok(Err(err)) => {
561                    let service_err = err.into_service_error();
562                    if !service_err.is_not_found() {
563                        tracing::warn!("S3 head failed: {}", service_err);
564                    }
565                }
566                Err(err) => {
567                    tracing::warn!("S3 head runtime failed: {}", err);
568                }
569            }
570        }
571
572        Ok(false)
573    }
574
575    /// Delete data from both local and S3 stores
576    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
577        let deleted = self.local.delete_sync(hash)?;
578
579        // Queue S3 delete if configured
580        #[cfg(feature = "s3")]
581        if let Some(ref tx) = self.sync_tx {
582            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
583        }
584
585        Ok(deleted)
586    }
587
588    /// Delete data from local store only (don't propagate to S3)
589    /// Used for eviction where we want to keep S3 as archive
590    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
591        self.local.delete_sync(hash)
592    }
593
594    /// Get stats from local store
595    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
596        self.local.stats()
597    }
598
599    /// List all hashes from local store
600    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
601        self.local.list()
602    }
603
604    /// Get the underlying local store for HashTree operations
605    pub fn local_store(&self) -> Arc<LocalStore> {
606        Arc::clone(&self.local)
607    }
608}
609
610// Implement async Store trait for StorageRouter so it can be used directly with HashTree
611// This ensures all writes go through S3 sync
612#[async_trait]
613impl Store for StorageRouter {
614    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
615        self.put_sync(hash, &data)
616    }
617
618    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
619        self.put_many_sync(&items)
620    }
621
622    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
623        self.get_sync(hash)
624    }
625
626    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
627        self.exists(hash)
628    }
629
630    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
631        self.delete_sync(hash)
632    }
633}
634
/// Tree-aware blob store: an LMDB metadata environment (pins, ownership, tree
/// metadata, cached roots) plus a [`StorageRouter`] for the blob bytes.
pub struct HashtreeStore {
    /// Directory the store was opened at (the metadata env lives here, blobs under `blobs/`).
    base_path: PathBuf,
    /// heed/LMDB environment holding all metadata databases below.
    env: heed::Env,
    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
    pins: Database<Bytes, Unit>,
    /// Mutable published refs that should stay subscribed and keep following updates
    pinned_refs: Database<Str, Unit>,
    /// Authors whose hashtree publications should be mirrored continuously
    tracked_authors: Database<Str, Unit>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles the local blob store + optional S3 (Arc for sharing with HashTree)
    router: Arc<StorageRouter>,
    /// Maximum storage size in bytes (from config or the caller)
    max_size_bytes: u64,
    /// Whether quota enforcement may delete local blobs not tracked by any indexed tree.
    evict_orphans: bool,
}
663
664impl HashtreeStore {
665    /// Create a new store with the configured local storage limit.
666    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
667        let config = hashtree_config::Config::load_or_default();
668        let max_size_bytes = config
669            .storage
670            .max_size_gb
671            .saturating_mul(1024 * 1024 * 1024);
672        Self::with_options_and_backend(
673            path,
674            None,
675            max_size_bytes,
676            config.storage.evict_orphans,
677            &config.storage.backend,
678        )
679    }
680
681    /// Create a new store with an explicit local backend and size limit.
682    pub fn new_with_backend<P: AsRef<Path>>(
683        path: P,
684        backend: hashtree_config::StorageBackend,
685        max_size_bytes: u64,
686    ) -> Result<Self> {
687        Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
688    }
689
690    /// Create a new store with optional S3 backend and the configured local storage limit.
691    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
692        let config = hashtree_config::Config::load_or_default();
693        let max_size_bytes = config
694            .storage
695            .max_size_gb
696            .saturating_mul(1024 * 1024 * 1024);
697        Self::with_options_and_backend(
698            path,
699            s3_config,
700            max_size_bytes,
701            config.storage.evict_orphans,
702            &config.storage.backend,
703        )
704    }
705
706    /// Create a new store with optional S3 backend and custom size limit.
707    ///
708    /// The raw local blob backend remains unbounded. `HashtreeStore` enforces
709    /// `max_size_bytes` at the tree-management layer so eviction can honor pins,
710    /// orphan handling, and local-only eviction when S3 is used as archive.
711    pub fn with_options<P: AsRef<Path>>(
712        path: P,
713        s3_config: Option<&S3Config>,
714        max_size_bytes: u64,
715    ) -> Result<Self> {
716        let config = hashtree_config::Config::load_or_default();
717        Self::with_options_and_backend(
718            path,
719            s3_config,
720            max_size_bytes,
721            config.storage.evict_orphans,
722            &config.storage.backend,
723        )
724    }
725
    /// Shared constructor behind `new`, `new_with_backend`, `with_s3`, and `with_options`.
    ///
    /// Creates `path` if needed and opens the metadata LMDB environment there,
    /// creating its named databases. It then builds the local blob store under
    /// `path/blobs` for the requested backend and wraps it in a [`StorageRouter`].
    /// S3 is added when `s3_config` is given and the `s3` feature is enabled.
    ///
    /// # Errors
    ///
    /// Returns an error if directories cannot be created, the LMDB environment or
    /// its databases cannot be opened, the blob store cannot be created (including
    /// an LMDB map size that does not fit in `usize`), or, with the `s3` feature,
    /// the S3 router cannot be initialized.
    fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        evict_orphans: bool,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY (review): heed marks `EnvOpenOptions::open` unsafe because LMDB
        // forbids opening the same environment more than once per process and
        // requires that the memory-mapped files not be modified externally.
        // Nothing here enforces that; it relies on each `path` being opened only
        // once — TODO confirm at call sites.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) // 10GB virtual address space
                .max_dbs(10) // room for the 9 named databases created below, plus one spare
                .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Best-effort cleanup; a failure to clear stale reader slots is ignored.
        let _ = env.clear_stale_readers();

        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
        let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Intentionally keep the raw blob backend free of quota/eviction here.
        // HashtreeStore owns quota policy above this layer, where it can coordinate
        // eviction with tree refs, blob ownership, pins, and S3 archival behavior.
        // (For LMDB the map size, an address-space ceiling, is still set to at
        // least LMDB_BLOB_MIN_MAP_SIZE_BYTES.)
        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                std::fs::create_dir_all(path.join("blobs"))?;
                // Drop shard directories left behind by a previous filesystem backend.
                remove_stale_fs_blob_shards(&path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        // Create storage router with optional S3
        // NOTE(review): `StorageRouter::with_s3` calls `tokio::spawn`, which needs an
        // entered Tokio runtime; `sync_block_on` (futures' executor) does not provide
        // one, so this presumably only works when called from a thread already inside
        // a Tokio runtime — confirm.
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            base_path: path.to_path_buf(),
            env,
            pins,
            pinned_refs,
            tracked_authors,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
            evict_orphans,
        })
    }
831
832    pub fn base_path(&self) -> &Path {
833        &self.base_path
834    }
835
836    /// Get the storage router
837    pub fn router(&self) -> &StorageRouter {
838        &self.router
839    }
840
841    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
842    /// All writes through this go to both LMDB and S3
843    pub fn store_arc(&self) -> Arc<StorageRouter> {
844        Arc::clone(&self.router)
845    }
846
847    /// Get tree node by hash (raw bytes)
848    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
849        let store = self.store_arc();
850        let tree = HashTree::new(HashTreeConfig::new(store).public());
851
852        sync_block_on(async {
853            tree.get_tree_node(hash)
854                .await
855                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
856        })
857    }
858
859    /// Store a raw blob, returns SHA256 hash as hex.
860    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
861        let hash = sha256(data);
862        self.router
863            .put_sync(hash, data)
864            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
865        Ok(to_hex(&hash))
866    }
867
868    /// Store an opportunistically cached blob.
869    ///
870    /// Unlike durable `put_blob` writes, this path may evict disposable orphaned
871    /// blobs to make room under storage pressure. It intentionally avoids touching
872    /// indexed trees, social-graph roots, explicit pins, and owned Blossom blobs.
873    pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
874        let hash = sha256(data);
875        if self
876            .router
877            .exists(&hash)
878            .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
879        {
880            return Ok(to_hex(&hash));
881        }
882
883        let incoming_bytes = data.len() as u64;
884        let _ = self.make_room_for_cached_blob(incoming_bytes);
885
886        let mut retried_after_cleanup = false;
887        loop {
888            match self.router.put_sync(hash, data) {
889                Ok(_) => return Ok(to_hex(&hash)),
890                Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
891                    let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
892                    if freed == 0 {
893                        return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
894                    }
895                    retried_after_cleanup = true;
896                }
897                Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
898            }
899        }
900    }
901
902    /// Get a raw blob by SHA256 hash (raw bytes).
903    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
904        self.router
905            .get_sync(hash)
906            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
907    }
908
909    /// Check if a blob exists by SHA256 hash (raw bytes).
910    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
911        self.router
912            .exists(hash)
913            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
914    }
915
    // === Blossom ownership tracking ===
    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
    // A blob may have many owners: checking one (blob, owner) pair is a single keyed
    // lookup, and all owners of a blob are found with a prefix scan on the sha256.
919
920    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
921    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
922        let mut key = [0u8; 64];
923        key[..32].copy_from_slice(sha256);
924        key[32..].copy_from_slice(pubkey);
925        key
926    }
927
928    /// Add an owner (pubkey) to a blob for Blossom protocol
929    /// Multiple users can own the same blob - it's only deleted when all owners remove it
930    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
931        let key = Self::blob_owner_key(sha256, pubkey);
932        let mut wtxn = self.env.write_txn()?;
933
934        // Add ownership entry (idempotent - put overwrites)
935        self.blob_owners.put(&mut wtxn, &key[..], &())?;
936
937        // Convert sha256 to hex for BlobMetadata (which stores sha256 as hex string)
938        let sha256_hex = to_hex(sha256);
939
940        // Get existing blobs for this pubkey (for /list endpoint)
941        let mut blobs: Vec<BlobMetadata> = self
942            .pubkey_blobs
943            .get(&wtxn, pubkey)?
944            .and_then(|b| serde_json::from_slice(b).ok())
945            .unwrap_or_default();
946
947        // Check if blob already exists for this pubkey
948        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
949            let now = SystemTime::now()
950                .duration_since(UNIX_EPOCH)
951                .unwrap()
952                .as_secs();
953
954            // Get size from raw blob
955            let size = self
956                .get_blob(sha256)?
957                .map(|data| data.len() as u64)
958                .unwrap_or(0);
959
960            blobs.push(BlobMetadata {
961                sha256: sha256_hex,
962                size,
963                mime_type: "application/octet-stream".to_string(),
964                uploaded: now,
965            });
966
967            let blobs_json = serde_json::to_vec(&blobs)?;
968            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
969        }
970
971        wtxn.commit()?;
972        Ok(())
973    }
974
975    /// Check if a pubkey owns a blob
976    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
977        let key = Self::blob_owner_key(sha256, pubkey);
978        let rtxn = self.env.read_txn()?;
979        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
980    }
981
982    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
983    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
984        let rtxn = self.env.read_txn()?;
985
986        let mut owners = Vec::new();
987        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
988            let (key, _) = item?;
989            if key.len() == 64 {
990                // Extract pubkey from composite key (bytes 32-64)
991                let mut pubkey = [0u8; 32];
992                pubkey.copy_from_slice(&key[32..64]);
993                owners.push(pubkey);
994            }
995        }
996        Ok(owners)
997    }
998
999    /// Check if blob has any owners
1000    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1001        let rtxn = self.env.read_txn()?;
1002
1003        // Just check if any entry exists with this prefix
1004        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1005            if item.is_ok() {
1006                return Ok(true);
1007            }
1008        }
1009        Ok(false)
1010    }
1011
1012    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
1013    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1014        Ok(self.get_blob_owners(sha256)?.into_iter().next())
1015    }
1016
1017    /// Remove a user's ownership of a blossom blob
1018    /// Only deletes the actual blob when no owners remain
1019    /// Returns true if the blob was actually deleted (no owners left)
1020    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1021        let key = Self::blob_owner_key(sha256, pubkey);
1022        let mut wtxn = self.env.write_txn()?;
1023
1024        // Remove this pubkey's ownership entry
1025        self.blob_owners.delete(&mut wtxn, &key[..])?;
1026
1027        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
1028        let sha256_hex = to_hex(sha256);
1029
1030        // Remove from pubkey's blob list
1031        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1032            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1033                blobs.retain(|b| b.sha256 != sha256_hex);
1034                let blobs_json = serde_json::to_vec(&blobs)?;
1035                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1036            }
1037        }
1038
1039        // Check if any other owners remain (prefix scan)
1040        let mut has_other_owners = false;
1041        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1042            if item.is_ok() {
1043                has_other_owners = true;
1044                break;
1045            }
1046        }
1047
1048        if has_other_owners {
1049            wtxn.commit()?;
1050            tracing::debug!(
1051                "Removed {} from blob {} owners, other owners remain",
1052                &to_hex(pubkey)[..8],
1053                &sha256_hex[..8]
1054            );
1055            return Ok(false);
1056        }
1057
1058        // No owners left - delete the blob completely
1059        tracing::info!(
1060            "All owners removed from blob {}, deleting",
1061            &sha256_hex[..8]
1062        );
1063
1064        // Delete raw blob (by content hash) - this deletes from S3 too
1065        let _ = self.router.delete_sync(sha256);
1066
1067        wtxn.commit()?;
1068        Ok(true)
1069    }
1070
1071    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
1072    pub fn list_blobs_by_pubkey(
1073        &self,
1074        pubkey: &[u8; 32],
1075    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1076        let rtxn = self.env.read_txn()?;
1077
1078        let blobs: Vec<BlobMetadata> = self
1079            .pubkey_blobs
1080            .get(&rtxn, pubkey)?
1081            .and_then(|b| serde_json::from_slice(b).ok())
1082            .unwrap_or_default();
1083
1084        Ok(blobs
1085            .into_iter()
1086            .map(|b| crate::server::blossom::BlobDescriptor {
1087                url: format!("/{}", b.sha256),
1088                sha256: b.sha256,
1089                size: b.size,
1090                mime_type: b.mime_type,
1091                uploaded: b.uploaded,
1092            })
1093            .collect())
1094    }
1095
1096    /// Get a single chunk/blob by hash (raw bytes)
1097    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1098        self.router
1099            .get_sync(hash)
1100            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1101    }
1102
1103    /// Get file content by hash (raw bytes)
1104    /// Returns raw bytes (caller handles decryption if needed)
1105    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1106        let store = self.store_arc();
1107        let tree = HashTree::new(HashTreeConfig::new(store).public());
1108
1109        sync_block_on(async {
1110            tree.read_file(hash)
1111                .await
1112                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1113        })
1114    }
1115
1116    /// Get file content by Cid (hash + optional decryption key as raw bytes)
1117    /// Handles decryption automatically if key is present
1118    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1119        let store = self.store_arc();
1120        let tree = HashTree::new(HashTreeConfig::new(store).public());
1121
1122        sync_block_on(async {
1123            tree.get(cid, None)
1124                .await
1125                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1126        })
1127    }
1128
1129    fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1130        let exists = self
1131            .router
1132            .exists(&cid.hash)
1133            .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1134        if !exists {
1135            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1136        }
1137        Ok(())
1138    }
1139
1140    /// Stream file content identified by Cid into a writer without buffering full file in memory.
1141    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1142        self.ensure_cid_exists(cid)?;
1143
1144        let store = self.store_arc();
1145        let tree = HashTree::new(HashTreeConfig::new(store).public());
1146        let mut total_bytes = 0u64;
1147        let mut streamed_any_chunk = false;
1148
1149        sync_block_on(async {
1150            let mut stream = tree.get_stream(cid);
1151            while let Some(chunk) = stream.next().await {
1152                streamed_any_chunk = true;
1153                let chunk =
1154                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1155                writer
1156                    .write_all(&chunk)
1157                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1158                total_bytes += chunk.len() as u64;
1159            }
1160            Ok::<(), anyhow::Error>(())
1161        })?;
1162
1163        if !streamed_any_chunk {
1164            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1165        }
1166
1167        writer
1168            .flush()
1169            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1170        Ok(total_bytes)
1171    }
1172
1173    /// Stream file content identified by Cid directly into a destination path.
1174    pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1175        self.ensure_cid_exists(cid)?;
1176
1177        let output_path = output_path.as_ref();
1178        if let Some(parent) = output_path.parent() {
1179            if !parent.as_os_str().is_empty() {
1180                std::fs::create_dir_all(parent).with_context(|| {
1181                    format!("Failed to create output directory {}", parent.display())
1182                })?;
1183            }
1184        }
1185
1186        let mut file = std::fs::File::create(output_path)
1187            .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1188        self.write_file_by_cid_to_writer(cid, &mut file)
1189    }
1190
1191    /// Stream a public (unencrypted) file by hash directly into a destination path.
1192    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1193        self.write_file_by_cid(&Cid::public(*hash), output_path)
1194    }
1195
1196    /// Resolve a path within a tree (returns Cid with key if encrypted)
1197    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1198        let store = self.store_arc();
1199        let tree = HashTree::new(HashTreeConfig::new(store).public());
1200
1201        sync_block_on(async {
1202            tree.resolve_path(cid, path)
1203                .await
1204                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1205        })
1206    }
1207
1208    /// Get chunk metadata for a file (chunk list, sizes, total size)
1209    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
1210        let store = self.store_arc();
1211        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
1212
1213        sync_block_on(async {
1214            // First check if the hash exists in the store at all
1215            // (either as a blob or tree node)
1216            let exists = store
1217                .has(hash)
1218                .await
1219                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;
1220
1221            if !exists {
1222                return Ok(None);
1223            }
1224
1225            // Get total size
1226            let total_size = tree
1227                .get_size(hash)
1228                .await
1229                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;
1230
1231            // Check if it's a tree (chunked) or blob
1232            let is_tree_node = tree
1233                .is_tree(hash)
1234                .await
1235                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1236
1237            if !is_tree_node {
1238                // Single blob, not chunked
1239                return Ok(Some(FileChunkMetadata {
1240                    total_size,
1241                    chunk_hashes: vec![],
1242                    chunk_sizes: vec![],
1243                    is_chunked: false,
1244                }));
1245            }
1246
1247            // Get tree node to extract chunk info
1248            let node = match tree
1249                .get_tree_node(hash)
1250                .await
1251                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1252            {
1253                Some(n) => n,
1254                None => return Ok(None),
1255            };
1256
1257            // Check if it's a directory (has named links)
1258            let is_directory = tree
1259                .is_directory(hash)
1260                .await
1261                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1262
1263            if is_directory {
1264                return Ok(None); // Not a file
1265            }
1266
1267            // Extract chunk info from links
1268            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
1269            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();
1270
1271            Ok(Some(FileChunkMetadata {
1272                total_size,
1273                chunk_hashes,
1274                chunk_sizes,
1275                is_chunked: !node.links.is_empty(),
1276            }))
1277        })
1278    }
1279
1280    /// Get byte range from file
1281    pub fn get_file_range(
1282        &self,
1283        hash: &[u8; 32],
1284        start: u64,
1285        end: Option<u64>,
1286    ) -> Result<Option<(Vec<u8>, u64)>> {
1287        let metadata = match self.get_file_chunk_metadata(hash)? {
1288            Some(m) => m,
1289            None => return Ok(None),
1290        };
1291
1292        if metadata.total_size == 0 {
1293            return Ok(Some((Vec::new(), 0)));
1294        }
1295
1296        if start >= metadata.total_size {
1297            return Ok(None);
1298        }
1299
1300        let end = end
1301            .unwrap_or(metadata.total_size - 1)
1302            .min(metadata.total_size - 1);
1303
1304        // For non-chunked files, load entire file
1305        if !metadata.is_chunked {
1306            let content = self.get_file(hash)?.unwrap_or_default();
1307            let range_content = if start < content.len() as u64 {
1308                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1309            } else {
1310                Vec::new()
1311            };
1312            return Ok(Some((range_content, metadata.total_size)));
1313        }
1314
1315        // For chunked files, load only needed chunks
1316        let mut result = Vec::new();
1317        let mut current_offset = 0u64;
1318
1319        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1320            let chunk_size = metadata.chunk_sizes[i];
1321            let chunk_end = current_offset + chunk_size - 1;
1322
1323            // Check if this chunk overlaps with requested range
1324            if chunk_end >= start && current_offset <= end {
1325                let chunk_content = match self.get_chunk(chunk_hash)? {
1326                    Some(content) => content,
1327                    None => {
1328                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1329                    }
1330                };
1331
1332                let chunk_read_start = if current_offset >= start {
1333                    0
1334                } else {
1335                    (start - current_offset) as usize
1336                };
1337
1338                let chunk_read_end = if chunk_end <= end {
1339                    chunk_size as usize - 1
1340                } else {
1341                    (end - current_offset) as usize
1342                };
1343
1344                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1345            }
1346
1347            current_offset += chunk_size;
1348
1349            if current_offset > end {
1350                break;
1351            }
1352        }
1353
1354        Ok(Some((result, metadata.total_size)))
1355    }
1356
1357    /// Stream file range as chunks using Arc for async/Send contexts
1358    pub fn stream_file_range_chunks_owned(
1359        self: Arc<Self>,
1360        hash: &[u8; 32],
1361        start: u64,
1362        end: u64,
1363    ) -> Result<Option<FileRangeChunksOwned>> {
1364        let metadata = match self.get_file_chunk_metadata(hash)? {
1365            Some(m) => m,
1366            None => return Ok(None),
1367        };
1368
1369        if metadata.total_size == 0 || start >= metadata.total_size {
1370            return Ok(None);
1371        }
1372
1373        let end = end.min(metadata.total_size - 1);
1374
1375        Ok(Some(FileRangeChunksOwned {
1376            store: self,
1377            metadata,
1378            start,
1379            end,
1380            current_chunk_idx: 0,
1381            current_offset: 0,
1382        }))
1383    }
1384
1385    /// Get directory structure by hash (raw bytes)
1386    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1387        let store = self.store_arc();
1388        let tree = HashTree::new(HashTreeConfig::new(store).public());
1389
1390        sync_block_on(async {
1391            // Check if it's a directory
1392            let is_dir = tree
1393                .is_directory(hash)
1394                .await
1395                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1396
1397            if !is_dir {
1398                return Ok(None);
1399            }
1400
1401            // Get directory entries (public Cid - no encryption key)
1402            let cid = hashtree_core::Cid::public(*hash);
1403            let tree_entries = tree
1404                .list_directory(&cid)
1405                .await
1406                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1407
1408            let entries: Vec<DirEntry> = tree_entries
1409                .into_iter()
1410                .map(|e| DirEntry {
1411                    name: e.name,
1412                    cid: to_hex(&e.hash),
1413                    is_directory: e.link_type.is_tree(),
1414                    size: e.size,
1415                })
1416                .collect();
1417
1418            Ok(Some(DirectoryListing {
1419                dir_name: String::new(),
1420                entries,
1421            }))
1422        })
1423    }
1424
1425    /// Get directory structure by CID, supporting encrypted directories.
1426    pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1427        let store = self.store_arc();
1428        let tree = HashTree::new(HashTreeConfig::new(store).public());
1429        let cid = cid.clone();
1430
1431        sync_block_on(async {
1432            let is_dir = tree
1433                .is_dir(&cid)
1434                .await
1435                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1436
1437            if !is_dir {
1438                return Ok(None);
1439            }
1440
1441            let tree_entries = tree
1442                .list_directory(&cid)
1443                .await
1444                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1445
1446            let entries: Vec<DirEntry> = tree_entries
1447                .into_iter()
1448                .map(|e| DirEntry {
1449                    name: e.name,
1450                    cid: Cid {
1451                        hash: e.hash,
1452                        key: e.key,
1453                    }
1454                    .to_string(),
1455                    is_directory: e.link_type.is_tree(),
1456                    size: e.size,
1457                })
1458                .collect();
1459
1460            Ok(Some(DirectoryListing {
1461                dir_name: String::new(),
1462                entries,
1463            }))
1464        })
1465    }
1466
    // === Pinned refs, tracked authors, and cached roots ===
1468
1469    /// Persist a mutable published ref that should stay subscribed.
1470    pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
1471        let mut wtxn = self.env.write_txn()?;
1472        self.pinned_refs.put(&mut wtxn, key, &())?;
1473        wtxn.commit()?;
1474        Ok(())
1475    }
1476
1477    /// Remove a mutable published ref from the live pinned set.
1478    pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
1479        let mut wtxn = self.env.write_txn()?;
1480        let removed = self.pinned_refs.delete(&mut wtxn, key)?;
1481        wtxn.commit()?;
1482        Ok(removed)
1483    }
1484
1485    /// List mutable published refs that should stay subscribed.
1486    pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
1487        let rtxn = self.env.read_txn()?;
1488        let mut refs = Vec::new();
1489
1490        for item in self.pinned_refs.iter(&rtxn)? {
1491            let (key, _) = item?;
1492            refs.push(key.to_string());
1493        }
1494
1495        refs.sort();
1496        Ok(refs)
1497    }
1498
1499    /// Persist an author whose published trees should stay mirrored.
1500    pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
1501        let mut wtxn = self.env.write_txn()?;
1502        let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
1503        self.tracked_authors.put(&mut wtxn, npub, &())?;
1504        wtxn.commit()?;
1505        Ok(inserted)
1506    }
1507
1508    /// Remove an author from the continuous mirror set.
1509    pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
1510        let mut wtxn = self.env.write_txn()?;
1511        let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
1512        wtxn.commit()?;
1513        Ok(removed)
1514    }
1515
1516    /// List authors whose published trees should stay mirrored.
1517    pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
1518        let rtxn = self.env.read_txn()?;
1519        let mut authors = Vec::new();
1520
1521        for item in self.tracked_authors.iter(&rtxn)? {
1522            let (npub, _) = item?;
1523            authors.push(npub.to_string());
1524        }
1525
1526        authors.sort();
1527        Ok(authors)
1528    }
1529
1530    /// Get cached root for a pubkey/tree_name pair
1531    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1532        let key = format!("{}/{}", pubkey_hex, tree_name);
1533        let rtxn = self.env.read_txn()?;
1534        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1535            let root: CachedRoot = rmp_serde::from_slice(bytes)
1536                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1537            Ok(Some(root))
1538        } else {
1539            Ok(None)
1540        }
1541    }
1542
1543    /// Set cached root for a pubkey/tree_name pair
1544    pub fn set_cached_root(
1545        &self,
1546        pubkey_hex: &str,
1547        tree_name: &str,
1548        hash: &str,
1549        key: Option<&str>,
1550        visibility: &str,
1551        updated_at: u64,
1552    ) -> Result<()> {
1553        let db_key = format!("{}/{}", pubkey_hex, tree_name);
1554        let root = CachedRoot {
1555            hash: hash.to_string(),
1556            key: key.map(|k| k.to_string()),
1557            updated_at,
1558            visibility: visibility.to_string(),
1559        };
1560        let bytes = rmp_serde::to_vec(&root)
1561            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1562        let mut wtxn = self.env.write_txn()?;
1563        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1564        wtxn.commit()?;
1565        Ok(())
1566    }
1567
1568    /// List all cached roots for a pubkey
1569    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1570        let prefix = format!("{}/", pubkey_hex);
1571        let rtxn = self.env.read_txn()?;
1572        let mut results = Vec::new();
1573
1574        for item in self.cached_roots.iter(&rtxn)? {
1575            let (key, bytes) = item?;
1576            if key.starts_with(&prefix) {
1577                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1578                let root: CachedRoot = rmp_serde::from_slice(bytes)
1579                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1580                results.push((tree_name.to_string(), root));
1581            }
1582        }
1583
1584        Ok(results)
1585    }
1586
1587    /// Delete a cached root
1588    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1589        let key = format!("{}/{}", pubkey_hex, tree_name);
1590        let mut wtxn = self.env.write_txn()?;
1591        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1592        wtxn.commit()?;
1593        Ok(deleted)
1594    }
1595}
1596
1597fn is_map_full_store_error(err: &StoreError) -> bool {
1598    let message = err.to_string();
1599    message.contains("MDB_MAP_FULL") || message.contains("MapFull")
1600}
1601
/// Chunk layout of a stored file, as produced by `get_file_chunk_metadata`.
///
/// NOTE(review): only the root node's direct links are recorded; if a file tree can
/// be more than one level deep, these are subtree roots rather than leaf chunks.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes, as reported by `HashTree::get_size`.
    pub total_size: u64,
    /// Hashes of the root node's links, in order; empty for a single-blob file.
    pub chunk_hashes: Vec<Hash>,
    /// Sizes of the root node's links, parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// True when the root is a non-directory tree node with at least one link.
    pub is_chunked: bool,
}
1609
/// Owned iterator for async streaming.
///
/// Yields the slices of a chunked file that fall inside an inclusive byte range,
/// one chunk per item. It holds an `Arc` to the store (see
/// `stream_file_range_chunks_owned`) so it can be moved into async/`Send` contexts.
/// Yields nothing for files whose metadata is not chunked.
pub struct FileRangeChunksOwned {
    /// Store that chunk bytes are fetched from.
    store: Arc<HashtreeStore>,
    /// Chunk layout of the file being streamed.
    metadata: FileChunkMetadata,
    /// First byte of the requested range (inclusive).
    start: u64,
    /// Last byte of the requested range (inclusive), clamped by the constructor.
    end: u64,
    /// Index into `metadata.chunk_hashes` of the next chunk to examine.
    current_chunk_idx: usize,
    /// Running byte offset into the file, advanced by each consumed chunk's size.
    current_offset: u64,
}
1619
1620impl Iterator for FileRangeChunksOwned {
1621    type Item = Result<Vec<u8>>;
1622
1623    fn next(&mut self) -> Option<Self::Item> {
1624        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
1625            return None;
1626        }
1627
1628        if self.current_offset > self.end {
1629            return None;
1630        }
1631
1632        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
1633        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
1634        let chunk_end = self.current_offset + chunk_size - 1;
1635
1636        self.current_chunk_idx += 1;
1637
1638        if chunk_end < self.start || self.current_offset > self.end {
1639            self.current_offset += chunk_size;
1640            return self.next();
1641        }
1642
1643        let chunk_content = match self.store.get_chunk(chunk_hash) {
1644            Ok(Some(content)) => content,
1645            Ok(None) => {
1646                return Some(Err(anyhow::anyhow!(
1647                    "Chunk {} not found",
1648                    to_hex(chunk_hash)
1649                )));
1650            }
1651            Err(e) => {
1652                return Some(Err(e));
1653            }
1654        };
1655
1656        let chunk_read_start = if self.current_offset >= self.start {
1657            0
1658        } else {
1659            (self.start - self.current_offset) as usize
1660        };
1661
1662        let chunk_read_end = if chunk_end <= self.end {
1663            chunk_size as usize - 1
1664        } else {
1665            (self.end - self.current_offset) as usize
1666        };
1667
1668        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
1669        self.current_offset += chunk_size;
1670
1671        Some(Ok(result))
1672    }
1673}
1674
/// Summary of a garbage-collection pass.
#[derive(Debug)]
pub struct GcStats {
    /// Number of DAGs (trees) removed — presumably unpinned/unreferenced ones; confirm in GC code.
    pub deleted_dags: usize,
    /// Bytes reclaimed by the pass.
    pub freed_bytes: u64,
}
1680
/// One entry in a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within its parent directory.
    pub name: String,
    /// Content identifier of the entry as a string (exact encoding set by the producer — TODO confirm).
    pub cid: String,
    /// `true` if the entry is itself a directory.
    pub is_directory: bool,
    /// Entry size; presumably bytes for files — confirm what is reported for directories.
    pub size: u64,
}
1688
/// Contents of a single directory.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the listed directory.
    pub dir_name: String,
    /// Entries found directly inside the directory.
    pub entries: Vec<DirEntry>,
}
1694
/// Blob metadata for Blossom protocol
///
/// Serialized with serde; field names are the wire/storage keys, so renaming a
/// field changes the serialized format.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// SHA-256 of the blob content (presumably lowercase hex — TODO confirm).
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type recorded for the blob.
    pub mime_type: String,
    /// Upload time; presumably Unix seconds — verify against the writer.
    pub uploaded: u64,
}
1703
1704// Implement ContentStore trait for WebRTC data exchange
1705impl crate::webrtc::ContentStore for HashtreeStore {
1706    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1707        let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
1708        self.get_chunk(&hash)
1709    }
1710}
1711
#[cfg(test)]
mod tests {
    use super::*;
    // Not feature-gated: the tracked-author test below runs in every configuration
    // and needs a temporary directory too.
    use tempfile::TempDir;

    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("lmdb-blobs");
        std::fs::create_dir_all(path.join("aa"))?;
        std::fs::create_dir_all(path.join("b2"))?;
        std::fs::create_dir_all(path.join("keep-me"))?;
        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;

        let _store = LocalStore::new_with_lmdb_map_size(
            &path,
            &StorageBackend::Lmdb,
            Some(128 * 1024 * 1024),
        )?;

        // Two-hex-character shard dirs left by the FS backend are removed;
        // unrelated directories are left alone.
        assert!(!path.join("aa").exists());
        assert!(!path.join("b2").exists());
        assert!(path.join("keep-me").exists());
        assert!(path.join("data.mdb").exists());
        assert!(path.join("lock.mdb").exists());

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let old_bytes = b"old published root";
        let new_bytes = b"new published root";
        let old_root = sha256(old_bytes);
        let new_root = sha256(new_bytes);

        store.put_blob(old_bytes)?;
        store.pin(&old_root)?;
        store.index_tree(
            &old_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(store.is_pinned(&old_root)?);
        assert!(store.get_tree_meta(&old_root)?.is_some());

        // Indexing a new root under the same ref must supersede the old one.
        store.put_blob(new_bytes)?;
        store.pin(&new_root)?;
        store.index_tree(
            &new_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(
            !store.is_pinned(&old_root)?,
            "superseded root should be unpinned when ref is replaced"
        );
        assert!(
            store.get_tree_meta(&old_root)?.is_none(),
            "superseded root metadata should be removed when ref is replaced"
        );
        assert!(store.is_pinned(&new_root)?);
        assert!(store.get_tree_meta(&new_root)?.is_some());

        Ok(())
    }

    #[test]
    fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options(temp.path(), None, 1024 * 1024)?;

        store
            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;
        store
            .add_tracked_author("npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm")?;
        store
            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;

        assert_eq!(
            store.list_tracked_authors()?,
            vec![
                "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm".to_string(),
                "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string(),
            ]
        );
        assert!(store.remove_tracked_author(
            "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm"
        )?);
        assert!(!store.remove_tracked_author(
            "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq"
        )?);
        assert_eq!(
            store.list_tracked_authors()?,
            vec!["npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string()]
        );

        Ok(())
    }

    #[cfg(feature = "s3")]
    #[test]
    fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
        let temp = TempDir::new()?;
        let local = Arc::new(LocalStore::new(
            temp.path().join("blobs"),
            &StorageBackend::Fs,
        )?);

        let outcome = std::panic::catch_unwind(|| {
            sync_block_on(async {
                // Point at a closed local port so S3 calls fail fast without network access.
                let aws_config = aws_config::from_env()
                    .region(aws_sdk_s3::config::Region::new("auto"))
                    .load()
                    .await;
                let s3_client = aws_sdk_s3::Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&aws_config)
                        .endpoint_url("http://127.0.0.1:9")
                        .force_path_style(true)
                        .build(),
                );

                let router = StorageRouter {
                    local,
                    s3_client: Some(s3_client),
                    s3_bucket: Some("test-bucket".to_string()),
                    s3_prefix: String::new(),
                    sync_tx: None,
                };
                let hash = [0u8; 32];

                let _ = Store::has(&router, &hash).await;
                let _ = Store::get(&router, &hash).await;
            });
        });

        assert!(
            outcome.is_ok(),
            "S3-backed async store methods should not panic inside futures::block_on"
        );

        Ok(())
    }
}