Skip to main content

hashtree_cli/
storage.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8    from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16#[cfg(feature = "s3")]
17use std::future::Future;
18use std::io::Write;
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::time::{SystemTime, UNIX_EPOCH};
22
23mod upload;
24
25mod maintenance;
26mod retention;
27
28pub use maintenance::{compact_lmdb_environments_under, CompactResult, VerifyResult};
29pub use retention::{PinnedItem, StorageByPriority, StorageStats, TreeMeta};
30
// Tree eviction priority levels, ordered from lowest to highest.

/// Eviction priority for trees that are neither followed nor our own.
pub const PRIORITY_OTHER: u8 = 64;
/// Eviction priority for followed trees.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Eviction priority for our own trees (the largest `u8` value).
pub const PRIORITY_OWN: u8 = u8::MAX;
/// Reader-slot limit handed to `EnvOpenOptions::max_readers` for LMDB environments.
const LMDB_MAX_READERS: u32 = 1024;
/// Lower bound (10 GiB) for the LMDB blob environment's map size.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 10 << 30;
38
39/// Cached root info from Nostr events.
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct CachedRoot {
42    /// Root hash (hex)
43    pub hash: String,
44    /// Optional decryption key (hex)
45    pub key: Option<String>,
46    /// Unix timestamp when this was cached (from event created_at)
47    pub updated_at: u64,
48    /// Visibility: "public", "link-visible", or "private"
49    pub visibility: String,
50}
51
/// Blob count and byte total reported by a local store backend.
#[derive(Clone, Debug)]
pub struct LocalStoreStats {
    /// Number of blobs held by the backend.
    pub count: usize,
    /// Combined size of those blobs, in bytes.
    pub total_bytes: u64,
}
58
59/// Local blob store - wraps either FsBlobStore or LmdbBlobStore
60pub enum LocalStore {
61    Fs(FsBlobStore),
62    #[cfg(feature = "lmdb")]
63    Lmdb(LmdbBlobStore),
64}
65
66#[cfg(feature = "lmdb")]
/// Returns `true` when the final component of `path` is exactly two ASCII
/// hex digits in either case (e.g. `ab`, `0F`), the naming used for
/// filesystem blob shard directories. A missing or non-UTF-8 final component
/// never matches.
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    match path.file_name().and_then(|component| component.to_str()) {
        Some(component) => {
            let bytes = component.as_bytes();
            bytes.len() == 2 && bytes.iter().all(|byte| byte.is_ascii_hexdigit())
        }
        None => false,
    }
}
73
/// Deletes leftover filesystem blob shard directories (two-hex-digit names)
/// directly under `path`, logging each removal.
///
/// Non-directories and directories with other names are left untouched.
///
/// # Errors
/// Returns `StoreError::Io` if `path` cannot be listed, an entry cannot be
/// read, or a shard directory cannot be removed; processing stops at the
/// first failure.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for dir_entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let candidate = dir_entry.map_err(StoreError::Io)?.path();
        if !(candidate.is_dir() && is_fs_blob_shard_dir(&candidate)) {
            continue;
        }
        std::fs::remove_dir_all(&candidate).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            candidate.display()
        );
    }
    Ok(())
}
90
91impl LocalStore {
92    /// Create a new unbounded local store.
93    ///
94    /// Higher-level stores that need quota enforcement should manage eviction
95    /// above this layer so tree metadata, pins, and archival policies stay
96    /// coherent.
97    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
98        Self::new_unbounded(path, backend)
99    }
100
101    /// Create a new local store with an explicit LMDB logical size cap when using the LMDB backend.
102    ///
103    /// The requested size is used for both the LMDB map and the store's built-in
104    /// byte quota, so standalone blob envs can evict instead of growing forever.
105    pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
106        path: P,
107        backend: &StorageBackend,
108        _map_size_bytes: Option<u64>,
109    ) -> Result<Self, StoreError> {
110        match backend {
111            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
112            #[cfg(feature = "lmdb")]
113            StorageBackend::Lmdb => match _map_size_bytes {
114                Some(map_size_bytes) => {
115                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
116                    remove_stale_fs_blob_shards(path.as_ref())?;
117                    Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
118                        path,
119                        map_size_bytes,
120                    )?))
121                }
122                None => {
123                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
124                    remove_stale_fs_blob_shards(path.as_ref())?;
125                    Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
126                }
127            },
128            #[cfg(not(feature = "lmdb"))]
129            StorageBackend::Lmdb => {
130                tracing::warn!(
131                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
132                );
133                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
134            }
135        }
136    }
137
138    /// Create a new unbounded local store for a specific backend.
139    pub fn new_unbounded<P: AsRef<Path>>(
140        path: P,
141        backend: &StorageBackend,
142    ) -> Result<Self, StoreError> {
143        Self::new_with_lmdb_map_size(path, backend, None)
144    }
145
146    pub fn backend(&self) -> StorageBackend {
147        match self {
148            LocalStore::Fs(_) => StorageBackend::Fs,
149            #[cfg(feature = "lmdb")]
150            LocalStore::Lmdb(_) => StorageBackend::Lmdb,
151        }
152    }
153
154    /// Sync put operation
155    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
156        match self {
157            LocalStore::Fs(store) => store.put_sync(hash, data),
158            #[cfg(feature = "lmdb")]
159            LocalStore::Lmdb(store) => store.put_sync(hash, data),
160        }
161    }
162
163    /// Sync batch put operation.
164    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
165        match self {
166            LocalStore::Fs(store) => {
167                let mut inserted = 0usize;
168                for (hash, data) in items {
169                    if store.put_sync(*hash, data.as_slice())? {
170                        inserted += 1;
171                    }
172                }
173                Ok(inserted)
174            }
175            #[cfg(feature = "lmdb")]
176            LocalStore::Lmdb(store) => store.put_many_sync(items),
177        }
178    }
179
180    /// Sync get operation
181    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
182        match self {
183            LocalStore::Fs(store) => store.get_sync(hash),
184            #[cfg(feature = "lmdb")]
185            LocalStore::Lmdb(store) => store.get_sync(hash),
186        }
187    }
188
189    /// Check if hash exists
190    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
191        match self {
192            LocalStore::Fs(store) => Ok(store.exists(hash)),
193            #[cfg(feature = "lmdb")]
194            LocalStore::Lmdb(store) => store.exists(hash),
195        }
196    }
197
198    /// Sync delete operation
199    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
200        match self {
201            LocalStore::Fs(store) => store.delete_sync(hash),
202            #[cfg(feature = "lmdb")]
203            LocalStore::Lmdb(store) => store.delete_sync(hash),
204        }
205    }
206
207    /// Get storage statistics
208    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
209        match self {
210            LocalStore::Fs(store) => {
211                let stats = store.stats()?;
212                Ok(LocalStoreStats {
213                    count: stats.count,
214                    total_bytes: stats.total_bytes,
215                })
216            }
217            #[cfg(feature = "lmdb")]
218            LocalStore::Lmdb(store) => {
219                let stats = store.stats()?;
220                Ok(LocalStoreStats {
221                    count: stats.count,
222                    total_bytes: stats.total_bytes,
223                })
224            }
225        }
226    }
227
228    /// List all hashes in the store
229    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
230        match self {
231            LocalStore::Fs(store) => store.list(),
232            #[cfg(feature = "lmdb")]
233            LocalStore::Lmdb(store) => store.list(),
234        }
235    }
236}
237
238#[async_trait]
239impl Store for LocalStore {
240    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
241        self.put_sync(hash, &data)
242    }
243
244    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
245        self.put_many_sync(&items)
246    }
247
248    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
249        self.get_sync(hash)
250    }
251
252    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
253        self.exists(hash)
254    }
255
256    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
257        self.delete_sync(hash)
258    }
259}
260
261#[cfg(feature = "s3")]
262use tokio::sync::mpsc;
263
264use crate::config::S3Config;
265
/// Work item consumed by the background S3 mirroring task.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` to the object key derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object whose key is derived from `hash`.
    Delete { hash: Hash },
}
272
273/// Storage router - local store primary with optional S3 backup
274///
275/// Write path: local first (fast), then queue S3 upload (non-blocking)
276/// Read path: local first, fall back to S3 if miss
277pub struct StorageRouter {
278    /// Primary local store (always used)
279    local: Arc<LocalStore>,
280    /// Optional S3 client for backup
281    #[cfg(feature = "s3")]
282    s3_client: Option<aws_sdk_s3::Client>,
283    #[cfg(feature = "s3")]
284    s3_bucket: Option<String>,
285    #[cfg(feature = "s3")]
286    s3_prefix: String,
287    /// Channel to send uploads to background task
288    #[cfg(feature = "s3")]
289    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
290}
291
292impl StorageRouter {
293    #[cfg(feature = "s3")]
294    fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
295    where
296        F: Future<Output = T> + Send + 'static,
297        T: Send + 'static,
298    {
299        if tokio::runtime::Handle::try_current().is_ok() {
300            return std::thread::Builder::new()
301                .name("storage-s3-sync".to_string())
302                .spawn(move || {
303                    let runtime = tokio::runtime::Builder::new_current_thread()
304                        .enable_all()
305                        .build()
306                        .map_err(|err| {
307                            StoreError::Other(format!("build storage s3 sync runtime: {err}"))
308                        })?;
309                    Ok(runtime.block_on(future))
310                })
311                .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
312                .join()
313                .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()))?;
314        }
315
316        let runtime = tokio::runtime::Builder::new_current_thread()
317            .enable_all()
318            .build()
319            .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
320        Ok(runtime.block_on(future))
321    }
322
323    /// Create router with local storage only
324    pub fn new(local: Arc<LocalStore>) -> Self {
325        Self {
326            local,
327            #[cfg(feature = "s3")]
328            s3_client: None,
329            #[cfg(feature = "s3")]
330            s3_bucket: None,
331            #[cfg(feature = "s3")]
332            s3_prefix: String::new(),
333            #[cfg(feature = "s3")]
334            sync_tx: None,
335        }
336    }
337
338    /// Create router with local storage + S3 backup
339    #[cfg(feature = "s3")]
340    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
341        use aws_sdk_s3::Client as S3Client;
342
343        // Build AWS config
344        let mut aws_config_loader = aws_config::from_env();
345        aws_config_loader =
346            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
347        let aws_config = aws_config_loader.load().await;
348
349        // Build S3 client with custom endpoint
350        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
351        s3_config_builder = s3_config_builder
352            .endpoint_url(&config.endpoint)
353            .force_path_style(true);
354
355        let s3_client = S3Client::from_conf(s3_config_builder.build());
356        let bucket = config.bucket.clone();
357        let prefix = config.prefix.clone().unwrap_or_default();
358
359        // Create background sync channel
360        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
361
362        // Spawn background sync task with bounded concurrent uploads
363        let sync_client = s3_client.clone();
364        let sync_bucket = bucket.clone();
365        let sync_prefix = prefix.clone();
366
367        tokio::spawn(async move {
368            use aws_sdk_s3::primitives::ByteStream;
369
370            tracing::info!("S3 background sync task started");
371
372            // Keep S3 writes parallel, but avoid dispatch failures during mirror backfill bursts.
373            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(8));
374            let client = std::sync::Arc::new(sync_client);
375            let bucket = std::sync::Arc::new(sync_bucket);
376            let prefix = std::sync::Arc::new(sync_prefix);
377
378            while let Some(msg) = sync_rx.recv().await {
379                let client = client.clone();
380                let bucket = bucket.clone();
381                let prefix = prefix.clone();
382                let semaphore = semaphore.clone();
383
384                // Spawn each upload with semaphore-bounded concurrency
385                tokio::spawn(async move {
386                    // Acquire permit before uploading
387                    let _permit = semaphore.acquire().await;
388
389                    match msg {
390                        S3SyncMessage::Upload { hash, data } => {
391                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
392                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
393
394                            let mut attempt = 1u8;
395                            loop {
396                                match client
397                                    .put_object()
398                                    .bucket(bucket.as_str())
399                                    .key(&key)
400                                    .body(ByteStream::from(data.clone()))
401                                    .send()
402                                    .await
403                                {
404                                    Ok(_) => {
405                                        tracing::debug!("S3 upload succeeded: {}", &key);
406                                        break;
407                                    }
408                                    Err(e) if attempt < 3 => {
409                                        tracing::warn!(
410                                            "S3 upload retrying {}: attempt={} error={}",
411                                            &key,
412                                            attempt,
413                                            e
414                                        );
415                                        tokio::time::sleep(std::time::Duration::from_millis(
416                                            250 * u64::from(attempt),
417                                        ))
418                                        .await;
419                                        attempt += 1;
420                                    }
421                                    Err(e) => {
422                                        tracing::error!(
423                                            "S3 upload failed {} after {} attempts: {}",
424                                            &key,
425                                            attempt,
426                                            e
427                                        );
428                                        break;
429                                    }
430                                }
431                            }
432                        }
433                        S3SyncMessage::Delete { hash } => {
434                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
435                            tracing::debug!("S3 deleting {}", &key);
436
437                            let mut attempt = 1u8;
438                            loop {
439                                match client
440                                    .delete_object()
441                                    .bucket(bucket.as_str())
442                                    .key(&key)
443                                    .send()
444                                    .await
445                                {
446                                    Ok(_) => break,
447                                    Err(e) if attempt < 3 => {
448                                        tracing::warn!(
449                                            "S3 delete retrying {}: attempt={} error={}",
450                                            &key,
451                                            attempt,
452                                            e
453                                        );
454                                        tokio::time::sleep(std::time::Duration::from_millis(
455                                            250 * u64::from(attempt),
456                                        ))
457                                        .await;
458                                        attempt += 1;
459                                    }
460                                    Err(e) => {
461                                        tracing::error!(
462                                            "S3 delete failed {} after {} attempts: {}",
463                                            &key,
464                                            attempt,
465                                            e
466                                        );
467                                        break;
468                                    }
469                                }
470                            }
471                        }
472                    }
473                });
474            }
475        });
476
477        tracing::info!(
478            "S3 storage initialized: bucket={}, prefix={}",
479            bucket,
480            prefix
481        );
482
483        Ok(Self {
484            local,
485            s3_client: Some(s3_client),
486            s3_bucket: Some(bucket),
487            s3_prefix: prefix,
488            sync_tx: Some(sync_tx),
489        })
490    }
491
492    /// Store data - writes to LMDB, queues S3 upload in background
493    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
494        // Always write to local first
495        let is_new = self.local.put_sync(hash, data)?;
496
497        // Queue S3 upload only for newly inserted blobs.
498        // Existing local blobs were already persisted or are handled by explicit repair/push flows.
499        #[cfg(feature = "s3")]
500        if is_new {
501            if let Some(ref tx) = self.sync_tx {
502                tracing::debug!(
503                    "Queueing S3 upload for {} ({} bytes)",
504                    crate::storage::to_hex(&hash)[..16].to_string(),
505                    data.len(),
506                );
507                if let Err(e) = tx.send(S3SyncMessage::Upload {
508                    hash,
509                    data: data.to_vec(),
510                }) {
511                    tracing::error!("Failed to queue S3 upload: {}", e);
512                }
513            }
514        }
515
516        Ok(is_new)
517    }
518
519    /// Store multiple blobs with a single local batch write when supported.
520    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
521        #[cfg(feature = "s3")]
522        let pending_uploads = if self.sync_tx.is_some() {
523            let mut pending = Vec::new();
524            for (hash, data) in items {
525                if !self.local.exists(hash)? {
526                    pending.push((*hash, data.clone()));
527                }
528            }
529            pending
530        } else {
531            Vec::new()
532        };
533
534        let inserted = self.local.put_many_sync(items)?;
535
536        #[cfg(feature = "s3")]
537        if let Some(ref tx) = self.sync_tx {
538            for (hash, data) in pending_uploads {
539                if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
540                    tracing::error!("Failed to queue S3 upload: {}", e);
541                }
542            }
543        }
544
545        Ok(inserted)
546    }
547
548    /// Get data - tries LMDB first, falls back to S3
549    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
550        // Try local first
551        if let Some(data) = self.local.get_sync(hash)? {
552            return Ok(Some(data));
553        }
554
555        // Fall back to S3 if configured
556        #[cfg(feature = "s3")]
557        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
558            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
559            let client = client.clone();
560            let bucket = bucket.clone();
561
562            match Self::run_s3_future_sync(async move {
563                client.get_object().bucket(bucket).key(key).send().await
564            }) {
565                Ok(Ok(output)) => {
566                    match Self::run_s3_future_sync(async move { output.body.collect().await }) {
567                        Ok(Ok(body)) => {
568                            let data = body.into_bytes().to_vec();
569                            // Cache locally for future reads
570                            let _ = self.local.put_sync(*hash, &data);
571                            return Ok(Some(data));
572                        }
573                        Ok(Err(err)) => {
574                            tracing::warn!("S3 body collect failed: {}", err);
575                        }
576                        Err(err) => {
577                            tracing::warn!("S3 body collect runtime failed: {}", err);
578                        }
579                    }
580                }
581                Ok(Err(err)) => {
582                    let service_err = err.into_service_error();
583                    if !service_err.is_no_such_key() {
584                        tracing::warn!("S3 get failed: {}", service_err);
585                    }
586                }
587                Err(err) => {
588                    tracing::warn!("S3 get runtime failed: {}", err);
589                }
590            }
591        }
592
593        Ok(None)
594    }
595
596    /// Check if hash exists
597    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
598        // Check local first
599        if self.local.exists(hash)? {
600            return Ok(true);
601        }
602
603        // Check S3 if configured
604        #[cfg(feature = "s3")]
605        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
606            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
607            let client = client.clone();
608            let bucket = bucket.clone();
609
610            match Self::run_s3_future_sync(async move {
611                client.head_object().bucket(bucket).key(&key).send().await
612            }) {
613                Ok(Ok(_)) => return Ok(true),
614                Ok(Err(err)) => {
615                    let service_err = err.into_service_error();
616                    if !service_err.is_not_found() {
617                        tracing::warn!("S3 head failed: {}", service_err);
618                    }
619                }
620                Err(err) => {
621                    tracing::warn!("S3 head runtime failed: {}", err);
622                }
623            }
624        }
625
626        Ok(false)
627    }
628
629    /// Delete data from both local and S3 stores
630    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
631        let deleted = self.local.delete_sync(hash)?;
632
633        // Queue S3 delete if configured
634        #[cfg(feature = "s3")]
635        if let Some(ref tx) = self.sync_tx {
636            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
637        }
638
639        Ok(deleted)
640    }
641
642    /// Delete data from local store only (don't propagate to S3)
643    /// Used for eviction where we want to keep S3 as archive
644    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
645        self.local.delete_sync(hash)
646    }
647
648    /// Get stats from local store
649    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
650        self.local.stats()
651    }
652
653    /// List all hashes from local store
654    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
655        self.local.list()
656    }
657
658    /// Get the underlying local store for HashTree operations
659    pub fn local_store(&self) -> Arc<LocalStore> {
660        Arc::clone(&self.local)
661    }
662}
663
664// Implement async Store trait for StorageRouter so it can be used directly with HashTree
665// This ensures all writes go through S3 sync
666#[async_trait]
667impl Store for StorageRouter {
668    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
669        self.put_sync(hash, &data)
670    }
671
672    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
673        self.put_many_sync(&items)
674    }
675
676    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
677        self.get_sync(hash)
678    }
679
680    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
681        self.exists(hash)
682    }
683
684    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
685        self.delete_sync(hash)
686    }
687}
688
689pub struct HashtreeStore {
690    base_path: PathBuf,
691    env: heed::Env,
692    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
693    pins: Database<Bytes, Unit>,
694    /// Mutable published refs that should stay subscribed and keep following updates
695    pinned_refs: Database<Str, Unit>,
696    /// Authors whose hashtree publications should be mirrored continuously
697    tracked_authors: Database<Str, Unit>,
698    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
699    blob_owners: Database<Bytes, Unit>,
700    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
701    pubkey_blobs: Database<Bytes, Bytes>,
702    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
703    tree_meta: Database<Bytes, Bytes>,
704    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
705    blob_trees: Database<Bytes, Unit>,
706    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
707    tree_refs: Database<Str, Bytes>,
708    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
709    cached_roots: Database<Str, Bytes>,
710    /// Storage router - handles LMDB + optional S3 (Arc for sharing with HashTree)
711    router: Arc<StorageRouter>,
712    /// Maximum storage size in bytes (from config)
713    max_size_bytes: u64,
714    /// Whether quota enforcement may delete local blobs not tracked by any indexed tree.
715    evict_orphans: bool,
716}
717
718impl HashtreeStore {
719    /// Create a new store with the configured local storage limit.
720    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
721        let config = hashtree_config::Config::load_or_default();
722        let max_size_bytes = config
723            .storage
724            .max_size_gb
725            .saturating_mul(1024 * 1024 * 1024);
726        Self::with_options_and_backend(
727            path,
728            None,
729            max_size_bytes,
730            config.storage.evict_orphans,
731            &config.storage.backend,
732        )
733    }
734
735    /// Create a new store with an explicit local backend and size limit.
736    pub fn new_with_backend<P: AsRef<Path>>(
737        path: P,
738        backend: hashtree_config::StorageBackend,
739        max_size_bytes: u64,
740    ) -> Result<Self> {
741        Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
742    }
743
744    /// Create a new store with optional S3 backend and the configured local storage limit.
745    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
746        let config = hashtree_config::Config::load_or_default();
747        let max_size_bytes = config
748            .storage
749            .max_size_gb
750            .saturating_mul(1024 * 1024 * 1024);
751        Self::with_options_and_backend(
752            path,
753            s3_config,
754            max_size_bytes,
755            config.storage.evict_orphans,
756            &config.storage.backend,
757        )
758    }
759
760    /// Create a new store with optional S3 backend and custom size limit.
761    ///
762    /// The raw local blob backend remains unbounded. `HashtreeStore` enforces
763    /// `max_size_bytes` at the tree-management layer so eviction can honor pins,
764    /// orphan handling, and local-only eviction when S3 is used as archive.
765    pub fn with_options<P: AsRef<Path>>(
766        path: P,
767        s3_config: Option<&S3Config>,
768        max_size_bytes: u64,
769    ) -> Result<Self> {
770        let config = hashtree_config::Config::load_or_default();
771        Self::with_options_and_backend(
772            path,
773            s3_config,
774            max_size_bytes,
775            config.storage.evict_orphans,
776            &config.storage.backend,
777        )
778    }
779
    /// Open (or create) the store at `path` using an explicit blob backend.
    ///
    /// Steps: create `path` if needed, open the metadata LMDB environment there
    /// and create its named databases, build the local blob store under
    /// `path/blobs` for `backend`, and wrap it in a `StorageRouter`. The router
    /// is S3-backed only when the `s3` feature is enabled and `s3_config` is
    /// `Some`.
    ///
    /// `max_size_bytes` is stored for tree-level quota enforcement. For the
    /// LMDB blob backend it also raises the blob map size above the
    /// `LMDB_BLOB_MIN_MAP_SIZE_BYTES` floor.
    ///
    /// # Errors
    /// Fails if the directory cannot be created, the metadata environment or
    /// any of its databases cannot be opened, the blob store cannot be created
    /// or cleaned, the map size does not fit in `usize`, or S3 router
    /// initialization fails.
    fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        evict_orphans: bool,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY: heed marks opening an environment `unsafe` because the memory
        // map must not be opened more than once in a process, nor modified by
        // other code while it is mapped. NOTE(review): nothing here prevents two
        // stores from being opened over the same `path`; callers are presumably
        // responsible for that — confirm.
        // NOTE(review): the max_dbs comment below also lists "blobs", but only
        // nine named databases are created in this environment — confirm
        // whether the tenth slot is intentionally spare.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) // 10GB virtual address space
                .max_dbs(10) // pins, pinned_refs, tracked_authors, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Best effort: clear stale entries from the reader lock table (for
        // example, ones left by processes that exited abnormally). A failure
        // here is ignored.
        let _ = env.clear_stale_readers();

        // Create (or open) every named metadata database in a single write
        // transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
        let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Intentionally keep the raw blob backend unbounded here. HashtreeStore
        // owns quota policy above this layer, where it can coordinate eviction
        // with tree refs, blob ownership, pins, and S3 archival behavior.
        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                std::fs::create_dir_all(path.join("blobs"))?;
                // Clear filesystem-backend shard directories left in the
                // shared `blobs` path before opening LMDB there.
                remove_stale_fs_blob_shards(&path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
                // The map size is address-space capacity, not a quota: use the
                // larger of the configured quota and the fixed floor.
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        // Create storage router with optional S3
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            // The S3 router is built asynchronously; block this (synchronous)
            // constructor on it with `futures::executor::block_on`.
            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            base_path: path.to_path_buf(),
            env,
            pins,
            pinned_refs,
            tracked_authors,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
            evict_orphans,
        })
    }
885
886    pub fn base_path(&self) -> &Path {
887        &self.base_path
888    }
889
890    /// Get the storage router
891    pub fn router(&self) -> &StorageRouter {
892        &self.router
893    }
894
895    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
896    /// All writes through this go to both LMDB and S3
897    pub fn store_arc(&self) -> Arc<StorageRouter> {
898        Arc::clone(&self.router)
899    }
900
901    /// Get tree node by hash (raw bytes)
902    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
903        let store = self.store_arc();
904        let tree = HashTree::new(HashTreeConfig::new(store).public());
905
906        sync_block_on(async {
907            tree.get_tree_node(hash)
908                .await
909                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
910        })
911    }
912
913    /// Store a raw blob, returns SHA256 hash as hex.
914    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
915        let hash = sha256(data);
916        self.router
917            .put_sync(hash, data)
918            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
919        Ok(to_hex(&hash))
920    }
921
922    /// Store an opportunistically cached blob.
923    ///
924    /// Unlike durable `put_blob` writes, this path may evict disposable orphaned
925    /// blobs to make room under storage pressure. It intentionally avoids touching
926    /// indexed trees, social-graph roots, explicit pins, and owned Blossom blobs.
927    pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
928        let hash = sha256(data);
929        if self
930            .router
931            .exists(&hash)
932            .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
933        {
934            return Ok(to_hex(&hash));
935        }
936
937        let incoming_bytes = data.len() as u64;
938        let _ = self.make_room_for_cached_blob(incoming_bytes);
939
940        let mut retried_after_cleanup = false;
941        loop {
942            match self.router.put_sync(hash, data) {
943                Ok(_) => return Ok(to_hex(&hash)),
944                Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
945                    let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
946                    if freed == 0 {
947                        return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
948                    }
949                    retried_after_cleanup = true;
950                }
951                Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
952            }
953        }
954    }
955
956    /// Get a raw blob by SHA256 hash (raw bytes).
957    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
958        self.router
959            .get_sync(hash)
960            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
961    }
962
963    /// Check if a blob exists by SHA256 hash (raw bytes).
964    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
965        self.router
966            .exists(hash)
967            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
968    }
969
970    // === Blossom ownership tracking ===
971    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
972    // This allows efficient multi-owner tracking with O(1) lookups
973
974    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
975    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
976        let mut key = [0u8; 64];
977        key[..32].copy_from_slice(sha256);
978        key[32..].copy_from_slice(pubkey);
979        key
980    }
981
    /// Record `pubkey` as an owner of blob `sha256` (Blossom protocol).
    ///
    /// Several pubkeys may own the same blob; `delete_blossom_blob` only removes
    /// the blob itself once its last owner is gone. Besides the ownership entry,
    /// this appends a `BlobMetadata` record to the pubkey's JSON list in
    /// `pubkey_blobs` (served by the `/list` endpoint) when the blob is not
    /// already listed. Both writes are committed in one LMDB write transaction.
    ///
    /// # Errors
    /// Fails on LMDB errors, if reading the blob (for its size) fails, or if the
    /// metadata list cannot be serialized.
    ///
    /// # Panics
    /// Panics if the system clock reads earlier than the Unix epoch.
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Add ownership entry (idempotent - put overwrites)
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        // Convert sha256 to hex for BlobMetadata (which stores sha256 as hex string)
        let sha256_hex = to_hex(sha256);

        // Get existing blobs for this pubkey (for /list endpoint).
        // NOTE(review): a stored list that fails to parse is treated as empty,
        // so the put below would replace it with a single entry. In contrast,
        // delete_blossom_blob leaves unparseable lists untouched. Confirm that
        // discarding a corrupt list here is intended.
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        // Check if blob already exists for this pubkey
        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Get size from raw blob: the whole blob is loaded just to measure
            // it, and a missing blob is recorded with size 0.
            // NOTE(review): this read happens while the LMDB write transaction
            // is held; if the router fetches from S3 here, other writers may
            // block on network I/O — confirm.
            let size = self
                .get_blob(sha256)?
                .map(|data| data.len() as u64)
                .unwrap_or(0);

            // The MIME type is not known at this layer, so a generic one is recorded.
            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
1028
1029    /// Check if a pubkey owns a blob
1030    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1031        let key = Self::blob_owner_key(sha256, pubkey);
1032        let rtxn = self.env.read_txn()?;
1033        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1034    }
1035
1036    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
1037    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1038        let rtxn = self.env.read_txn()?;
1039
1040        let mut owners = Vec::new();
1041        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1042            let (key, _) = item?;
1043            if key.len() == 64 {
1044                // Extract pubkey from composite key (bytes 32-64)
1045                let mut pubkey = [0u8; 32];
1046                pubkey.copy_from_slice(&key[32..64]);
1047                owners.push(pubkey);
1048            }
1049        }
1050        Ok(owners)
1051    }
1052
1053    /// Check if blob has any owners
1054    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1055        let rtxn = self.env.read_txn()?;
1056
1057        // Just check if any entry exists with this prefix
1058        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1059            if item.is_ok() {
1060                return Ok(true);
1061            }
1062        }
1063        Ok(false)
1064    }
1065
1066    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
1067    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1068        Ok(self.get_blob_owners(sha256)?.into_iter().next())
1069    }
1070
1071    /// Remove a user's ownership of a blossom blob
1072    /// Only deletes the actual blob when no owners remain
1073    /// Returns true if the blob was actually deleted (no owners left)
1074    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1075        let key = Self::blob_owner_key(sha256, pubkey);
1076        let mut wtxn = self.env.write_txn()?;
1077
1078        // Remove this pubkey's ownership entry
1079        self.blob_owners.delete(&mut wtxn, &key[..])?;
1080
1081        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
1082        let sha256_hex = to_hex(sha256);
1083
1084        // Remove from pubkey's blob list
1085        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1086            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1087                blobs.retain(|b| b.sha256 != sha256_hex);
1088                let blobs_json = serde_json::to_vec(&blobs)?;
1089                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1090            }
1091        }
1092
1093        // Check if any other owners remain (prefix scan)
1094        let mut has_other_owners = false;
1095        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1096            if item.is_ok() {
1097                has_other_owners = true;
1098                break;
1099            }
1100        }
1101
1102        if has_other_owners {
1103            wtxn.commit()?;
1104            tracing::debug!(
1105                "Removed {} from blob {} owners, other owners remain",
1106                &to_hex(pubkey)[..8],
1107                &sha256_hex[..8]
1108            );
1109            return Ok(false);
1110        }
1111
1112        // No owners left - delete the blob completely
1113        tracing::info!(
1114            "All owners removed from blob {}, deleting",
1115            &sha256_hex[..8]
1116        );
1117
1118        // Delete raw blob (by content hash) - this deletes from S3 too
1119        let _ = self.router.delete_sync(sha256);
1120
1121        wtxn.commit()?;
1122        Ok(true)
1123    }
1124
1125    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
1126    pub fn list_blobs_by_pubkey(
1127        &self,
1128        pubkey: &[u8; 32],
1129    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1130        let rtxn = self.env.read_txn()?;
1131
1132        let blobs: Vec<BlobMetadata> = self
1133            .pubkey_blobs
1134            .get(&rtxn, pubkey)?
1135            .and_then(|b| serde_json::from_slice(b).ok())
1136            .unwrap_or_default();
1137
1138        Ok(blobs
1139            .into_iter()
1140            .map(|b| crate::server::blossom::BlobDescriptor {
1141                url: format!("/{}", b.sha256),
1142                sha256: b.sha256,
1143                size: b.size,
1144                mime_type: b.mime_type,
1145                uploaded: b.uploaded,
1146            })
1147            .collect())
1148    }
1149
1150    /// Get a single chunk/blob by hash (raw bytes)
1151    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1152        self.router
1153            .get_sync(hash)
1154            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1155    }
1156
1157    /// Get file content by hash (raw bytes)
1158    /// Returns raw bytes (caller handles decryption if needed)
1159    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1160        let store = self.store_arc();
1161        let tree = HashTree::new(HashTreeConfig::new(store).public());
1162
1163        sync_block_on(async {
1164            tree.read_file(hash)
1165                .await
1166                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1167        })
1168    }
1169
1170    /// Get file content by Cid (hash + optional decryption key as raw bytes)
1171    /// Handles decryption automatically if key is present
1172    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1173        let store = self.store_arc();
1174        let tree = HashTree::new(HashTreeConfig::new(store).public());
1175
1176        sync_block_on(async {
1177            tree.get(cid, None)
1178                .await
1179                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1180        })
1181    }
1182
1183    fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1184        let exists = self
1185            .router
1186            .exists(&cid.hash)
1187            .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1188        if !exists {
1189            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1190        }
1191        Ok(())
1192    }
1193
1194    /// Stream file content identified by Cid into a writer without buffering full file in memory.
1195    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1196        self.ensure_cid_exists(cid)?;
1197
1198        let store = self.store_arc();
1199        let tree = HashTree::new(HashTreeConfig::new(store).public());
1200        let mut total_bytes = 0u64;
1201        let mut streamed_any_chunk = false;
1202
1203        sync_block_on(async {
1204            let mut stream = tree.get_stream(cid);
1205            while let Some(chunk) = stream.next().await {
1206                streamed_any_chunk = true;
1207                let chunk =
1208                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1209                writer
1210                    .write_all(&chunk)
1211                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1212                total_bytes += chunk.len() as u64;
1213            }
1214            Ok::<(), anyhow::Error>(())
1215        })?;
1216
1217        if !streamed_any_chunk {
1218            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1219        }
1220
1221        writer
1222            .flush()
1223            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1224        Ok(total_bytes)
1225    }
1226
1227    /// Stream file content identified by Cid directly into a destination path.
1228    pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1229        self.ensure_cid_exists(cid)?;
1230
1231        let output_path = output_path.as_ref();
1232        if let Some(parent) = output_path.parent() {
1233            if !parent.as_os_str().is_empty() {
1234                std::fs::create_dir_all(parent).with_context(|| {
1235                    format!("Failed to create output directory {}", parent.display())
1236                })?;
1237            }
1238        }
1239
1240        let mut file = std::fs::File::create(output_path)
1241            .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1242        self.write_file_by_cid_to_writer(cid, &mut file)
1243    }
1244
1245    /// Stream a public (unencrypted) file by hash directly into a destination path.
1246    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1247        self.write_file_by_cid(&Cid::public(*hash), output_path)
1248    }
1249
1250    /// Resolve a path within a tree (returns Cid with key if encrypted)
1251    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1252        let store = self.store_arc();
1253        let tree = HashTree::new(HashTreeConfig::new(store).public());
1254
1255        sync_block_on(async {
1256            tree.resolve_path(cid, path)
1257                .await
1258                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1259        })
1260    }
1261
    /// Describe how the file at `hash` is stored: its total size and, for a
    /// chunked file, the hash and size of each direct link of its root node.
    ///
    /// Returns `Ok(None)` when `hash` is not in the store, when its tree node
    /// cannot be loaded, or when it is a directory. A non-tree blob yields
    /// `is_chunked: false` with empty chunk lists.
    ///
    /// NOTE(review): only the root node's direct links are reported. If large
    /// files can be stored as multi-level trees, these "chunks" would be
    /// interior tree nodes rather than file data, and callers that fetch them
    /// as raw bytes would read the wrong content — confirm the tree depth.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            // First check if the hash exists in the store at all
            // (either as a blob or tree node)
            let exists = store
                .has(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            // Get total size
            let total_size = tree
                .get_size(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            // Check if it's a tree (chunked) or blob
            let is_tree_node = tree
                .is_tree(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            if !is_tree_node {
                // Single blob, not chunked
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            // Get tree node to extract chunk info
            let node = match tree
                .get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
            {
                Some(n) => n,
                None => return Ok(None),
            };

            // Check if it's a directory (has named links)
            let is_directory = tree
                .is_directory(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None); // Not a file
            }

            // Extract chunk info from links; the two vectors are parallel and in
            // link order.
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                // A tree node with no links is reported as not chunked.
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1333
1334    /// Get byte range from file
1335    pub fn get_file_range(
1336        &self,
1337        hash: &[u8; 32],
1338        start: u64,
1339        end: Option<u64>,
1340    ) -> Result<Option<(Vec<u8>, u64)>> {
1341        let metadata = match self.get_file_chunk_metadata(hash)? {
1342            Some(m) => m,
1343            None => return Ok(None),
1344        };
1345
1346        if metadata.total_size == 0 {
1347            return Ok(Some((Vec::new(), 0)));
1348        }
1349
1350        if start >= metadata.total_size {
1351            return Ok(None);
1352        }
1353
1354        let end = end
1355            .unwrap_or(metadata.total_size - 1)
1356            .min(metadata.total_size - 1);
1357
1358        // For non-chunked files, load entire file
1359        if !metadata.is_chunked {
1360            let content = self.get_file(hash)?.unwrap_or_default();
1361            let range_content = if start < content.len() as u64 {
1362                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1363            } else {
1364                Vec::new()
1365            };
1366            return Ok(Some((range_content, metadata.total_size)));
1367        }
1368
1369        // For chunked files, load only needed chunks
1370        let mut result = Vec::new();
1371        let mut current_offset = 0u64;
1372
1373        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1374            let chunk_size = metadata.chunk_sizes[i];
1375            let chunk_end = current_offset + chunk_size - 1;
1376
1377            // Check if this chunk overlaps with requested range
1378            if chunk_end >= start && current_offset <= end {
1379                let chunk_content = match self.get_chunk(chunk_hash)? {
1380                    Some(content) => content,
1381                    None => {
1382                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1383                    }
1384                };
1385
1386                let chunk_read_start = if current_offset >= start {
1387                    0
1388                } else {
1389                    (start - current_offset) as usize
1390                };
1391
1392                let chunk_read_end = if chunk_end <= end {
1393                    chunk_size as usize - 1
1394                } else {
1395                    (end - current_offset) as usize
1396                };
1397
1398                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1399            }
1400
1401            current_offset += chunk_size;
1402
1403            if current_offset > end {
1404                break;
1405            }
1406        }
1407
1408        Ok(Some((result, metadata.total_size)))
1409    }
1410
1411    /// Stream file range as chunks using Arc for async/Send contexts
1412    pub fn stream_file_range_chunks_owned(
1413        self: Arc<Self>,
1414        hash: &[u8; 32],
1415        start: u64,
1416        end: u64,
1417    ) -> Result<Option<FileRangeChunksOwned>> {
1418        let metadata = match self.get_file_chunk_metadata(hash)? {
1419            Some(m) => m,
1420            None => return Ok(None),
1421        };
1422
1423        if metadata.total_size == 0 || start >= metadata.total_size {
1424            return Ok(None);
1425        }
1426
1427        let end = end.min(metadata.total_size - 1);
1428
1429        Ok(Some(FileRangeChunksOwned {
1430            store: self,
1431            metadata,
1432            start,
1433            end,
1434            current_chunk_idx: 0,
1435            current_offset: 0,
1436        }))
1437    }
1438
1439    /// Get directory structure by hash (raw bytes)
1440    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1441        let store = self.store_arc();
1442        let tree = HashTree::new(HashTreeConfig::new(store).public());
1443
1444        sync_block_on(async {
1445            // Check if it's a directory
1446            let is_dir = tree
1447                .is_directory(hash)
1448                .await
1449                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1450
1451            if !is_dir {
1452                return Ok(None);
1453            }
1454
1455            // Get directory entries (public Cid - no encryption key)
1456            let cid = hashtree_core::Cid::public(*hash);
1457            let tree_entries = tree
1458                .list_directory(&cid)
1459                .await
1460                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1461
1462            let entries: Vec<DirEntry> = tree_entries
1463                .into_iter()
1464                .map(|e| DirEntry {
1465                    name: e.name,
1466                    cid: to_hex(&e.hash),
1467                    is_directory: e.link_type.is_tree(),
1468                    size: e.size,
1469                })
1470                .collect();
1471
1472            Ok(Some(DirectoryListing {
1473                dir_name: String::new(),
1474                entries,
1475            }))
1476        })
1477    }
1478
1479    /// Get directory structure by CID, supporting encrypted directories.
1480    pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1481        let store = self.store_arc();
1482        let tree = HashTree::new(HashTreeConfig::new(store).public());
1483        let cid = cid.clone();
1484
1485        sync_block_on(async {
1486            let is_dir = tree
1487                .is_dir(&cid)
1488                .await
1489                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1490
1491            if !is_dir {
1492                return Ok(None);
1493            }
1494
1495            let tree_entries = tree
1496                .list_directory(&cid)
1497                .await
1498                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1499
1500            let entries: Vec<DirEntry> = tree_entries
1501                .into_iter()
1502                .map(|e| DirEntry {
1503                    name: e.name,
1504                    cid: Cid {
1505                        hash: e.hash,
1506                        key: e.key,
1507                    }
1508                    .to_string(),
1509                    is_directory: e.link_type.is_tree(),
1510                    size: e.size,
1511                })
1512                .collect();
1513
1514            Ok(Some(DirectoryListing {
1515                dir_name: String::new(),
1516                entries,
1517            }))
1518        })
1519    }
1520
    // === Pinned refs, tracked authors, and cached roots ===
1522
1523    /// Persist a mutable published ref that should stay subscribed.
1524    pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
1525        let mut wtxn = self.env.write_txn()?;
1526        self.pinned_refs.put(&mut wtxn, key, &())?;
1527        wtxn.commit()?;
1528        Ok(())
1529    }
1530
1531    /// Remove a mutable published ref from the live pinned set.
1532    pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
1533        let mut wtxn = self.env.write_txn()?;
1534        let removed = self.pinned_refs.delete(&mut wtxn, key)?;
1535        wtxn.commit()?;
1536        Ok(removed)
1537    }
1538
1539    /// List mutable published refs that should stay subscribed.
1540    pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
1541        let rtxn = self.env.read_txn()?;
1542        let mut refs = Vec::new();
1543
1544        for item in self.pinned_refs.iter(&rtxn)? {
1545            let (key, _) = item?;
1546            refs.push(key.to_string());
1547        }
1548
1549        refs.sort();
1550        Ok(refs)
1551    }
1552
1553    /// Persist an author whose published trees should stay mirrored.
1554    pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
1555        let mut wtxn = self.env.write_txn()?;
1556        let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
1557        self.tracked_authors.put(&mut wtxn, npub, &())?;
1558        wtxn.commit()?;
1559        Ok(inserted)
1560    }
1561
1562    /// Remove an author from the continuous mirror set.
1563    pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
1564        let mut wtxn = self.env.write_txn()?;
1565        let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
1566        wtxn.commit()?;
1567        Ok(removed)
1568    }
1569
1570    /// List authors whose published trees should stay mirrored.
1571    pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
1572        let rtxn = self.env.read_txn()?;
1573        let mut authors = Vec::new();
1574
1575        for item in self.tracked_authors.iter(&rtxn)? {
1576            let (npub, _) = item?;
1577            authors.push(npub.to_string());
1578        }
1579
1580        authors.sort();
1581        Ok(authors)
1582    }
1583
1584    /// Get cached root for a pubkey/tree_name pair
1585    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1586        let key = format!("{}/{}", pubkey_hex, tree_name);
1587        let rtxn = self.env.read_txn()?;
1588        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1589            let root: CachedRoot = rmp_serde::from_slice(bytes)
1590                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1591            Ok(Some(root))
1592        } else {
1593            Ok(None)
1594        }
1595    }
1596
1597    /// Set cached root for a pubkey/tree_name pair
1598    pub fn set_cached_root(
1599        &self,
1600        pubkey_hex: &str,
1601        tree_name: &str,
1602        hash: &str,
1603        key: Option<&str>,
1604        visibility: &str,
1605        updated_at: u64,
1606    ) -> Result<()> {
1607        let db_key = format!("{}/{}", pubkey_hex, tree_name);
1608        let root = CachedRoot {
1609            hash: hash.to_string(),
1610            key: key.map(|k| k.to_string()),
1611            updated_at,
1612            visibility: visibility.to_string(),
1613        };
1614        let bytes = rmp_serde::to_vec(&root)
1615            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1616        let mut wtxn = self.env.write_txn()?;
1617        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1618        wtxn.commit()?;
1619        Ok(())
1620    }
1621
1622    /// List all cached roots for a pubkey
1623    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1624        let prefix = format!("{}/", pubkey_hex);
1625        let rtxn = self.env.read_txn()?;
1626        let mut results = Vec::new();
1627
1628        for item in self.cached_roots.iter(&rtxn)? {
1629            let (key, bytes) = item?;
1630            if key.starts_with(&prefix) {
1631                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1632                let root: CachedRoot = rmp_serde::from_slice(bytes)
1633                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1634                results.push((tree_name.to_string(), root));
1635            }
1636        }
1637
1638        Ok(results)
1639    }
1640
1641    /// Delete a cached root
1642    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1643        let key = format!("{}/{}", pubkey_hex, tree_name);
1644        let mut wtxn = self.env.write_txn()?;
1645        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1646        wtxn.commit()?;
1647        Ok(deleted)
1648    }
1649}
1650
1651fn is_map_full_store_error(err: &StoreError) -> bool {
1652    let message = err.to_string();
1653    message.contains("MDB_MAP_FULL") || message.contains("MapFull")
1654}
1655
1656#[derive(Debug, Clone)]
1657pub struct FileChunkMetadata {
1658    pub total_size: u64,
1659    pub chunk_hashes: Vec<Hash>,
1660    pub chunk_sizes: Vec<u64>,
1661    pub is_chunked: bool,
1662}
1663
1664/// Owned iterator for async streaming
1665pub struct FileRangeChunksOwned {
1666    store: Arc<HashtreeStore>,
1667    metadata: FileChunkMetadata,
1668    start: u64,
1669    end: u64,
1670    current_chunk_idx: usize,
1671    current_offset: u64,
1672}
1673
1674impl Iterator for FileRangeChunksOwned {
1675    type Item = Result<Vec<u8>>;
1676
1677    fn next(&mut self) -> Option<Self::Item> {
1678        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
1679            return None;
1680        }
1681
1682        if self.current_offset > self.end {
1683            return None;
1684        }
1685
1686        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
1687        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
1688        let chunk_end = self.current_offset + chunk_size - 1;
1689
1690        self.current_chunk_idx += 1;
1691
1692        if chunk_end < self.start || self.current_offset > self.end {
1693            self.current_offset += chunk_size;
1694            return self.next();
1695        }
1696
1697        let chunk_content = match self.store.get_chunk(chunk_hash) {
1698            Ok(Some(content)) => content,
1699            Ok(None) => {
1700                return Some(Err(anyhow::anyhow!(
1701                    "Chunk {} not found",
1702                    to_hex(chunk_hash)
1703                )));
1704            }
1705            Err(e) => {
1706                return Some(Err(e));
1707            }
1708        };
1709
1710        let chunk_read_start = if self.current_offset >= self.start {
1711            0
1712        } else {
1713            (self.start - self.current_offset) as usize
1714        };
1715
1716        let chunk_read_end = if chunk_end <= self.end {
1717            chunk_size as usize - 1
1718        } else {
1719            (self.end - self.current_offset) as usize
1720        };
1721
1722        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
1723        self.current_offset += chunk_size;
1724
1725        Some(Ok(result))
1726    }
1727}
1728
/// Counters reported for a garbage-collection run.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GcStats {
    /// Number of DAGs deleted by the run.
    pub deleted_dags: usize,
    /// Number of bytes freed by the run.
    pub freed_bytes: u64,
}
1734
/// One entry of a directory listing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DirEntry {
    /// Entry name within its directory.
    pub name: String,
    /// Content identifier of the entry, as a string.
    pub cid: String,
    /// `true` when the entry is itself a directory.
    pub is_directory: bool,
    /// Entry size (presumably bytes — TODO confirm with the producer).
    pub size: u64,
}

/// Contents of a single directory.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DirectoryListing {
    /// Name of the listed directory.
    pub dir_name: String,
    /// Entries contained in the directory.
    pub entries: Vec<DirEntry>,
}
1748
1749/// Blob metadata for Blossom protocol
1750#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1751pub struct BlobMetadata {
1752    pub sha256: String,
1753    pub size: u64,
1754    pub mime_type: String,
1755    pub uploaded: u64,
1756}
1757
1758// Implement ContentStore trait for WebRTC data exchange
1759impl crate::webrtc::ContentStore for HashtreeStore {
1760    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1761        let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
1762        self.get_chunk(&hash)
1763    }
1764}
1765
#[cfg(test)]
mod tests {
    use super::*;
    // Not feature-gated: `tracked_authors_round_trip_sorted_and_deduplicated` runs in
    // every configuration and needs `TempDir`, so gating this import on `lmdb` broke
    // the test build when that feature is disabled.
    use tempfile::TempDir;

    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("lmdb-blobs");
        std::fs::create_dir_all(path.join("aa"))?;
        std::fs::create_dir_all(path.join("b2"))?;
        std::fs::create_dir_all(path.join("keep-me"))?;
        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;

        let _store = LocalStore::new_with_lmdb_map_size(
            &path,
            &StorageBackend::Lmdb,
            Some(128 * 1024 * 1024),
        )?;

        assert!(!path.join("aa").exists());
        assert!(!path.join("b2").exists());
        assert!(path.join("keep-me").exists());
        assert!(path.join("data.mdb").exists());
        assert!(path.join("lock.mdb").exists());

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let old_bytes = b"old published root";
        let new_bytes = b"new published root";
        let old_root = sha256(old_bytes);
        let new_root = sha256(new_bytes);

        store.put_blob(old_bytes)?;
        store.pin(&old_root)?;
        store.index_tree(
            &old_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(store.is_pinned(&old_root)?);
        assert!(store.get_tree_meta(&old_root)?.is_some());

        store.put_blob(new_bytes)?;
        store.pin(&new_root)?;
        store.index_tree(
            &new_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(
            !store.is_pinned(&old_root)?,
            "superseded root should be unpinned when ref is replaced"
        );
        assert!(
            store.get_tree_meta(&old_root)?.is_none(),
            "superseded root metadata should be removed when ref is replaced"
        );
        assert!(store.is_pinned(&new_root)?);
        assert!(store.get_tree_meta(&new_root)?.is_some());

        Ok(())
    }

    #[test]
    fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options(temp.path(), None, 1024 * 1024)?;

        store
            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;
        store
            .add_tracked_author("npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm")?;
        store
            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;

        assert_eq!(
            store.list_tracked_authors()?,
            vec![
                "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm".to_string(),
                "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string(),
            ]
        );
        assert!(store.remove_tracked_author(
            "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm"
        )?);
        assert!(!store.remove_tracked_author(
            "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq"
        )?);
        assert_eq!(
            store.list_tracked_authors()?,
            vec!["npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string()]
        );

        Ok(())
    }

    #[cfg(feature = "s3")]
    #[test]
    fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
        let temp = tempfile::TempDir::new()?;
        let local = Arc::new(LocalStore::new(
            temp.path().join("blobs"),
            &StorageBackend::Fs,
        )?);

        let outcome = std::panic::catch_unwind(|| {
            sync_block_on(async {
                let aws_config = aws_config::from_env()
                    .region(aws_sdk_s3::config::Region::new("auto"))
                    .load()
                    .await;
                let s3_client = aws_sdk_s3::Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&aws_config)
                        .endpoint_url("http://127.0.0.1:9")
                        .force_path_style(true)
                        .build(),
                );

                let router = StorageRouter {
                    local,
                    s3_client: Some(s3_client),
                    s3_bucket: Some("test-bucket".to_string()),
                    s3_prefix: String::new(),
                    sync_tx: None,
                };
                let hash = [0u8; 32];

                let _ = Store::has(&router, &hash).await;
                let _ = Store::get(&router, &hash).await;
            });
        });

        assert!(
            outcome.is_ok(),
            "S3-backed async store methods should not panic inside futures::block_on"
        );

        Ok(())
    }
}