Skip to main content

hashtree_cli/
storage.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8    from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::collections::{HashMap, HashSet};
17#[cfg(feature = "s3")]
18use std::future::Future;
19use std::io::Write;
20use std::path::{Path, PathBuf};
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex};
23use std::time::{SystemTime, UNIX_EPOCH};
24
25mod upload;
26
27mod maintenance;
28mod retention;
29
/// Timeout in milliseconds for a blocking S3 operation, used when the
/// `HTREE_S3_SYNC_TIMEOUT_MS` override is unset, unparsable, or zero.
#[cfg(feature = "s3")]
const DEFAULT_S3_SYNC_TIMEOUT_MS: u64 = 5_000;
/// Name of the environment variable that overrides the blocking S3 timeout
/// (value is read as milliseconds).
#[cfg(feature = "s3")]
const S3_SYNC_TIMEOUT_MS_ENV: &str = "HTREE_S3_SYNC_TIMEOUT_MS";
34
35pub use maintenance::{
36    compact_lmdb_environments_under, CompactResult, R2ImportOptions, R2ImportResult, VerifyResult,
37};
38pub use retention::{OwnedBlobStats, PinnedItem, StorageByPriority, StorageStats, TreeMeta};
39
/// Priority levels for tree eviction
// NOTE(review): presumably a higher value means "evict later" — confirm against
// the retention code, which is not part of this section.
pub const PRIORITY_OTHER: u8 = 64;
/// Eviction priority between `PRIORITY_OTHER` and `PRIORITY_OWN`.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Highest eviction priority value (`u8::MAX`).
pub const PRIORITY_OWN: u8 = 255;
// Not referenced in this section; presumably the LMDB reader-slot limit used
// when opening environments — verify at the use site.
const LMDB_MAX_READERS: u32 = 1024;
/// Lower clamp (64 MiB) for the metadata LMDB map size derived from a storage budget.
const LMDB_METADATA_MIN_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
/// Upper clamp (64 GiB) for the metadata LMDB map size; also the value used
/// when the storage budget is `0`.
const LMDB_METADATA_MAX_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024 * 1024;
/// The metadata map size is the storage budget divided by this value, before clamping.
const LMDB_METADATA_STORAGE_RATIO_DIVISOR: u64 = 1024;
// Not referenced in this section; presumably the minimum map size (16 MiB)
// for the blob LMDB environment — verify at the use site.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 16 * 1024 * 1024;
/// Seconds that must pass before `BlobAccessUpdateGate` reports the same hash as due again.
const ACCESS_UPDATE_INTERVAL_SECS: u64 = 300;
/// Entry count at which `BlobAccessUpdateGate` prunes (and, if still full, clears) its map.
const ACCESS_UPDATE_GATE_MAX_ENTRIES: usize = 4096;
// Not referenced in this section; presumably caps how many access updates a
// background pass applies at once — verify at the use site.
const ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT: usize = 1024;
53
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
fn unix_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => 0,
    }
}
60
/// Cached root info from Nostr events.
///
/// All fields are plain strings/integers so the record can be serialized and
/// deserialized with serde as-is.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex); `None` when no key is cached
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public", "link-visible", or "private"
    pub visibility: String,
}
73
/// Storage statistics
///
/// Backend-independent copy of the `count` / `total_bytes` figures reported by
/// the active blob store's `stats()` call.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// `count` as reported by the backend (presumably the number of stored blobs).
    pub count: usize,
    /// `total_bytes` as reported by the backend.
    pub total_bytes: u64,
}
80
81#[derive(Default)]
82struct BlobAccessUpdateGate {
83    next_update_by_hash: Mutex<HashMap<Hash, u64>>,
84}
85
86impl BlobAccessUpdateGate {
87    fn due_hashes<I>(&self, hashes: I, now: u64) -> Vec<Hash>
88    where
89        I: IntoIterator<Item = Hash>,
90    {
91        let Ok(mut next_update_by_hash) = self.next_update_by_hash.try_lock() else {
92            return Vec::new();
93        };
94
95        if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
96            next_update_by_hash.retain(|_, next_update| *next_update > now);
97            if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
98                next_update_by_hash.clear();
99            }
100        }
101
102        let mut due = Vec::new();
103        let mut seen = HashSet::new();
104        for hash in hashes {
105            if !seen.insert(hash) {
106                continue;
107            }
108            if next_update_by_hash
109                .get(&hash)
110                .is_some_and(|next_update| now < *next_update)
111            {
112                continue;
113            }
114            next_update_by_hash.insert(hash, now.saturating_add(ACCESS_UPDATE_INTERVAL_SECS));
115            due.push(hash);
116        }
117        due
118    }
119}
120
/// Local blob store - wraps either FsBlobStore or LmdbBlobStore
///
/// The `Lmdb` variant only exists when the crate is built with the `lmdb`
/// feature; without it every store is filesystem-backed.
pub enum LocalStore {
    /// Filesystem-backed blob store.
    Fs(FsBlobStore),
    /// LMDB-backed blob store.
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
127
128#[cfg(feature = "lmdb")]
/// Reports whether the final component of `path` looks like a filesystem blob
/// shard directory name: exactly two ASCII hex digits.
///
/// Only the name is inspected; the path is not touched on disk. Paths without
/// a final component or with a non-UTF-8 name yield `false`.
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    let Some(name) = path.file_name().and_then(|os_name| os_name.to_str()) else {
        return false;
    };
    name.len() == 2 && name.bytes().all(|byte| byte.is_ascii_hexdigit())
}
135
136fn lmdb_metadata_map_size_for_storage_budget(max_size_bytes: u64) -> u64 {
137    if max_size_bytes == 0 {
138        return LMDB_METADATA_MAX_MAP_SIZE_BYTES;
139    }
140
141    max_size_bytes
142        .saturating_div(LMDB_METADATA_STORAGE_RATIO_DIVISOR)
143        .clamp(
144            LMDB_METADATA_MIN_MAP_SIZE_BYTES,
145            LMDB_METADATA_MAX_MAP_SIZE_BYTES,
146        )
147}
148
149fn lmdb_map_size_for_existing_env(path: &Path, requested_bytes: u64) -> Result<usize> {
150    let existing_bytes = std::fs::metadata(path.join("data.mdb"))
151        .map(|metadata| metadata.len())
152        .unwrap_or(0);
153    let requested = align_lmdb_map_size(requested_bytes.max(existing_bytes));
154    usize::try_from(requested).context("LMDB map size exceeds usize")
155}
156
157fn align_lmdb_map_size(bytes: u64) -> u64 {
158    let page_size = (page_size::get() as u64).max(4096);
159    let remainder = bytes % page_size;
160    if remainder == 0 {
161        bytes
162    } else {
163        bytes.saturating_add(page_size - remainder)
164    }
165}
166
/// Deletes every direct child directory of `path` whose name looks like a
/// filesystem blob shard (see `is_fs_blob_shard_dir`), logging each removal.
///
/// # Errors
/// Returns `StoreError::Io` as soon as listing the directory, reading an
/// entry, or removing a shard directory fails; earlier removals are not undone.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let shard_path = entry.map_err(StoreError::Io)?.path();
        if !(shard_path.is_dir() && is_fs_blob_shard_dir(&shard_path)) {
            continue;
        }
        std::fs::remove_dir_all(&shard_path).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            shard_path.display()
        );
    }
    Ok(())
}
183
184impl LocalStore {
185    /// Create a new unbounded local store.
186    ///
187    /// Higher-level stores that need quota enforcement should manage eviction
188    /// above this layer so tree metadata, pins, and archival policies stay
189    /// coherent.
190    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
191        Self::new_unbounded(path, backend)
192    }
193
194    /// Create a new local store with an explicit LMDB logical size cap when using the LMDB backend.
195    ///
196    /// The requested size is used for both the LMDB map and the store's built-in
197    /// byte quota, so standalone blob envs can evict instead of growing forever.
198    pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
199        path: P,
200        backend: &StorageBackend,
201        _map_size_bytes: Option<u64>,
202    ) -> Result<Self, StoreError> {
203        match backend {
204            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
205            #[cfg(feature = "lmdb")]
206            StorageBackend::Lmdb => match _map_size_bytes {
207                Some(map_size_bytes) => {
208                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
209                    remove_stale_fs_blob_shards(path.as_ref())?;
210                    Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
211                        path,
212                        map_size_bytes,
213                    )?))
214                }
215                None => {
216                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
217                    remove_stale_fs_blob_shards(path.as_ref())?;
218                    Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
219                }
220            },
221            #[cfg(not(feature = "lmdb"))]
222            StorageBackend::Lmdb => {
223                tracing::warn!(
224                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
225                );
226                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
227            }
228        }
229    }
230
231    /// Create a new unbounded local store for a specific backend.
232    pub fn new_unbounded<P: AsRef<Path>>(
233        path: P,
234        backend: &StorageBackend,
235    ) -> Result<Self, StoreError> {
236        Self::new_with_lmdb_map_size(path, backend, None)
237    }
238
239    pub fn backend(&self) -> StorageBackend {
240        match self {
241            LocalStore::Fs(_) => StorageBackend::Fs,
242            #[cfg(feature = "lmdb")]
243            LocalStore::Lmdb(_) => StorageBackend::Lmdb,
244        }
245    }
246
247    /// Sync put operation
248    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
249        match self {
250            LocalStore::Fs(store) => store.put_sync(hash, data),
251            #[cfg(feature = "lmdb")]
252            LocalStore::Lmdb(store) => store.put_sync(hash, data),
253        }
254    }
255
256    /// Sync batch put operation.
257    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
258        match self {
259            LocalStore::Fs(store) => {
260                let mut inserted = 0usize;
261                for (hash, data) in items {
262                    if store.put_sync(*hash, data.as_slice())? {
263                        inserted += 1;
264                    }
265                }
266                Ok(inserted)
267            }
268            #[cfg(feature = "lmdb")]
269            LocalStore::Lmdb(store) => store.put_many_sync(items),
270        }
271    }
272
273    /// Sync get operation
274    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
275        match self {
276            LocalStore::Fs(store) => store.get_sync(hash),
277            #[cfg(feature = "lmdb")]
278            LocalStore::Lmdb(store) => store.get_sync(hash),
279        }
280    }
281
282    pub fn get_range_sync(
283        &self,
284        hash: &Hash,
285        start: u64,
286        end_inclusive: u64,
287    ) -> Result<Option<Vec<u8>>, StoreError> {
288        match self {
289            LocalStore::Fs(store) => store.get_range_sync(hash, start, end_inclusive),
290            #[cfg(feature = "lmdb")]
291            LocalStore::Lmdb(store) => store.get_range_sync(hash, start, end_inclusive),
292        }
293    }
294
295    pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
296        match self {
297            LocalStore::Fs(store) => store.blob_size_sync(hash),
298            #[cfg(feature = "lmdb")]
299            LocalStore::Lmdb(store) => store.blob_size_sync(hash),
300        }
301    }
302
303    pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
304        match self {
305            LocalStore::Fs(store) => store.touch_accessed_sync(hash, now),
306            #[cfg(feature = "lmdb")]
307            LocalStore::Lmdb(store) => store.touch_accessed_sync(hash, now),
308        }
309    }
310
311    pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
312        match self {
313            LocalStore::Fs(store) => store.touch_many_accessed_sync(hashes, now),
314            #[cfg(feature = "lmdb")]
315            LocalStore::Lmdb(store) => store.touch_many_accessed_sync(hashes, now),
316        }
317    }
318
319    pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
320        match self {
321            LocalStore::Fs(store) => store.last_accessed_at_sync(hash),
322            #[cfg(feature = "lmdb")]
323            LocalStore::Lmdb(store) => store.last_accessed_at_sync(hash),
324        }
325    }
326
327    pub fn many_last_accessed_at_sync(
328        &self,
329        hashes: &[Hash],
330    ) -> Result<Vec<(Hash, u64)>, StoreError> {
331        match self {
332            LocalStore::Fs(store) => store.many_last_accessed_at_sync(hashes),
333            #[cfg(feature = "lmdb")]
334            LocalStore::Lmdb(store) => store.many_last_accessed_at_sync(hashes),
335        }
336    }
337
338    /// Check if hash exists
339    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
340        match self {
341            LocalStore::Fs(store) => Ok(store.exists(hash)),
342            #[cfg(feature = "lmdb")]
343            LocalStore::Lmdb(store) => store.exists(hash),
344        }
345    }
346
347    /// Mark which sorted hashes already exist in local storage.
348    pub fn existing_hashes_in_sorted_candidates(
349        &self,
350        sorted_hashes: &[Hash],
351    ) -> Result<Vec<bool>, StoreError> {
352        match self {
353            LocalStore::Fs(store) => Ok(sorted_hashes
354                .iter()
355                .map(|hash| store.exists(hash))
356                .collect()),
357            #[cfg(feature = "lmdb")]
358            LocalStore::Lmdb(store) => store.existing_hashes_in_sorted_candidates(sorted_hashes),
359        }
360    }
361
362    /// Sync delete operation
363    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
364        match self {
365            LocalStore::Fs(store) => store.delete_sync(hash),
366            #[cfg(feature = "lmdb")]
367            LocalStore::Lmdb(store) => store.delete_sync(hash),
368        }
369    }
370
371    /// Get storage statistics
372    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
373        match self {
374            LocalStore::Fs(store) => {
375                let stats = store.stats()?;
376                Ok(LocalStoreStats {
377                    count: stats.count,
378                    total_bytes: stats.total_bytes,
379                })
380            }
381            #[cfg(feature = "lmdb")]
382            LocalStore::Lmdb(store) => {
383                let stats = store.stats()?;
384                Ok(LocalStoreStats {
385                    count: stats.count,
386                    total_bytes: stats.total_bytes,
387                })
388            }
389        }
390    }
391
392    /// List all hashes in the store
393    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
394        match self {
395            LocalStore::Fs(store) => store.list(),
396            #[cfg(feature = "lmdb")]
397            LocalStore::Lmdb(store) => store.list(),
398        }
399    }
400}
401
402#[async_trait]
403impl Store for LocalStore {
404    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
405        self.put_sync(hash, &data)
406    }
407
408    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
409        self.put_many_sync(&items)
410    }
411
412    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
413        self.get_sync(hash)
414    }
415
416    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
417        self.exists(hash)
418    }
419
420    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
421        self.delete_sync(hash)
422    }
423}
424
425#[cfg(feature = "s3")]
426use tokio::sync::mpsc;
427
428use crate::config::S3Config;
429
/// Message for background S3 sync
///
/// Sent over the router's unbounded channel and handled by the task spawned in
/// `StorageRouter::with_s3`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` to the object key `{prefix}{hex(hash)}.bin`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object key `{prefix}{hex(hash)}.bin`.
    Delete { hash: Hash },
}
436
/// Storage router - local store primary with optional S3 backup
///
/// Write path: local first (fast), then queue S3 upload (non-blocking)
/// Read path: local first, fall back to S3 if miss
///
/// All S3 fields only exist when the crate is built with the `s3` feature.
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LocalStore>,
    /// Optional S3 client for backup; `None` for a local-only router
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Bucket used for S3 requests; `None` for a local-only router
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// String prepended to the hex hash when building object keys (may be empty)
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to send uploads and deletes to the background sync task
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
455
456impl StorageRouter {
457    #[cfg(feature = "s3")]
458    fn s3_sync_timeout() -> std::time::Duration {
459        let millis = std::env::var(S3_SYNC_TIMEOUT_MS_ENV)
460            .ok()
461            .and_then(|value| value.parse::<u64>().ok())
462            .filter(|value| *value > 0)
463            .unwrap_or(DEFAULT_S3_SYNC_TIMEOUT_MS);
464        std::time::Duration::from_millis(millis)
465    }
466
467    #[cfg(feature = "s3")]
468    fn s3_sync_timeout_error(timeout: std::time::Duration) -> StoreError {
469        StoreError::Other(format!(
470            "S3 sync operation timed out after {}ms",
471            timeout.as_millis()
472        ))
473    }
474
475    #[cfg(feature = "s3")]
476    fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
477    where
478        F: Future<Output = T> + Send + 'static,
479        T: Send + 'static,
480    {
481        let timeout = Self::s3_sync_timeout();
482        if tokio::runtime::Handle::try_current().is_ok() {
483            return std::thread::Builder::new()
484                .name("storage-s3-sync".to_string())
485                .spawn(move || {
486                    let runtime = tokio::runtime::Builder::new_current_thread()
487                        .enable_all()
488                        .build()
489                        .map_err(|err| {
490                            StoreError::Other(format!("build storage s3 sync runtime: {err}"))
491                        })?;
492                    runtime.block_on(async move {
493                        tokio::time::timeout(timeout, future)
494                            .await
495                            .map_err(|_| Self::s3_sync_timeout_error(timeout))
496                    })
497                })
498                .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
499                .join()
500                .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()))?;
501        }
502
503        let runtime = tokio::runtime::Builder::new_current_thread()
504            .enable_all()
505            .build()
506            .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
507        runtime.block_on(async move {
508            tokio::time::timeout(timeout, future)
509                .await
510                .map_err(|_| Self::s3_sync_timeout_error(timeout))
511        })
512    }
513
514    /// Create router with local storage only
515    pub fn new(local: Arc<LocalStore>) -> Self {
516        Self {
517            local,
518            #[cfg(feature = "s3")]
519            s3_client: None,
520            #[cfg(feature = "s3")]
521            s3_bucket: None,
522            #[cfg(feature = "s3")]
523            s3_prefix: String::new(),
524            #[cfg(feature = "s3")]
525            sync_tx: None,
526        }
527    }
528
    /// Create router with local storage + S3 backup
    ///
    /// Builds an S3 client from the environment-derived AWS configuration,
    /// using `config.region`, the custom `config.endpoint`, and path-style
    /// addressing. Object keys are `{prefix}{hex(hash)}.bin`, where the prefix
    /// is `config.prefix` or the empty string.
    ///
    /// Also spawns (via `tokio::spawn`) a background task that drains the
    /// router's unbounded sync channel. Each message is handled in its own
    /// spawned task, with at most 8 S3 operations running at once. An
    /// operation is tried up to 3 times, sleeping `250ms * attempt` between
    /// tries. A final failure is only logged; the message is then dropped.
    ///
    /// No fallible step is visible in the body, so this currently always
    /// returns `Ok`.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Build AWS config
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader =
            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Build S3 client with custom endpoint
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        // A missing prefix means keys are just "<hex>.bin".
        let prefix = config.prefix.clone().unwrap_or_default();

        // Create background sync channel
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        // Spawn background sync task with bounded concurrent uploads
        // The task gets its own copies; the originals are stored in the router below.
        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Keep S3 writes parallel, but avoid dispatch failures during mirror backfill bursts.
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(8));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            // The loop ends once every sender (i.e. the router) has been dropped.
            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                // Spawn each upload with semaphore-bounded concurrency
                // Note: the task is spawned first and waits for its permit
                // inside, so queued messages (and their data) live in pending
                // tasks rather than in the channel.
                tokio::spawn(async move {
                    // Acquire permit before uploading
                    // The permit (or acquire error) is held until this task finishes.
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            // Up to 3 attempts with a linearly growing pause.
                            let mut attempt = 1u8;
                            loop {
                                match client
                                    .put_object()
                                    .bucket(bucket.as_str())
                                    .key(&key)
                                    // The body is consumed per request, so each attempt sends a copy.
                                    .body(ByteStream::from(data.clone()))
                                    .send()
                                    .await
                                {
                                    Ok(_) => {
                                        tracing::debug!("S3 upload succeeded: {}", &key);
                                        break;
                                    }
                                    Err(e) if attempt < 3 => {
                                        tracing::warn!(
                                            "S3 upload retrying {}: attempt={} error={}",
                                            &key,
                                            attempt,
                                            e
                                        );
                                        tokio::time::sleep(std::time::Duration::from_millis(
                                            250 * u64::from(attempt),
                                        ))
                                        .await;
                                        attempt += 1;
                                    }
                                    // Third failure: give up on this blob.
                                    Err(e) => {
                                        tracing::error!(
                                            "S3 upload failed {} after {} attempts: {}",
                                            &key,
                                            attempt,
                                            e
                                        );
                                        break;
                                    }
                                }
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            // Same retry policy as uploads.
                            let mut attempt = 1u8;
                            loop {
                                match client
                                    .delete_object()
                                    .bucket(bucket.as_str())
                                    .key(&key)
                                    .send()
                                    .await
                                {
                                    Ok(_) => break,
                                    Err(e) if attempt < 3 => {
                                        tracing::warn!(
                                            "S3 delete retrying {}: attempt={} error={}",
                                            &key,
                                            attempt,
                                            e
                                        );
                                        tokio::time::sleep(std::time::Duration::from_millis(
                                            250 * u64::from(attempt),
                                        ))
                                        .await;
                                        attempt += 1;
                                    }
                                    // Third failure: give up on this delete.
                                    Err(e) => {
                                        tracing::error!(
                                            "S3 delete failed {} after {} attempts: {}",
                                            &key,
                                            attempt,
                                            e
                                        );
                                        break;
                                    }
                                }
                            }
                        }
                    }
                });
            }
        });

        tracing::info!(
            "S3 storage initialized: bucket={}, prefix={}",
            bucket,
            prefix
        );

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }
682
683    /// Store data - writes to LMDB, queues S3 upload in background
684    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
685        // Always write to local first
686        let is_new = self.local.put_sync(hash, data)?;
687
688        // Queue S3 upload only for newly inserted blobs.
689        // Existing local blobs were already persisted or are handled by explicit repair/push flows.
690        #[cfg(feature = "s3")]
691        if is_new {
692            if let Some(ref tx) = self.sync_tx {
693                tracing::debug!(
694                    "Queueing S3 upload for {} ({} bytes)",
695                    crate::storage::to_hex(&hash)[..16].to_string(),
696                    data.len(),
697                );
698                if let Err(e) = tx.send(S3SyncMessage::Upload {
699                    hash,
700                    data: data.to_vec(),
701                }) {
702                    tracing::error!("Failed to queue S3 upload: {}", e);
703                }
704            }
705        }
706
707        Ok(is_new)
708    }
709
710    /// Store multiple blobs with a single local batch write when supported.
711    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
712        #[cfg(feature = "s3")]
713        let pending_uploads = if self.sync_tx.is_some() {
714            let mut pending = Vec::new();
715            for (hash, data) in items {
716                if !self.local.exists(hash)? {
717                    pending.push((*hash, data.clone()));
718                }
719            }
720            pending
721        } else {
722            Vec::new()
723        };
724
725        let inserted = self.local.put_many_sync(items)?;
726
727        #[cfg(feature = "s3")]
728        if let Some(ref tx) = self.sync_tx {
729            for (hash, data) in pending_uploads {
730                if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
731                    tracing::error!("Failed to queue S3 upload: {}", e);
732                }
733            }
734        }
735
736        Ok(inserted)
737    }
738
739    /// Get data - tries LMDB first, falls back to S3
740    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
741        // Try local first
742        if let Some(data) = self.local.get_sync(hash)? {
743            return Ok(Some(data));
744        }
745
746        // Fall back to S3 if configured
747        #[cfg(feature = "s3")]
748        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
749            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
750            let client = client.clone();
751            let bucket = bucket.clone();
752
753            match Self::run_s3_future_sync(async move {
754                client.get_object().bucket(bucket).key(key).send().await
755            }) {
756                Ok(Ok(output)) => {
757                    match Self::run_s3_future_sync(async move { output.body.collect().await }) {
758                        Ok(Ok(body)) => {
759                            let data = body.into_bytes().to_vec();
760                            // Cache locally for future reads
761                            let _ = self.local.put_sync(*hash, &data);
762                            return Ok(Some(data));
763                        }
764                        Ok(Err(err)) => {
765                            tracing::warn!("S3 body collect failed: {}", err);
766                        }
767                        Err(err) => {
768                            tracing::warn!("S3 body collect runtime failed: {}", err);
769                        }
770                    }
771                }
772                Ok(Err(err)) => {
773                    let service_err = err.into_service_error();
774                    if !service_err.is_no_such_key() {
775                        tracing::warn!("S3 get failed: {}", service_err);
776                    }
777                }
778                Err(err) => {
779                    tracing::warn!("S3 get runtime failed: {}", err);
780                }
781            }
782        }
783
784        Ok(None)
785    }
786
787    pub fn get_range_sync(
788        &self,
789        hash: &Hash,
790        start: u64,
791        end_inclusive: u64,
792    ) -> Result<Option<Vec<u8>>, StoreError> {
793        self.local.get_range_sync(hash, start, end_inclusive)
794    }
795
796    pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
797        self.local.blob_size_sync(hash)
798    }
799
800    pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
801        self.local.touch_accessed_sync(hash, now)
802    }
803
804    pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
805        self.local.touch_many_accessed_sync(hashes, now)
806    }
807
808    pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
809        self.local.last_accessed_at_sync(hash)
810    }
811
812    pub fn many_last_accessed_at_sync(
813        &self,
814        hashes: &[Hash],
815    ) -> Result<Vec<(Hash, u64)>, StoreError> {
816        self.local.many_last_accessed_at_sync(hashes)
817    }
818
819    /// Check if hash exists
820    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
821        // Check local first
822        if self.local.exists(hash)? {
823            return Ok(true);
824        }
825
826        // Check S3 if configured
827        #[cfg(feature = "s3")]
828        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
829            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
830            let client = client.clone();
831            let bucket = bucket.clone();
832
833            match Self::run_s3_future_sync(async move {
834                client.head_object().bucket(bucket).key(&key).send().await
835            }) {
836                Ok(Ok(_)) => return Ok(true),
837                Ok(Err(err)) => {
838                    let service_err = err.into_service_error();
839                    if !service_err.is_not_found() {
840                        tracing::warn!("S3 head failed: {}", service_err);
841                    }
842                }
843                Err(err) => {
844                    tracing::warn!("S3 head runtime failed: {}", err);
845                }
846            }
847        }
848
849        Ok(false)
850    }
851
852    /// Delete data from both local and S3 stores
853    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
854        let deleted = self.local.delete_sync(hash)?;
855
856        // Queue S3 delete if configured
857        #[cfg(feature = "s3")]
858        if let Some(ref tx) = self.sync_tx {
859            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
860        }
861
862        Ok(deleted)
863    }
864
865    /// Delete data from local store only (don't propagate to S3)
866    /// Used for eviction where we want to keep S3 as archive
867    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
868        self.local.delete_sync(hash)
869    }
870
871    /// Get stats from local store
872    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
873        self.local.stats()
874    }
875
876    /// List all hashes from local store
877    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
878        self.local.list()
879    }
880
881    /// Get the underlying local store for HashTree operations
882    pub fn local_store(&self) -> Arc<LocalStore> {
883        Arc::clone(&self.local)
884    }
885}
886
/// `Store` wrapper around a [`StorageRouter`] that remembers which hashes were
/// successfully read through it.
///
/// Clones share the same `accessed` set because it lives behind an `Arc`.
#[derive(Clone)]
struct AccessRecordingStore {
    /// Router that actually serves every request.
    inner: Arc<StorageRouter>,
    /// Hashes recorded by `record_access` since the last `take_accessed_hashes`.
    accessed: Arc<Mutex<HashSet<Hash>>>,
}
892
893impl AccessRecordingStore {
894    fn new(inner: Arc<StorageRouter>) -> Self {
895        Self {
896            inner,
897            accessed: Arc::new(Mutex::new(HashSet::new())),
898        }
899    }
900
901    fn take_accessed_hashes(&self) -> Vec<Hash> {
902        let Ok(mut accessed) = self.accessed.lock() else {
903            return Vec::new();
904        };
905        accessed.drain().collect()
906    }
907
908    fn record_access(&self, hash: &Hash) {
909        let Ok(mut accessed) = self.accessed.lock() else {
910            return;
911        };
912        accessed.insert(*hash);
913    }
914}
915
916#[async_trait]
917impl Store for AccessRecordingStore {
918    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
919        self.inner.put(hash, data).await
920    }
921
922    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
923        self.inner.put_many(items).await
924    }
925
926    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
927        let data = self.inner.get(hash).await?;
928        if data.is_some() {
929            self.record_access(hash);
930        }
931        Ok(data)
932    }
933
934    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
935        self.inner.has(hash).await
936    }
937
938    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
939        self.inner.delete(hash).await
940    }
941}
942
943// Implement async Store trait for StorageRouter so it can be used directly with HashTree
944// This ensures all writes go through S3 sync
945#[async_trait]
946impl Store for StorageRouter {
947    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
948        self.put_sync(hash, &data)
949    }
950
951    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
952        self.put_many_sync(&items)
953    }
954
955    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
956        self.get_sync(hash)
957    }
958
959    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
960        self.exists(hash)
961    }
962
963    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
964        self.delete_sync(hash)
965    }
966}
967
/// Top-level store combining an LMDB metadata environment (pins, ownership,
/// tree bookkeeping) with a [`StorageRouter`] that holds the blob data.
pub struct HashtreeStore {
    /// Directory the store was opened at (the metadata environment lives here).
    base_path: PathBuf,
    /// LMDB environment holding the metadata databases listed below.
    env: heed::Env,
    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
    pins: Database<Bytes, Unit>,
    /// Mutable published refs that should stay subscribed and keep following updates
    pinned_refs: Database<Str, Unit>,
    /// Authors whose hashtree publications should be mirrored continuously
    tracked_authors: Database<Str, Unit>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles LMDB + optional S3 (Arc for sharing with HashTree)
    router: Arc<StorageRouter>,
    /// Maximum storage size in bytes (from config)
    max_size_bytes: u64,
    /// Whether quota enforcement may delete local blobs not tracked by any indexed tree.
    evict_orphans: bool,
    /// Best-effort in-memory throttle for blob access metadata writes.
    blob_access_update_gate: BlobAccessUpdateGate,
    /// Keeps access-time maintenance out of foreground blob reads.
    blob_access_update_inflight: Arc<AtomicBool>,
}
1000
1001impl HashtreeStore {
1002    /// Create a new store with the configured local storage limit.
1003    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
1004        let config = hashtree_config::Config::load_or_default();
1005        let max_size_bytes = config
1006            .storage
1007            .max_size_gb
1008            .saturating_mul(1024 * 1024 * 1024);
1009        Self::with_options_and_backend(
1010            path,
1011            None,
1012            max_size_bytes,
1013            config.storage.evict_orphans,
1014            &config.storage.backend,
1015        )
1016    }
1017
1018    /// Create a new store with an explicit local backend and size limit.
1019    pub fn new_with_backend<P: AsRef<Path>>(
1020        path: P,
1021        backend: hashtree_config::StorageBackend,
1022        max_size_bytes: u64,
1023    ) -> Result<Self> {
1024        Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
1025    }
1026
1027    /// Create a new store with optional S3 backend and the configured local storage limit.
1028    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
1029        let config = hashtree_config::Config::load_or_default();
1030        let max_size_bytes = config
1031            .storage
1032            .max_size_gb
1033            .saturating_mul(1024 * 1024 * 1024);
1034        Self::with_options_and_backend(
1035            path,
1036            s3_config,
1037            max_size_bytes,
1038            config.storage.evict_orphans,
1039            &config.storage.backend,
1040        )
1041    }
1042
1043    /// Create a new store with optional S3 backend and custom size limit.
1044    ///
1045    /// The raw local blob backend remains unbounded. `HashtreeStore` enforces
1046    /// `max_size_bytes` at the tree-management layer so eviction can honor pins,
1047    /// orphan handling, and local-only eviction when S3 is used as archive.
1048    pub fn with_options<P: AsRef<Path>>(
1049        path: P,
1050        s3_config: Option<&S3Config>,
1051        max_size_bytes: u64,
1052    ) -> Result<Self> {
1053        let config = hashtree_config::Config::load_or_default();
1054        Self::with_options_and_backend(
1055            path,
1056            s3_config,
1057            max_size_bytes,
1058            config.storage.evict_orphans,
1059            &config.storage.backend,
1060        )
1061    }
1062
    /// Open (creating if needed) a store at `path` with every option explicit.
    ///
    /// Steps, in order: create the directory, size and open the LMDB metadata
    /// environment, create the named metadata databases in one write
    /// transaction, build the local blob store for `backend`, then wrap it in
    /// a `StorageRouter` (with S3 when the `s3` feature is enabled and
    /// `s3_config` is provided).
    ///
    /// # Errors
    ///
    /// Returns an error if the directory, LMDB environment, databases, blob
    /// store, or S3 router cannot be created.
    pub fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        evict_orphans: bool,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;
        // Metadata map size is derived from the storage budget and the existing
        // environment at `path` (see the two helpers for the exact rule).
        let metadata_map_size = lmdb_map_size_for_existing_env(
            path,
            lmdb_metadata_map_size_for_storage_budget(max_size_bytes),
        )?;

        // NOTE(review): only nine databases are created in this function; `blobs`
        // is listed in the comment below but not opened here — confirm the tenth
        // slot is still needed.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(metadata_map_size)
                .max_dbs(10) // pins, pinned_refs, tracked_authors, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Best-effort cleanup of reader slots; failure is ignored.
        let _ = env.clear_stale_readers();
        // Grow the map if the opened environment reports a smaller size than requested.
        if env.info().map_size < metadata_map_size {
            unsafe { env.resize(metadata_map_size) }?;
        }

        // Create (or open) every metadata database inside a single write transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
        let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Intentionally keep the raw blob backend unbounded here. HashtreeStore
        // owns quota policy above this layer, where it can coordinate eviction
        // with tree refs, blob ownership, pins, and S3 archival behavior.
        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                std::fs::create_dir_all(path.join("blobs"))?;
                remove_stale_fs_blob_shards(&path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
                // The blob map is at least LMDB_BLOB_MIN_MAP_SIZE_BYTES and must fit in usize.
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            // Without the `lmdb` feature an LMDB request falls back to the filesystem store.
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        // Create storage router with optional S3
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            // S3 setup is async; block this constructor until it completes.
            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        // Without the `s3` feature any provided S3 config is ignored with a warning.
        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            base_path: path.to_path_buf(),
            env,
            pins,
            pinned_refs,
            tracked_authors,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
            evict_orphans,
            blob_access_update_gate: BlobAccessUpdateGate::default(),
            blob_access_update_inflight: Arc::new(AtomicBool::new(false)),
        })
    }
1177
1178    pub fn base_path(&self) -> &Path {
1179        &self.base_path
1180    }
1181
1182    /// Get the storage router
1183    pub fn router(&self) -> &StorageRouter {
1184        &self.router
1185    }
1186
1187    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
1188    /// All writes through this go to both LMDB and S3
1189    pub fn store_arc(&self) -> Arc<StorageRouter> {
1190        Arc::clone(&self.router)
1191    }
1192
1193    fn access_tracking_tree(&self) -> (HashTree<AccessRecordingStore>, AccessRecordingStore) {
1194        let access_store = AccessRecordingStore::new(self.store_arc());
1195        let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1196        (tree, access_store)
1197    }
1198
1199    pub fn record_blob_accesses<I>(&self, hashes: I)
1200    where
1201        I: IntoIterator<Item = Hash>,
1202    {
1203        let now = unix_timestamp_now();
1204        let mut due_hashes = self.blob_access_update_gate.due_hashes(hashes, now);
1205        if due_hashes.is_empty() {
1206            return;
1207        }
1208
1209        if self
1210            .blob_access_update_inflight
1211            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1212            .is_err()
1213        {
1214            return;
1215        }
1216
1217        if due_hashes.len() > ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT {
1218            due_hashes.truncate(ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT);
1219        }
1220
1221        let router = Arc::clone(&self.router);
1222        let inflight = Arc::clone(&self.blob_access_update_inflight);
1223        let spawn_result = std::thread::Builder::new()
1224            .name("blob-access-update".to_string())
1225            .spawn(move || {
1226                if let Err(err) = router.touch_many_accessed_sync(&due_hashes, now) {
1227                    tracing::debug!("Failed to update blob access metadata: {}", err);
1228                }
1229                inflight.store(false, Ordering::Release);
1230            });
1231        if let Err(err) = spawn_result {
1232            self.blob_access_update_inflight
1233                .store(false, Ordering::Release);
1234            tracing::debug!("Failed to spawn blob access metadata updater: {}", err);
1235        }
1236    }
1237
1238    fn record_blob_access_now(&self, hash: &Hash) {
1239        if let Err(err) = self.router.touch_accessed_sync(hash, unix_timestamp_now()) {
1240            tracing::debug!("Failed to update blob access metadata: {}", err);
1241        }
1242    }
1243
1244    pub fn blob_last_accessed_at(&self, hash: &Hash) -> Result<Option<u64>> {
1245        self.router
1246            .last_accessed_at_sync(hash)
1247            .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1248    }
1249
1250    pub fn blob_last_accessed_many(&self, hashes: &[Hash]) -> Result<Vec<(Hash, u64)>> {
1251        self.router
1252            .many_last_accessed_at_sync(hashes)
1253            .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1254    }
1255
1256    /// Get tree node by hash (raw bytes)
1257    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
1258        let (tree, access_store) = self.access_tracking_tree();
1259
1260        let result = sync_block_on(async {
1261            tree.get_tree_node(hash)
1262                .await
1263                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
1264        })?;
1265        if result.is_some() {
1266            self.record_blob_accesses(access_store.take_accessed_hashes());
1267        }
1268        Ok(result)
1269    }
1270
1271    /// Store a raw blob, returns SHA256 hash as hex.
1272    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
1273        let hash = sha256(data);
1274        let inserted = self
1275            .router
1276            .put_sync(hash, data)
1277            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1278        if !inserted {
1279            self.record_blob_access_now(&hash);
1280        }
1281        Ok(to_hex(&hash))
1282    }
1283
1284    /// Store an owned Blossom blob under the configured durable storage limit.
1285    pub fn put_owned_blob(&self, data: &[u8], pubkey: &[u8; 32]) -> Result<String> {
1286        let hash = sha256(data);
1287        if !self
1288            .router
1289            .exists(&hash)
1290            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))?
1291        {
1292            self.make_room_for_durable_blob(data.len() as u64)?;
1293            self.router
1294                .put_sync(hash, data)
1295                .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1296        } else {
1297            self.record_blob_access_now(&hash);
1298        }
1299        self.set_blob_owner(&hash, pubkey)?;
1300        Ok(to_hex(&hash))
1301    }
1302
1303    /// Store an opportunistically cached blob.
1304    ///
1305    /// Unlike durable `put_blob` writes, this path may evict disposable orphaned
1306    /// blobs to make room under storage pressure. It intentionally avoids touching
1307    /// indexed trees, social-graph roots, explicit pins, and owned Blossom blobs.
1308    pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
1309        let hash = sha256(data);
1310        if self
1311            .router
1312            .exists(&hash)
1313            .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
1314        {
1315            self.record_blob_access_now(&hash);
1316            return Ok(to_hex(&hash));
1317        }
1318
1319        let incoming_bytes = data.len() as u64;
1320        let _ = self.make_room_for_cached_blob(incoming_bytes);
1321
1322        let mut retried_after_cleanup = false;
1323        loop {
1324            match self.router.put_sync(hash, data) {
1325                Ok(_) => return Ok(to_hex(&hash)),
1326                Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
1327                    let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
1328                    if freed == 0 {
1329                        return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
1330                    }
1331                    retried_after_cleanup = true;
1332                }
1333                Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
1334            }
1335        }
1336    }
1337
1338    /// Get a raw blob by SHA256 hash (raw bytes).
1339    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1340        let data = self
1341            .router
1342            .get_sync(hash)
1343            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?;
1344        if data.is_some() {
1345            self.record_blob_accesses(std::iter::once(*hash));
1346        }
1347        Ok(data)
1348    }
1349
1350    pub fn get_blob_range(
1351        &self,
1352        hash: &[u8; 32],
1353        start: u64,
1354        end_inclusive: u64,
1355    ) -> Result<Option<Vec<u8>>> {
1356        let data = self
1357            .router
1358            .get_range_sync(hash, start, end_inclusive)
1359            .map_err(|e| anyhow::anyhow!("Failed to get blob range: {}", e))?;
1360        if data.is_some() {
1361            self.record_blob_accesses(std::iter::once(*hash));
1362        }
1363        Ok(data)
1364    }
1365
1366    pub fn blob_size(&self, hash: &[u8; 32]) -> Result<Option<u64>> {
1367        self.router
1368            .blob_size_sync(hash)
1369            .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))
1370    }
1371
1372    /// Check if a blob exists by SHA256 hash (raw bytes).
1373    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
1374        self.router
1375            .exists(hash)
1376            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
1377    }
1378
1379    // === Blossom ownership tracking ===
1380    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
    // This allows efficient multi-owner tracking: exact-key ownership checks plus a prefix scan per blob
1382
1383    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
1384    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
1385        let mut key = [0u8; 64];
1386        key[..32].copy_from_slice(sha256);
1387        key[32..].copy_from_slice(pubkey);
1388        key
1389    }
1390
1391    /// Add an owner (pubkey) to a blob for Blossom protocol
1392    /// Multiple users can own the same blob - it's only deleted when all owners remove it
1393    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
1394        let key = Self::blob_owner_key(sha256, pubkey);
1395        let mut wtxn = self.env.write_txn()?;
1396
1397        // Add ownership entry (idempotent - put overwrites)
1398        self.blob_owners.put(&mut wtxn, &key[..], &())?;
1399
1400        // Convert sha256 to hex for BlobMetadata (which stores sha256 as hex string)
1401        let sha256_hex = to_hex(sha256);
1402
1403        // Get existing blobs for this pubkey (for /list endpoint)
1404        let mut blobs: Vec<BlobMetadata> = self
1405            .pubkey_blobs
1406            .get(&wtxn, pubkey)?
1407            .and_then(|b| serde_json::from_slice(b).ok())
1408            .unwrap_or_default();
1409
1410        // Check if blob already exists for this pubkey
1411        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
1412            let now = SystemTime::now()
1413                .duration_since(UNIX_EPOCH)
1414                .unwrap()
1415                .as_secs();
1416
1417            // Only metadata is needed here. Reading the full blob body makes
1418            // duplicate Blossom uploads compete with normal content serving.
1419            let size = self
1420                .router
1421                .blob_size_sync(sha256)
1422                .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))?
1423                .unwrap_or(0);
1424
1425            blobs.push(BlobMetadata {
1426                sha256: sha256_hex,
1427                size,
1428                mime_type: "application/octet-stream".to_string(),
1429                uploaded: now,
1430            });
1431
1432            let blobs_json = serde_json::to_vec(&blobs)?;
1433            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1434        }
1435
1436        wtxn.commit()?;
1437        Ok(())
1438    }
1439
1440    /// Check if a pubkey owns a blob
1441    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1442        let key = Self::blob_owner_key(sha256, pubkey);
1443        let rtxn = self.env.read_txn()?;
1444        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1445    }
1446
1447    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
1448    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1449        let rtxn = self.env.read_txn()?;
1450
1451        let mut owners = Vec::new();
1452        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1453            let (key, _) = item?;
1454            if key.len() == 64 {
1455                // Extract pubkey from composite key (bytes 32-64)
1456                let mut pubkey = [0u8; 32];
1457                pubkey.copy_from_slice(&key[32..64]);
1458                owners.push(pubkey);
1459            }
1460        }
1461        Ok(owners)
1462    }
1463
1464    /// Check if blob has any owners
1465    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1466        let rtxn = self.env.read_txn()?;
1467
1468        // Just check if any entry exists with this prefix
1469        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1470            if item.is_ok() {
1471                return Ok(true);
1472            }
1473        }
1474        Ok(false)
1475    }
1476
1477    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
1478    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1479        Ok(self.get_blob_owners(sha256)?.into_iter().next())
1480    }
1481
1482    /// Remove a user's ownership of a blossom blob
1483    /// Only deletes the actual blob when no owners remain
1484    /// Returns true if the blob was actually deleted (no owners left)
1485    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1486        let key = Self::blob_owner_key(sha256, pubkey);
1487        let mut wtxn = self.env.write_txn()?;
1488
1489        // Remove this pubkey's ownership entry
1490        self.blob_owners.delete(&mut wtxn, &key[..])?;
1491
1492        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
1493        let sha256_hex = to_hex(sha256);
1494
1495        // Remove from pubkey's blob list
1496        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1497            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1498                blobs.retain(|b| b.sha256 != sha256_hex);
1499                let blobs_json = serde_json::to_vec(&blobs)?;
1500                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1501            }
1502        }
1503
1504        // Check if any other owners remain (prefix scan)
1505        let mut has_other_owners = false;
1506        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1507            if item.is_ok() {
1508                has_other_owners = true;
1509                break;
1510            }
1511        }
1512
1513        if has_other_owners {
1514            wtxn.commit()?;
1515            tracing::debug!(
1516                "Removed {} from blob {} owners, other owners remain",
1517                &to_hex(pubkey)[..8],
1518                &sha256_hex[..8]
1519            );
1520            return Ok(false);
1521        }
1522
1523        // No owners left - delete the blob completely
1524        tracing::info!(
1525            "All owners removed from blob {}, deleting",
1526            &sha256_hex[..8]
1527        );
1528
1529        // Delete raw blob (by content hash) - this deletes from S3 too
1530        let _ = self.router.delete_sync(sha256);
1531
1532        wtxn.commit()?;
1533        Ok(true)
1534    }
1535
1536    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
1537    pub fn list_blobs_by_pubkey(
1538        &self,
1539        pubkey: &[u8; 32],
1540    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1541        let rtxn = self.env.read_txn()?;
1542
1543        let blobs: Vec<BlobMetadata> = self
1544            .pubkey_blobs
1545            .get(&rtxn, pubkey)?
1546            .and_then(|b| serde_json::from_slice(b).ok())
1547            .unwrap_or_default();
1548
1549        Ok(blobs
1550            .into_iter()
1551            .map(|b| crate::server::blossom::BlobDescriptor {
1552                url: format!("/{}", b.sha256),
1553                sha256: b.sha256,
1554                size: b.size,
1555                mime_type: b.mime_type,
1556                uploaded: b.uploaded,
1557            })
1558            .collect())
1559    }
1560
1561    /// Get a single chunk/blob by hash (raw bytes)
1562    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1563        let data = self
1564            .router
1565            .get_sync(hash)
1566            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))?;
1567        if data.is_some() {
1568            self.record_blob_accesses(std::iter::once(*hash));
1569        }
1570        Ok(data)
1571    }
1572
1573    /// Get file content by hash (raw bytes)
1574    /// Returns raw bytes (caller handles decryption if needed)
1575    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1576        let (tree, access_store) = self.access_tracking_tree();
1577
1578        let result = sync_block_on(async {
1579            tree.read_file(hash)
1580                .await
1581                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1582        })?;
1583        if result.is_some() {
1584            self.record_blob_accesses(access_store.take_accessed_hashes());
1585        }
1586        Ok(result)
1587    }
1588
1589    /// Get file content by Cid (hash + optional decryption key as raw bytes)
1590    /// Handles decryption automatically if key is present
1591    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1592        let (tree, access_store) = self.access_tracking_tree();
1593
1594        let result = sync_block_on(async {
1595            tree.get(cid, None)
1596                .await
1597                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1598        })?;
1599        if result.is_some() {
1600            self.record_blob_accesses(access_store.take_accessed_hashes());
1601        }
1602        Ok(result)
1603    }
1604
1605    fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1606        let exists = self
1607            .router
1608            .exists(&cid.hash)
1609            .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1610        if !exists {
1611            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1612        }
1613        Ok(())
1614    }
1615
1616    /// Stream file content identified by Cid into a writer without buffering full file in memory.
1617    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1618        self.ensure_cid_exists(cid)?;
1619
1620        let (tree, access_store) = self.access_tracking_tree();
1621        let mut total_bytes = 0u64;
1622        let mut streamed_any_chunk = false;
1623
1624        sync_block_on(async {
1625            let mut stream = tree.get_stream(cid);
1626            while let Some(chunk) = stream.next().await {
1627                streamed_any_chunk = true;
1628                let chunk =
1629                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1630                writer
1631                    .write_all(&chunk)
1632                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1633                total_bytes += chunk.len() as u64;
1634            }
1635            Ok::<(), anyhow::Error>(())
1636        })?;
1637
1638        if !streamed_any_chunk {
1639            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1640        }
1641        self.record_blob_accesses(access_store.take_accessed_hashes());
1642
1643        writer
1644            .flush()
1645            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1646        Ok(total_bytes)
1647    }
1648
1649    /// Stream file content identified by Cid directly into a destination path.
1650    pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1651        self.ensure_cid_exists(cid)?;
1652
1653        let output_path = output_path.as_ref();
1654        if let Some(parent) = output_path.parent() {
1655            if !parent.as_os_str().is_empty() {
1656                std::fs::create_dir_all(parent).with_context(|| {
1657                    format!("Failed to create output directory {}", parent.display())
1658                })?;
1659            }
1660        }
1661
1662        let mut file = std::fs::File::create(output_path)
1663            .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1664        self.write_file_by_cid_to_writer(cid, &mut file)
1665    }
1666
1667    /// Stream a public (unencrypted) file by hash directly into a destination path.
1668    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1669        self.write_file_by_cid(&Cid::public(*hash), output_path)
1670    }
1671
1672    /// Resolve a path within a tree (returns Cid with key if encrypted)
1673    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1674        let (tree, access_store) = self.access_tracking_tree();
1675
1676        let result = sync_block_on(async {
1677            tree.resolve_path(cid, path)
1678                .await
1679                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1680        })?;
1681        if result.is_some() {
1682            self.record_blob_accesses(access_store.take_accessed_hashes());
1683        }
1684        Ok(result)
1685    }
1686
1687    /// Get chunk metadata for a file (chunk list, sizes, total size)
1688    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
1689        let access_store = AccessRecordingStore::new(self.store_arc());
1690        let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1691
1692        let metadata: Result<Option<FileChunkMetadata>> = sync_block_on(async {
1693            // First check if the hash exists in the store at all
1694            // (either as a blob or tree node)
1695            let exists = access_store
1696                .has(hash)
1697                .await
1698                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;
1699
1700            if !exists {
1701                return Ok(None);
1702            }
1703
1704            // Get total size
1705            let total_size = tree
1706                .get_size(hash)
1707                .await
1708                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;
1709
1710            // Check if it's a tree (chunked) or blob
1711            let is_tree_node = tree
1712                .is_tree(hash)
1713                .await
1714                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1715
1716            if !is_tree_node {
1717                // Single blob, not chunked
1718                return Ok(Some(FileChunkMetadata {
1719                    total_size,
1720                    chunk_hashes: vec![],
1721                    chunk_sizes: vec![],
1722                    is_chunked: false,
1723                }));
1724            }
1725
1726            // Get tree node to extract chunk info
1727            let node = match tree
1728                .get_tree_node(hash)
1729                .await
1730                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1731            {
1732                Some(n) => n,
1733                None => return Ok(None),
1734            };
1735
1736            // Check if it's a directory (has named links)
1737            let is_directory = tree
1738                .is_directory(hash)
1739                .await
1740                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1741
1742            if is_directory {
1743                return Ok(None); // Not a file
1744            }
1745
1746            // Extract chunk info from links
1747            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
1748            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();
1749
1750            Ok(Some(FileChunkMetadata {
1751                total_size,
1752                chunk_hashes,
1753                chunk_sizes,
1754                is_chunked: !node.links.is_empty(),
1755            }))
1756        });
1757        let metadata = metadata?;
1758        if metadata.is_some() {
1759            self.record_blob_accesses(access_store.take_accessed_hashes());
1760        }
1761        Ok(metadata)
1762    }
1763
1764    /// Get byte range from file
1765    pub fn get_file_range(
1766        &self,
1767        hash: &[u8; 32],
1768        start: u64,
1769        end: Option<u64>,
1770    ) -> Result<Option<(Vec<u8>, u64)>> {
1771        let metadata = match self.get_file_chunk_metadata(hash)? {
1772            Some(m) => m,
1773            None => return Ok(None),
1774        };
1775
1776        if metadata.total_size == 0 {
1777            return Ok(Some((Vec::new(), 0)));
1778        }
1779
1780        if start >= metadata.total_size {
1781            return Ok(None);
1782        }
1783
1784        let end = end
1785            .unwrap_or(metadata.total_size - 1)
1786            .min(metadata.total_size - 1);
1787
1788        // For non-chunked files, load entire file
1789        if !metadata.is_chunked {
1790            let content = self.get_file(hash)?.unwrap_or_default();
1791            let range_content = if start < content.len() as u64 {
1792                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1793            } else {
1794                Vec::new()
1795            };
1796            return Ok(Some((range_content, metadata.total_size)));
1797        }
1798
1799        // For chunked files, load only needed chunks
1800        let mut result = Vec::new();
1801        let mut current_offset = 0u64;
1802
1803        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1804            let chunk_size = metadata.chunk_sizes[i];
1805            let chunk_end = current_offset + chunk_size - 1;
1806
1807            // Check if this chunk overlaps with requested range
1808            if chunk_end >= start && current_offset <= end {
1809                let chunk_content = match self.get_chunk(chunk_hash)? {
1810                    Some(content) => content,
1811                    None => {
1812                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1813                    }
1814                };
1815
1816                let chunk_read_start = if current_offset >= start {
1817                    0
1818                } else {
1819                    (start - current_offset) as usize
1820                };
1821
1822                let chunk_read_end = if chunk_end <= end {
1823                    chunk_size as usize - 1
1824                } else {
1825                    (end - current_offset) as usize
1826                };
1827
1828                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1829            }
1830
1831            current_offset += chunk_size;
1832
1833            if current_offset > end {
1834                break;
1835            }
1836        }
1837
1838        Ok(Some((result, metadata.total_size)))
1839    }
1840
1841    /// Stream file range as chunks using Arc for async/Send contexts
1842    pub fn stream_file_range_chunks_owned(
1843        self: Arc<Self>,
1844        hash: &[u8; 32],
1845        start: u64,
1846        end: u64,
1847    ) -> Result<Option<FileRangeChunksOwned>> {
1848        let metadata = match self.get_file_chunk_metadata(hash)? {
1849            Some(m) => m,
1850            None => return Ok(None),
1851        };
1852
1853        if metadata.total_size == 0 || start >= metadata.total_size {
1854            return Ok(None);
1855        }
1856
1857        let end = end.min(metadata.total_size - 1);
1858
1859        Ok(Some(FileRangeChunksOwned {
1860            store: self,
1861            metadata,
1862            start,
1863            end,
1864            current_chunk_idx: 0,
1865            current_offset: 0,
1866        }))
1867    }
1868
1869    /// Get directory structure by hash (raw bytes)
1870    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1871        let (tree, access_store) = self.access_tracking_tree();
1872
1873        let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
1874            // Check if it's a directory
1875            let is_dir = tree
1876                .is_directory(hash)
1877                .await
1878                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1879
1880            if !is_dir {
1881                return Ok(None);
1882            }
1883
1884            // Get directory entries (public Cid - no encryption key)
1885            let cid = hashtree_core::Cid::public(*hash);
1886            let tree_entries = tree
1887                .list_directory(&cid)
1888                .await
1889                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1890
1891            let entries: Vec<DirEntry> = tree_entries
1892                .into_iter()
1893                .map(|e| DirEntry {
1894                    name: e.name,
1895                    cid: to_hex(&e.hash),
1896                    is_directory: e.link_type.is_tree(),
1897                    size: e.size,
1898                })
1899                .collect();
1900
1901            Ok(Some(DirectoryListing {
1902                dir_name: String::new(),
1903                entries,
1904            }))
1905        });
1906        let listing = listing?;
1907        if listing.is_some() {
1908            self.record_blob_accesses(access_store.take_accessed_hashes());
1909        }
1910        Ok(listing)
1911    }
1912
1913    /// Get directory structure by CID, supporting encrypted directories.
1914    pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1915        let (tree, access_store) = self.access_tracking_tree();
1916        let cid = cid.clone();
1917
1918        let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
1919            let is_dir = tree
1920                .is_dir(&cid)
1921                .await
1922                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1923
1924            if !is_dir {
1925                return Ok(None);
1926            }
1927
1928            let tree_entries = tree
1929                .list_directory(&cid)
1930                .await
1931                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1932
1933            let entries: Vec<DirEntry> = tree_entries
1934                .into_iter()
1935                .map(|e| DirEntry {
1936                    name: e.name,
1937                    cid: Cid {
1938                        hash: e.hash,
1939                        key: e.key,
1940                    }
1941                    .to_string(),
1942                    is_directory: e.link_type.is_tree(),
1943                    size: e.size,
1944                })
1945                .collect();
1946
1947            Ok(Some(DirectoryListing {
1948                dir_name: String::new(),
1949                entries,
1950            }))
1951        });
1952        let listing = listing?;
1953        if listing.is_some() {
1954            self.record_blob_accesses(access_store.take_accessed_hashes());
1955        }
1956        Ok(listing)
1957    }
1958
    // === Pinned refs, tracked authors, and cached roots ===
1960
1961    /// Persist a mutable published ref that should stay subscribed.
1962    pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
1963        let mut wtxn = self.env.write_txn()?;
1964        self.pinned_refs.put(&mut wtxn, key, &())?;
1965        wtxn.commit()?;
1966        Ok(())
1967    }
1968
1969    /// Remove a mutable published ref from the live pinned set.
1970    pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
1971        let mut wtxn = self.env.write_txn()?;
1972        let removed = self.pinned_refs.delete(&mut wtxn, key)?;
1973        wtxn.commit()?;
1974        Ok(removed)
1975    }
1976
1977    /// List mutable published refs that should stay subscribed.
1978    pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
1979        let rtxn = self.env.read_txn()?;
1980        let mut refs = Vec::new();
1981
1982        for item in self.pinned_refs.iter(&rtxn)? {
1983            let (key, _) = item?;
1984            refs.push(key.to_string());
1985        }
1986
1987        refs.sort();
1988        Ok(refs)
1989    }
1990
1991    /// Persist an author whose published trees should stay mirrored.
1992    pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
1993        let mut wtxn = self.env.write_txn()?;
1994        let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
1995        self.tracked_authors.put(&mut wtxn, npub, &())?;
1996        wtxn.commit()?;
1997        Ok(inserted)
1998    }
1999
2000    /// Remove an author from the continuous mirror set.
2001    pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
2002        let mut wtxn = self.env.write_txn()?;
2003        let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
2004        wtxn.commit()?;
2005        Ok(removed)
2006    }
2007
2008    /// List authors whose published trees should stay mirrored.
2009    pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
2010        let rtxn = self.env.read_txn()?;
2011        let mut authors = Vec::new();
2012
2013        for item in self.tracked_authors.iter(&rtxn)? {
2014            let (npub, _) = item?;
2015            authors.push(npub.to_string());
2016        }
2017
2018        authors.sort();
2019        Ok(authors)
2020    }
2021
2022    /// Get cached root for a pubkey/tree_name pair
2023    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
2024        let key = format!("{}/{}", pubkey_hex, tree_name);
2025        let rtxn = self.env.read_txn()?;
2026        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
2027            let root: CachedRoot = rmp_serde::from_slice(bytes)
2028                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2029            Ok(Some(root))
2030        } else {
2031            Ok(None)
2032        }
2033    }
2034
2035    /// Set cached root for a pubkey/tree_name pair
2036    pub fn set_cached_root(
2037        &self,
2038        pubkey_hex: &str,
2039        tree_name: &str,
2040        hash: &str,
2041        key: Option<&str>,
2042        visibility: &str,
2043        updated_at: u64,
2044    ) -> Result<()> {
2045        let db_key = format!("{}/{}", pubkey_hex, tree_name);
2046        let root = CachedRoot {
2047            hash: hash.to_string(),
2048            key: key.map(|k| k.to_string()),
2049            updated_at,
2050            visibility: visibility.to_string(),
2051        };
2052        let bytes = rmp_serde::to_vec(&root)
2053            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
2054        let mut wtxn = self.env.write_txn()?;
2055        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
2056        wtxn.commit()?;
2057        Ok(())
2058    }
2059
2060    /// List all cached roots for a pubkey
2061    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
2062        let prefix = format!("{}/", pubkey_hex);
2063        let rtxn = self.env.read_txn()?;
2064        let mut results = Vec::new();
2065
2066        for item in self.cached_roots.iter(&rtxn)? {
2067            let (key, bytes) = item?;
2068            if key.starts_with(&prefix) {
2069                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
2070                let root: CachedRoot = rmp_serde::from_slice(bytes)
2071                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2072                results.push((tree_name.to_string(), root));
2073            }
2074        }
2075
2076        Ok(results)
2077    }
2078
2079    /// Delete a cached root
2080    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
2081        let key = format!("{}/{}", pubkey_hex, tree_name);
2082        let mut wtxn = self.env.write_txn()?;
2083        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
2084        wtxn.commit()?;
2085        Ok(deleted)
2086    }
2087}
2088
2089fn is_map_full_store_error(err: &StoreError) -> bool {
2090    let message = err.to_string();
2091    message.contains("MDB_MAP_FULL") || message.contains("MapFull")
2092}
2093
/// Storage layout of a file, as produced by `get_file_chunk_metadata`.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total size of the file in bytes, as reported by the tree.
    pub total_size: u64,
    /// Hash of each chunk link, in link order; empty for a single blob.
    pub chunk_hashes: Vec<Hash>,
    /// Size recorded on each chunk link, parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// `true` when the file's tree node has at least one link; `false` for a
    /// single, unchunked blob.
    pub is_chunked: bool,
}
2101
/// Owned iterator for async streaming
///
/// Created by `stream_file_range_chunks_owned`; its `Iterator` impl yields the
/// bytes of the inclusive range `start..=end` one stored chunk at a time.
pub struct FileRangeChunksOwned {
    /// Store the chunks are fetched from.
    store: Arc<HashtreeStore>,
    /// Chunk hashes and sizes of the file being streamed.
    metadata: FileChunkMetadata,
    /// First byte of the requested range.
    start: u64,
    /// Last byte of the requested range (inclusive); the constructor clamps it
    /// to the last byte of the file.
    end: u64,
    /// Index of the next chunk to examine.
    current_chunk_idx: usize,
    /// Byte offset within the file of the chunk at `current_chunk_idx`.
    current_offset: u64,
}
2111
2112impl Iterator for FileRangeChunksOwned {
2113    type Item = Result<Vec<u8>>;
2114
2115    fn next(&mut self) -> Option<Self::Item> {
2116        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2117            return None;
2118        }
2119
2120        if self.current_offset > self.end {
2121            return None;
2122        }
2123
2124        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2125        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2126        let chunk_end = self.current_offset + chunk_size - 1;
2127
2128        self.current_chunk_idx += 1;
2129
2130        if chunk_end < self.start || self.current_offset > self.end {
2131            self.current_offset += chunk_size;
2132            return self.next();
2133        }
2134
2135        let chunk_content = match self.store.get_chunk(chunk_hash) {
2136            Ok(Some(content)) => content,
2137            Ok(None) => {
2138                return Some(Err(anyhow::anyhow!(
2139                    "Chunk {} not found",
2140                    to_hex(chunk_hash)
2141                )));
2142            }
2143            Err(e) => {
2144                return Some(Err(e));
2145            }
2146        };
2147
2148        let chunk_read_start = if self.current_offset >= self.start {
2149            0
2150        } else {
2151            (self.start - self.current_offset) as usize
2152        };
2153
2154        let chunk_read_end = if chunk_end <= self.end {
2155            chunk_size as usize - 1
2156        } else {
2157            (self.end - self.current_offset) as usize
2158        };
2159
2160        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2161        self.current_offset += chunk_size;
2162
2163        Some(Ok(result))
2164    }
2165}
2166
/// Counters describing the outcome of a garbage-collection pass.
// NOTE(review): the producer of this struct is outside this section; the field
// descriptions below are inferred from the names only — confirm against it.
#[derive(Debug)]
pub struct GcStats {
    /// Presumably the number of DAGs that were deleted.
    pub deleted_dags: usize,
    /// Presumably the number of bytes reclaimed.
    pub freed_bytes: u64,
}
2172
/// One entry of a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Name of the entry within its directory.
    pub name: String,
    /// Content identifier as text: the hex hash when produced by
    /// `get_directory_listing`, or the string form of the full Cid (hash and
    /// key) when produced by `get_directory_listing_by_cid`.
    pub cid: String,
    /// `true` when the entry's link type is a tree.
    pub is_directory: bool,
    /// Size recorded on the directory entry.
    pub size: u64,
}
2180
/// Contents of a directory, as returned by the directory listing methods.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the directory; both listing methods leave it empty.
    pub dir_name: String,
    /// Entries of the directory, in the order the tree listed them.
    pub entries: Vec<DirEntry>,
}
2186
/// Blob metadata for Blossom protocol
// NOTE(review): nothing in this section populates these fields; the
// descriptions below are inferred from the names — confirm against the
// Blossom handlers.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Presumably the hex-encoded SHA-256 of the blob.
    pub sha256: String,
    /// Presumably the blob size in bytes.
    pub size: u64,
    /// MIME type associated with the blob.
    pub mime_type: String,
    /// Presumably the upload time as a Unix timestamp — unit not visible here.
    pub uploaded: u64,
}
2195
2196// Implement ContentStore trait for WebRTC data exchange
2197impl crate::webrtc::ContentStore for HashtreeStore {
2198    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2199        let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2200        self.get_chunk(&hash)
2201    }
2202}
2203
#[cfg(test)]
mod tests {
    use super::*;
    // Only the LMDB-specific tests use the short name; tests that must build
    // without the `lmdb` feature spell out `tempfile::TempDir` instead.
    #[cfg(feature = "lmdb")]
    use tempfile::TempDir;

    /// The gate returns each hash once per call, suppresses hashes seen within
    /// the update interval, and lets them through again once it has elapsed.
    #[test]
    fn blob_access_update_gate_deduplicates_and_throttles() {
        let gate = BlobAccessUpdateGate::default();
        let first = sha256(b"first");
        let second = sha256(b"second");

        assert_eq!(
            gate.due_hashes([first, first, second], 10),
            vec![first, second]
        );
        assert!(gate.due_hashes([first, second], 11).is_empty());
        assert_eq!(
            gate.due_hashes([second, first], 10 + ACCESS_UPDATE_INTERVAL_SECS),
            vec![second, first]
        );
    }

    /// The blob LMDB map must be at least as large as the requested storage
    /// budget.
    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    /// The metadata LMDB map must be at least the size derived from the
    /// storage budget.
    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_metadata_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let storage_budget = 256 * 1024 * 1024 * 1024u64;
        let expected = lmdb_metadata_map_size_for_storage_budget(storage_budget);
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            storage_budget,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = store.env.info().map_size as u64;
        assert!(
            map_size >= expected,
            "expected metadata LMDB map to grow to at least {expected} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    /// An explicit map size passed to the local store is honored.
    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }

    /// Opening an LMDB store removes two-character shard directories left by
    /// the filesystem backend but keeps unrelated directories.
    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("lmdb-blobs");
        std::fs::create_dir_all(path.join("aa"))?;
        std::fs::create_dir_all(path.join("b2"))?;
        std::fs::create_dir_all(path.join("keep-me"))?;
        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;

        let _store = LocalStore::new_with_lmdb_map_size(
            &path,
            &StorageBackend::Lmdb,
            Some(128 * 1024 * 1024),
        )?;

        assert!(!path.join("aa").exists());
        assert!(!path.join("b2").exists());
        assert!(path.join("keep-me").exists());
        assert!(path.join("data.mdb").exists());
        assert!(path.join("lock.mdb").exists());

        Ok(())
    }

    /// Writing a blob that already exists refreshes its last-accessed time,
    /// for both cached and owned blobs.
    #[cfg(feature = "lmdb")]
    #[test]
    fn duplicate_blossom_writes_refresh_blob_last_accessed() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let data = b"cached blossom duplicate";
        let hash = sha256(data);
        store.put_cached_blob(data)?;
        // Force an old timestamp so the refresh is observable.
        store.router.touch_accessed_sync(&hash, 1)?;
        store.put_cached_blob(data)?;
        assert!(store.blob_last_accessed_at(&hash)?.unwrap_or(0) > 1);

        let owned = b"owned blossom duplicate";
        let owned_hash = sha256(owned);
        let owner = [7u8; 32];
        store.put_owned_blob(owned, &owner)?;
        store.router.touch_accessed_sync(&owned_hash, 1)?;
        store.put_owned_blob(owned, &owner)?;
        assert!(store.blob_last_accessed_at(&owned_hash)?.unwrap_or(0) > 1);

        Ok(())
    }

    /// Indexing a new root under an existing ref unpins the previous root and
    /// drops its tree metadata.
    #[cfg(feature = "lmdb")]
    #[test]
    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let old_bytes = b"old published root";
        let new_bytes = b"new published root";
        let old_root = sha256(old_bytes);
        let new_root = sha256(new_bytes);

        store.put_blob(old_bytes)?;
        store.pin(&old_root)?;
        store.index_tree(
            &old_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(store.is_pinned(&old_root)?);
        assert!(store.get_tree_meta(&old_root)?.is_some());

        store.put_blob(new_bytes)?;
        store.pin(&new_root)?;
        store.index_tree(
            &new_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(
            !store.is_pinned(&old_root)?,
            "superseded root should be unpinned when ref is replaced"
        );
        assert!(
            store.get_tree_meta(&old_root)?.is_none(),
            "superseded root metadata should be removed when ref is replaced"
        );
        assert!(store.is_pinned(&new_root)?);
        assert!(store.get_tree_meta(&new_root)?.is_some());

        Ok(())
    }

    /// Tracked authors are listed sorted and without duplicates, and removal
    /// reports whether the author was present.
    #[test]
    fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
        // Fully qualified: this test is not gated on `lmdb`, and the short
        // `TempDir` import above only exists when that feature is enabled.
        let temp = tempfile::TempDir::new()?;
        let store = HashtreeStore::with_options(temp.path(), None, 1024 * 1024)?;

        store
            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;
        store
            .add_tracked_author("npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm")?;
        store
            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;

        assert_eq!(
            store.list_tracked_authors()?,
            vec![
                "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm".to_string(),
                "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string(),
            ]
        );
        assert!(store.remove_tracked_author(
            "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm"
        )?);
        assert!(!store.remove_tracked_author(
            "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq"
        )?);
        assert_eq!(
            store.list_tracked_authors()?,
            vec!["npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string()]
        );

        Ok(())
    }

    /// Calling the async `Store` methods of an S3-backed router from inside
    /// `futures::executor::block_on` must not panic.
    #[cfg(feature = "s3")]
    #[test]
    fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
        let temp = tempfile::TempDir::new()?;
        let local = Arc::new(LocalStore::new(
            temp.path().join("blobs"),
            &StorageBackend::Fs,
        )?);

        let outcome = std::panic::catch_unwind(|| {
            sync_block_on(async {
                let aws_config = aws_config::from_env()
                    .region(aws_sdk_s3::config::Region::new("auto"))
                    .load()
                    .await;
                // Points at a local port instead of a real S3 endpoint; the
                // request results are ignored, only a panic matters.
                let s3_client = aws_sdk_s3::Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&aws_config)
                        .endpoint_url("http://127.0.0.1:9")
                        .force_path_style(true)
                        .build(),
                );

                let router = StorageRouter {
                    local,
                    s3_client: Some(s3_client),
                    s3_bucket: Some("test-bucket".to_string()),
                    s3_prefix: String::new(),
                    sync_tx: None,
                };
                let hash = [0u8; 32];

                let _ = Store::has(&router, &hash).await;
                let _ = Store::get(&router, &hash).await;
            });
        });

        assert!(
            outcome.is_ok(),
            "S3-backed async store methods should not panic inside futures::block_on"
        );

        Ok(())
    }
}