Skip to main content

hashtree_cli/
storage.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8    from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::collections::{HashMap, HashSet};
17#[cfg(feature = "s3")]
18use std::future::Future;
19use std::io::Write;
20use std::path::{Path, PathBuf};
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex};
23use std::time::{SystemTime, UNIX_EPOCH};
24
25mod upload;
26
27mod maintenance;
28mod retention;
29
/// Default timeout, in milliseconds, for synchronous S3 calls; used when
/// `HTREE_S3_SYNC_TIMEOUT_MS` is unset, unparsable, or zero.
#[cfg(feature = "s3")]
const DEFAULT_S3_SYNC_TIMEOUT_MS: u64 = 5_000;
/// Environment variable that overrides the synchronous S3 call timeout (milliseconds).
#[cfg(feature = "s3")]
const S3_SYNC_TIMEOUT_MS_ENV: &str = "HTREE_S3_SYNC_TIMEOUT_MS";
34
35pub use maintenance::{
36    compact_lmdb_environments_under, CompactResult, R2ImportOptions, R2ImportResult, VerifyResult,
37};
38pub use retention::{OwnedBlobStats, PinnedItem, StorageByPriority, StorageStats, TreeMeta};
39
/// Priority levels for tree eviction
///
/// Priority for trees that are neither owned nor followed.
/// NOTE(review): the eviction policy lives outside this chunk (presumably
/// lower values are evicted first) — confirm.
pub const PRIORITY_OTHER: u8 = 64;
/// Eviction priority for followed trees (see `PRIORITY_OTHER`).
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Eviction priority for the node's own trees; the highest `u8` value.
pub const PRIORITY_OWN: u8 = 255;
/// Maximum LMDB reader slots. Not referenced in this part of the file;
/// presumably passed to `EnvOpenOptions::max_readers` — confirm.
const LMDB_MAX_READERS: u32 = 1024;
/// Lower bound (64 MiB) for the metadata LMDB map size derived from the storage budget.
const LMDB_METADATA_MIN_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
/// Upper bound (64 GiB) for the metadata LMDB map size; also used when the budget is `0`.
const LMDB_METADATA_MAX_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024 * 1024;
/// The metadata map size is the storage budget divided by this value (before clamping).
const LMDB_METADATA_STORAGE_RATIO_DIVISOR: u64 = 1024;
/// Minimum headroom added on top of an existing `data.mdb` size when sizing an
/// LMDB environment for reopening.
const LMDB_METADATA_REOPEN_HEADROOM_BYTES: u64 = 64 * 1024 * 1024;
/// Minimum map size for LMDB blob environments.
/// NOTE(review): not referenced in this part of the file — confirm where it applies.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 16 * 1024 * 1024;
/// Minimum number of seconds between access-time updates for the same blob hash.
const ACCESS_UPDATE_INTERVAL_SECS: u64 = 300;
/// Size at which the access-update gate prunes (and, if still full, clears) its map.
const ACCESS_UPDATE_GATE_MAX_ENTRIES: usize = 4096;
/// Batch limit for background access-time updates.
/// NOTE(review): not referenced in this part of the file — confirm how it is applied.
const ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT: usize = 1024;
54
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// If the system clock reads earlier than 1970, this returns `0` rather than failing.
fn unix_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
61
/// Cached root info from Nostr events.
///
/// Derives `Serialize`/`Deserialize`, so the field names below are part of its
/// serialized representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex)
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public", "link-visible", or "private"
    pub visibility: String,
}
74
/// Storage statistics
///
/// Backend-neutral summary produced by `LocalStore::stats`, copied from the
/// wrapped backend's own statistics.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Blob count as reported by the backend.
    pub count: usize,
    /// Total stored bytes as reported by the backend.
    pub total_bytes: u64,
}
81
82#[derive(Default)]
83struct BlobAccessUpdateGate {
84    next_update_by_hash: Mutex<HashMap<Hash, u64>>,
85}
86
87impl BlobAccessUpdateGate {
88    fn due_hashes<I>(&self, hashes: I, now: u64) -> Vec<Hash>
89    where
90        I: IntoIterator<Item = Hash>,
91    {
92        let Ok(mut next_update_by_hash) = self.next_update_by_hash.try_lock() else {
93            return Vec::new();
94        };
95
96        if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
97            next_update_by_hash.retain(|_, next_update| *next_update > now);
98            if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
99                next_update_by_hash.clear();
100            }
101        }
102
103        let mut due = Vec::new();
104        let mut seen = HashSet::new();
105        for hash in hashes {
106            if !seen.insert(hash) {
107                continue;
108            }
109            if next_update_by_hash
110                .get(&hash)
111                .is_some_and(|next_update| now < *next_update)
112            {
113                continue;
114            }
115            next_update_by_hash.insert(hash, now.saturating_add(ACCESS_UPDATE_INTERVAL_SECS));
116            due.push(hash);
117        }
118        due
119    }
120}
121
/// Local blob store - wraps either FsBlobStore or LmdbBlobStore
///
/// The `Lmdb` variant only exists when the `lmdb` feature is enabled. Methods on
/// this type forward to whichever backend is wrapped.
pub enum LocalStore {
    /// Filesystem-backed blob store.
    Fs(FsBlobStore),
    /// LMDB-backed blob store (requires the `lmdb` feature).
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
128
/// Returns `true` when the final component of `path` is exactly two ASCII hex
/// digits (upper- or lower-case). This is the naming this module treats as a
/// filesystem blob shard directory.
///
/// Paths without a final component, or whose name is not valid UTF-8, yield `false`.
#[cfg(feature = "lmdb")]
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    let Some(dir_name) = path.file_name().and_then(|os_name| os_name.to_str()) else {
        return false;
    };
    let name_bytes = dir_name.as_bytes();
    name_bytes.len() == 2 && name_bytes.iter().all(u8::is_ascii_hexdigit)
}
136
137fn lmdb_metadata_map_size_for_storage_budget(max_size_bytes: u64) -> u64 {
138    if max_size_bytes == 0 {
139        return LMDB_METADATA_MAX_MAP_SIZE_BYTES;
140    }
141
142    max_size_bytes
143        .saturating_div(LMDB_METADATA_STORAGE_RATIO_DIVISOR)
144        .clamp(
145            LMDB_METADATA_MIN_MAP_SIZE_BYTES,
146            LMDB_METADATA_MAX_MAP_SIZE_BYTES,
147        )
148}
149
150fn lmdb_map_size_for_existing_env(path: &Path, requested_bytes: u64) -> Result<usize> {
151    let existing_bytes = std::fs::metadata(path.join("data.mdb"))
152        .map(|metadata| metadata.len())
153        .unwrap_or(0);
154    let existing_headroom = if existing_bytes == 0 {
155        0
156    } else {
157        existing_bytes
158            .saturating_div(10)
159            .max(LMDB_METADATA_REOPEN_HEADROOM_BYTES)
160    };
161    let requested =
162        align_lmdb_map_size(requested_bytes.max(existing_bytes.saturating_add(existing_headroom)));
163    usize::try_from(requested).context("LMDB map size exceeds usize")
164}
165
166fn align_lmdb_map_size(bytes: u64) -> u64 {
167    let page_size = (page_size::get() as u64).max(4096);
168    let remainder = bytes % page_size;
169    if remainder == 0 {
170        bytes
171    } else {
172        bytes.saturating_add(page_size - remainder)
173    }
174}
175
/// Deletes leftover filesystem-backend shard directories directly under `path`.
///
/// A shard directory is any entry that is a directory (symlinks followed by `is_dir`)
/// and whose name is two hex digits. Each removal is logged at info level.
///
/// # Errors
/// Returns `StoreError::Io` if `path` cannot be listed or a shard cannot be removed.
/// Processing stops at the first failure.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for dir_entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let shard = dir_entry.map_err(StoreError::Io)?.path();
        if !(shard.is_dir() && is_fs_blob_shard_dir(&shard)) {
            continue;
        }
        std::fs::remove_dir_all(&shard).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            shard.display()
        );
    }
    Ok(())
}
192
193impl LocalStore {
194    /// Create a new unbounded local store.
195    ///
196    /// Higher-level stores that need quota enforcement should manage eviction
197    /// above this layer so tree metadata, pins, and archival policies stay
198    /// coherent.
199    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
200        Self::new_unbounded(path, backend)
201    }
202
203    /// Create a new local store with an explicit LMDB logical size cap when using the LMDB backend.
204    ///
205    /// The requested size is used for both the LMDB map and the store's built-in
206    /// byte quota, so standalone blob envs can evict instead of growing forever.
207    pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
208        path: P,
209        backend: &StorageBackend,
210        _map_size_bytes: Option<u64>,
211    ) -> Result<Self, StoreError> {
212        match backend {
213            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
214            #[cfg(feature = "lmdb")]
215            StorageBackend::Lmdb => match _map_size_bytes {
216                Some(map_size_bytes) => {
217                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
218                    remove_stale_fs_blob_shards(path.as_ref())?;
219                    Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
220                        path,
221                        map_size_bytes,
222                    )?))
223                }
224                None => {
225                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
226                    remove_stale_fs_blob_shards(path.as_ref())?;
227                    Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
228                }
229            },
230            #[cfg(not(feature = "lmdb"))]
231            StorageBackend::Lmdb => {
232                tracing::warn!(
233                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
234                );
235                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
236            }
237        }
238    }
239
240    /// Create a new unbounded local store for a specific backend.
241    pub fn new_unbounded<P: AsRef<Path>>(
242        path: P,
243        backend: &StorageBackend,
244    ) -> Result<Self, StoreError> {
245        Self::new_with_lmdb_map_size(path, backend, None)
246    }
247
248    pub fn backend(&self) -> StorageBackend {
249        match self {
250            LocalStore::Fs(_) => StorageBackend::Fs,
251            #[cfg(feature = "lmdb")]
252            LocalStore::Lmdb(_) => StorageBackend::Lmdb,
253        }
254    }
255
256    /// Sync put operation
257    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
258        match self {
259            LocalStore::Fs(store) => store.put_sync(hash, data),
260            #[cfg(feature = "lmdb")]
261            LocalStore::Lmdb(store) => store.put_sync(hash, data),
262        }
263    }
264
265    /// Sync batch put operation.
266    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
267        match self {
268            LocalStore::Fs(store) => {
269                let mut inserted = 0usize;
270                for (hash, data) in items {
271                    if store.put_sync(*hash, data.as_slice())? {
272                        inserted += 1;
273                    }
274                }
275                Ok(inserted)
276            }
277            #[cfg(feature = "lmdb")]
278            LocalStore::Lmdb(store) => store.put_many_sync(items),
279        }
280    }
281
282    /// Sync get operation
283    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
284        match self {
285            LocalStore::Fs(store) => store.get_sync(hash),
286            #[cfg(feature = "lmdb")]
287            LocalStore::Lmdb(store) => store.get_sync(hash),
288        }
289    }
290
291    pub fn get_range_sync(
292        &self,
293        hash: &Hash,
294        start: u64,
295        end_inclusive: u64,
296    ) -> Result<Option<Vec<u8>>, StoreError> {
297        match self {
298            LocalStore::Fs(store) => store.get_range_sync(hash, start, end_inclusive),
299            #[cfg(feature = "lmdb")]
300            LocalStore::Lmdb(store) => store.get_range_sync(hash, start, end_inclusive),
301        }
302    }
303
304    pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
305        match self {
306            LocalStore::Fs(store) => store.blob_size_sync(hash),
307            #[cfg(feature = "lmdb")]
308            LocalStore::Lmdb(store) => store.blob_size_sync(hash),
309        }
310    }
311
312    pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
313        match self {
314            LocalStore::Fs(store) => store.touch_accessed_sync(hash, now),
315            #[cfg(feature = "lmdb")]
316            LocalStore::Lmdb(store) => store.touch_accessed_sync(hash, now),
317        }
318    }
319
320    pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
321        match self {
322            LocalStore::Fs(store) => store.touch_many_accessed_sync(hashes, now),
323            #[cfg(feature = "lmdb")]
324            LocalStore::Lmdb(store) => store.touch_many_accessed_sync(hashes, now),
325        }
326    }
327
328    pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
329        match self {
330            LocalStore::Fs(store) => store.last_accessed_at_sync(hash),
331            #[cfg(feature = "lmdb")]
332            LocalStore::Lmdb(store) => store.last_accessed_at_sync(hash),
333        }
334    }
335
336    pub fn many_last_accessed_at_sync(
337        &self,
338        hashes: &[Hash],
339    ) -> Result<Vec<(Hash, u64)>, StoreError> {
340        match self {
341            LocalStore::Fs(store) => store.many_last_accessed_at_sync(hashes),
342            #[cfg(feature = "lmdb")]
343            LocalStore::Lmdb(store) => store.many_last_accessed_at_sync(hashes),
344        }
345    }
346
347    /// Check if hash exists
348    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
349        match self {
350            LocalStore::Fs(store) => Ok(store.exists(hash)),
351            #[cfg(feature = "lmdb")]
352            LocalStore::Lmdb(store) => store.exists(hash),
353        }
354    }
355
356    /// Mark which sorted hashes already exist in local storage.
357    pub fn existing_hashes_in_sorted_candidates(
358        &self,
359        sorted_hashes: &[Hash],
360    ) -> Result<Vec<bool>, StoreError> {
361        match self {
362            LocalStore::Fs(store) => Ok(sorted_hashes
363                .iter()
364                .map(|hash| store.exists(hash))
365                .collect()),
366            #[cfg(feature = "lmdb")]
367            LocalStore::Lmdb(store) => store.existing_hashes_in_sorted_candidates(sorted_hashes),
368        }
369    }
370
371    /// Sync delete operation
372    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
373        match self {
374            LocalStore::Fs(store) => store.delete_sync(hash),
375            #[cfg(feature = "lmdb")]
376            LocalStore::Lmdb(store) => store.delete_sync(hash),
377        }
378    }
379
380    /// Get storage statistics
381    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
382        match self {
383            LocalStore::Fs(store) => {
384                let stats = store.stats()?;
385                Ok(LocalStoreStats {
386                    count: stats.count,
387                    total_bytes: stats.total_bytes,
388                })
389            }
390            #[cfg(feature = "lmdb")]
391            LocalStore::Lmdb(store) => {
392                let stats = store.stats()?;
393                Ok(LocalStoreStats {
394                    count: stats.count,
395                    total_bytes: stats.total_bytes,
396                })
397            }
398        }
399    }
400
401    /// List all hashes in the store
402    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
403        match self {
404            LocalStore::Fs(store) => store.list(),
405            #[cfg(feature = "lmdb")]
406            LocalStore::Lmdb(store) => store.list(),
407        }
408    }
409}
410
411#[async_trait]
412impl Store for LocalStore {
413    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
414        self.put_sync(hash, &data)
415    }
416
417    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
418        self.put_many_sync(&items)
419    }
420
421    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
422        self.get_sync(hash)
423    }
424
425    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
426        self.exists(hash)
427    }
428
429    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
430        self.delete_sync(hash)
431    }
432}
433
434#[cfg(feature = "s3")]
435use tokio::sync::mpsc;
436
437use crate::config::S3Config;
438
/// Message for background S3 sync
///
/// Sent over an unbounded channel by `StorageRouter`'s write/delete paths and
/// consumed by the task spawned in `StorageRouter::with_s3`. The object key is
/// `{prefix}{hex(hash)}.bin`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` to the object for `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object for `hash`.
    Delete { hash: Hash },
}
445
/// Storage router - local store primary with optional S3 backup
///
/// Write path: local first (fast), then queue S3 upload (non-blocking)
/// Read path: local first, fall back to S3 if miss
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LocalStore>,
    /// Optional S3 client for backup; `None` when constructed with `StorageRouter::new`.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Bucket used for S3 reads and head checks.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to `{hex(hash)}.bin` object keys.
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Sender half of the background S3 sync channel (uploads and deletes).
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
464
465impl StorageRouter {
466    #[cfg(feature = "s3")]
467    fn s3_sync_timeout() -> std::time::Duration {
468        let millis = std::env::var(S3_SYNC_TIMEOUT_MS_ENV)
469            .ok()
470            .and_then(|value| value.parse::<u64>().ok())
471            .filter(|value| *value > 0)
472            .unwrap_or(DEFAULT_S3_SYNC_TIMEOUT_MS);
473        std::time::Duration::from_millis(millis)
474    }
475
476    #[cfg(feature = "s3")]
477    fn s3_sync_timeout_error(timeout: std::time::Duration) -> StoreError {
478        StoreError::Other(format!(
479            "S3 sync operation timed out after {}ms",
480            timeout.as_millis()
481        ))
482    }
483
484    #[cfg(feature = "s3")]
485    fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
486    where
487        F: Future<Output = T> + Send + 'static,
488        T: Send + 'static,
489    {
490        let timeout = Self::s3_sync_timeout();
491        if tokio::runtime::Handle::try_current().is_ok() {
492            return std::thread::Builder::new()
493                .name("storage-s3-sync".to_string())
494                .spawn(move || {
495                    let runtime = tokio::runtime::Builder::new_current_thread()
496                        .enable_all()
497                        .build()
498                        .map_err(|err| {
499                            StoreError::Other(format!("build storage s3 sync runtime: {err}"))
500                        })?;
501                    runtime.block_on(async move {
502                        tokio::time::timeout(timeout, future)
503                            .await
504                            .map_err(|_| Self::s3_sync_timeout_error(timeout))
505                    })
506                })
507                .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
508                .join()
509                .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()))?;
510        }
511
512        let runtime = tokio::runtime::Builder::new_current_thread()
513            .enable_all()
514            .build()
515            .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
516        runtime.block_on(async move {
517            tokio::time::timeout(timeout, future)
518                .await
519                .map_err(|_| Self::s3_sync_timeout_error(timeout))
520        })
521    }
522
523    /// Create router with local storage only
524    pub fn new(local: Arc<LocalStore>) -> Self {
525        Self {
526            local,
527            #[cfg(feature = "s3")]
528            s3_client: None,
529            #[cfg(feature = "s3")]
530            s3_bucket: None,
531            #[cfg(feature = "s3")]
532            s3_prefix: String::new(),
533            #[cfg(feature = "s3")]
534            sync_tx: None,
535        }
536    }
537
    /// Create router with local storage + S3 backup
    ///
    /// Builds an S3 client from the environment-derived AWS config. The region and
    /// endpoint come from `config`, and path-style addressing is forced. It then
    /// spawns a background task that drains the sync channel:
    /// - each queued message is handled in its own spawned task;
    /// - at most 8 of those tasks run their S3 request loop at once (semaphore-bounded);
    /// - each request is attempted up to 3 times, with 250 ms and 500 ms waits
    ///   between attempts, before the failure is logged and dropped.
    ///
    /// Object keys are `{prefix}{hex(hash)}.bin`.
    ///
    /// NOTE(review): every message gets its own task that competes for a permit.
    /// An `Upload` and a later `Delete` for the same hash are therefore not
    /// guaranteed to reach S3 in send order — confirm whether any caller relies on
    /// that ordering.
    ///
    /// Uses `tokio::spawn`, so it must run inside a Tokio runtime.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Build AWS config
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader =
            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Build S3 client with custom endpoint (path-style, e.g. for S3-compatible services)
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Create background sync channel (unbounded: senders never wait)
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        // Spawn background sync task with bounded concurrent uploads
        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Keep S3 writes parallel, but avoid dispatch failures during mirror backfill bursts.
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(8));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            // Runs until every sender (the router's `sync_tx`) has been dropped.
            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                // Spawn each upload with semaphore-bounded concurrency
                tokio::spawn(async move {
                    // Acquire permit before uploading; it is released when `_permit`
                    // drops at the end of this task.
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            // Up to 3 attempts; waits 250 ms * attempt between them.
                            let mut attempt = 1u8;
                            loop {
                                match client
                                    .put_object()
                                    .bucket(bucket.as_str())
                                    .key(&key)
                                    .body(ByteStream::from(data.clone()))
                                    .send()
                                    .await
                                {
                                    Ok(_) => {
                                        tracing::debug!("S3 upload succeeded: {}", &key);
                                        break;
                                    }
                                    Err(e) if attempt < 3 => {
                                        tracing::warn!(
                                            "S3 upload retrying {}: attempt={} error={}",
                                            &key,
                                            attempt,
                                            e
                                        );
                                        tokio::time::sleep(std::time::Duration::from_millis(
                                            250 * u64::from(attempt),
                                        ))
                                        .await;
                                        attempt += 1;
                                    }
                                    Err(e) => {
                                        // Final failure is only logged; the blob stays local-only.
                                        tracing::error!(
                                            "S3 upload failed {} after {} attempts: {}",
                                            &key,
                                            attempt,
                                            e
                                        );
                                        break;
                                    }
                                }
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            // Same retry policy as uploads.
                            let mut attempt = 1u8;
                            loop {
                                match client
                                    .delete_object()
                                    .bucket(bucket.as_str())
                                    .key(&key)
                                    .send()
                                    .await
                                {
                                    Ok(_) => break,
                                    Err(e) if attempt < 3 => {
                                        tracing::warn!(
                                            "S3 delete retrying {}: attempt={} error={}",
                                            &key,
                                            attempt,
                                            e
                                        );
                                        tokio::time::sleep(std::time::Duration::from_millis(
                                            250 * u64::from(attempt),
                                        ))
                                        .await;
                                        attempt += 1;
                                    }
                                    Err(e) => {
                                        tracing::error!(
                                            "S3 delete failed {} after {} attempts: {}",
                                            &key,
                                            attempt,
                                            e
                                        );
                                        break;
                                    }
                                }
                            }
                        }
                    }
                });
            }
        });

        tracing::info!(
            "S3 storage initialized: bucket={}, prefix={}",
            bucket,
            prefix
        );

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }
691
692    /// Store data - writes to LMDB, queues S3 upload in background
693    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
694        // Always write to local first
695        let is_new = self.local.put_sync(hash, data)?;
696
697        // Queue S3 upload only for newly inserted blobs.
698        // Existing local blobs were already persisted or are handled by explicit repair/push flows.
699        #[cfg(feature = "s3")]
700        if is_new {
701            if let Some(ref tx) = self.sync_tx {
702                tracing::debug!(
703                    "Queueing S3 upload for {} ({} bytes)",
704                    crate::storage::to_hex(&hash)[..16].to_string(),
705                    data.len(),
706                );
707                if let Err(e) = tx.send(S3SyncMessage::Upload {
708                    hash,
709                    data: data.to_vec(),
710                }) {
711                    tracing::error!("Failed to queue S3 upload: {}", e);
712                }
713            }
714        }
715
716        Ok(is_new)
717    }
718
719    /// Store multiple blobs with a single local batch write when supported.
720    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
721        #[cfg(feature = "s3")]
722        let pending_uploads = if self.sync_tx.is_some() {
723            let mut pending = Vec::new();
724            for (hash, data) in items {
725                if !self.local.exists(hash)? {
726                    pending.push((*hash, data.clone()));
727                }
728            }
729            pending
730        } else {
731            Vec::new()
732        };
733
734        let inserted = self.local.put_many_sync(items)?;
735
736        #[cfg(feature = "s3")]
737        if let Some(ref tx) = self.sync_tx {
738            for (hash, data) in pending_uploads {
739                if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
740                    tracing::error!("Failed to queue S3 upload: {}", e);
741                }
742            }
743        }
744
745        Ok(inserted)
746    }
747
748    /// Get data - tries LMDB first, falls back to S3
749    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
750        // Try local first
751        if let Some(data) = self.local.get_sync(hash)? {
752            return Ok(Some(data));
753        }
754
755        // Fall back to S3 if configured
756        #[cfg(feature = "s3")]
757        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
758            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
759            let client = client.clone();
760            let bucket = bucket.clone();
761
762            match Self::run_s3_future_sync(async move {
763                client.get_object().bucket(bucket).key(key).send().await
764            }) {
765                Ok(Ok(output)) => {
766                    match Self::run_s3_future_sync(async move { output.body.collect().await }) {
767                        Ok(Ok(body)) => {
768                            let data = body.into_bytes().to_vec();
769                            // Cache locally for future reads
770                            let _ = self.local.put_sync(*hash, &data);
771                            return Ok(Some(data));
772                        }
773                        Ok(Err(err)) => {
774                            tracing::warn!("S3 body collect failed: {}", err);
775                        }
776                        Err(err) => {
777                            tracing::warn!("S3 body collect runtime failed: {}", err);
778                        }
779                    }
780                }
781                Ok(Err(err)) => {
782                    let service_err = err.into_service_error();
783                    if !service_err.is_no_such_key() {
784                        tracing::warn!("S3 get failed: {}", service_err);
785                    }
786                }
787                Err(err) => {
788                    tracing::warn!("S3 get runtime failed: {}", err);
789                }
790            }
791        }
792
793        Ok(None)
794    }
795
796    pub fn get_range_sync(
797        &self,
798        hash: &Hash,
799        start: u64,
800        end_inclusive: u64,
801    ) -> Result<Option<Vec<u8>>, StoreError> {
802        self.local.get_range_sync(hash, start, end_inclusive)
803    }
804
805    pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
806        self.local.blob_size_sync(hash)
807    }
808
809    pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
810        self.local.touch_accessed_sync(hash, now)
811    }
812
813    pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
814        self.local.touch_many_accessed_sync(hashes, now)
815    }
816
817    pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
818        self.local.last_accessed_at_sync(hash)
819    }
820
821    pub fn many_last_accessed_at_sync(
822        &self,
823        hashes: &[Hash],
824    ) -> Result<Vec<(Hash, u64)>, StoreError> {
825        self.local.many_last_accessed_at_sync(hashes)
826    }
827
828    /// Check if hash exists
829    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
830        // Check local first
831        if self.local.exists(hash)? {
832            return Ok(true);
833        }
834
835        // Check S3 if configured
836        #[cfg(feature = "s3")]
837        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
838            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
839            let client = client.clone();
840            let bucket = bucket.clone();
841
842            match Self::run_s3_future_sync(async move {
843                client.head_object().bucket(bucket).key(&key).send().await
844            }) {
845                Ok(Ok(_)) => return Ok(true),
846                Ok(Err(err)) => {
847                    let service_err = err.into_service_error();
848                    if !service_err.is_not_found() {
849                        tracing::warn!("S3 head failed: {}", service_err);
850                    }
851                }
852                Err(err) => {
853                    tracing::warn!("S3 head runtime failed: {}", err);
854                }
855            }
856        }
857
858        Ok(false)
859    }
860
861    /// Delete data from both local and S3 stores
862    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
863        let deleted = self.local.delete_sync(hash)?;
864
865        // Queue S3 delete if configured
866        #[cfg(feature = "s3")]
867        if let Some(ref tx) = self.sync_tx {
868            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
869        }
870
871        Ok(deleted)
872    }
873
874    /// Delete data from local store only (don't propagate to S3)
875    /// Used for eviction where we want to keep S3 as archive
876    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
877        self.local.delete_sync(hash)
878    }
879
880    /// Get stats from local store
881    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
882        self.local.stats()
883    }
884
885    /// List all hashes from local store
886    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
887        self.local.list()
888    }
889
890    /// Get the underlying local store for HashTree operations
891    pub fn local_store(&self) -> Arc<LocalStore> {
892        Arc::clone(&self.local)
893    }
894}
895
896#[derive(Clone)]
897struct AccessRecordingStore {
898    inner: Arc<StorageRouter>,
899    accessed: Arc<Mutex<HashSet<Hash>>>,
900}
901
902impl AccessRecordingStore {
903    fn new(inner: Arc<StorageRouter>) -> Self {
904        Self {
905            inner,
906            accessed: Arc::new(Mutex::new(HashSet::new())),
907        }
908    }
909
910    fn take_accessed_hashes(&self) -> Vec<Hash> {
911        let Ok(mut accessed) = self.accessed.lock() else {
912            return Vec::new();
913        };
914        accessed.drain().collect()
915    }
916
917    fn record_access(&self, hash: &Hash) {
918        let Ok(mut accessed) = self.accessed.lock() else {
919            return;
920        };
921        accessed.insert(*hash);
922    }
923}
924
925#[async_trait]
926impl Store for AccessRecordingStore {
927    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
928        self.inner.put(hash, data).await
929    }
930
931    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
932        self.inner.put_many(items).await
933    }
934
935    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
936        let data = self.inner.get(hash).await?;
937        if data.is_some() {
938            self.record_access(hash);
939        }
940        Ok(data)
941    }
942
943    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
944        self.inner.has(hash).await
945    }
946
947    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
948        self.inner.delete(hash).await
949    }
950}
951
952// Implement async Store trait for StorageRouter so it can be used directly with HashTree
953// This ensures all writes go through S3 sync
954#[async_trait]
955impl Store for StorageRouter {
956    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
957        self.put_sync(hash, &data)
958    }
959
960    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
961        self.put_many_sync(&items)
962    }
963
964    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
965        self.get_sync(hash)
966    }
967
968    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
969        self.exists(hash)
970    }
971
972    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
973        self.delete_sync(hash)
974    }
975}
976
/// Blob store plus the LMDB metadata used to manage it: pins, Blossom ownership,
/// tree indexes used for eviction, and cached Nostr roots.
///
/// Raw blob bytes are read and written through `router`. All other state lives in
/// named databases of the metadata environment `env`.
pub struct HashtreeStore {
    /// Directory the store was opened at; the metadata LMDB env is opened here.
    base_path: PathBuf,
    /// Metadata LMDB environment that owns every `Database` handle below.
    env: heed::Env,
    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
    pins: Database<Bytes, Unit>,
    /// Mutable published refs that should stay subscribed and keep following updates
    pinned_refs: Database<Str, Unit>,
    /// Authors whose hashtree publications should be mirrored continuously
    tracked_authors: Database<Str, Unit>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// pubkey (32 bytes) -> JSON-encoded `Vec<BlobMetadata>` listing that pubkey's blobs.
    /// Read together with `pubkey_blob_index` when listing and pruned on delete, but
    /// not written by `set_blob_owner_with_size` or `put_owned_blobs`.
    /// NOTE(review): looks like a legacy listing format; confirm.
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Pubkey listing index: pubkey (32 bytes) ++ sha256 (32 bytes) -> BlobMetadata JSON
    pubkey_blob_index: Database<Bytes, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Router for raw blob I/O: the local backend (filesystem or LMDB) plus optional
    /// S3. Held in an `Arc` so it can be shared with `HashTree` instances and with
    /// background access-time updater threads.
    router: Arc<StorageRouter>,
    /// Local storage budget in bytes, enforced at this layer (the raw blob backend
    /// itself is opened unbounded).
    max_size_bytes: u64,
    /// Whether quota enforcement may delete local blobs not tracked by any indexed tree.
    evict_orphans: bool,
    /// Best-effort in-memory throttle for blob access metadata writes.
    blob_access_update_gate: BlobAccessUpdateGate,
    /// Set while a background access-time updater thread is running, so that at
    /// most one runs at a time and access-time maintenance stays off foreground reads.
    blob_access_update_inflight: Arc<AtomicBool>,
}
1011
1012impl HashtreeStore {
1013    /// Create a new store with the configured local storage limit.
1014    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
1015        let config = hashtree_config::Config::load_or_default();
1016        let max_size_bytes = config
1017            .storage
1018            .max_size_gb
1019            .saturating_mul(1024 * 1024 * 1024);
1020        Self::with_options_and_backend(
1021            path,
1022            None,
1023            max_size_bytes,
1024            config.storage.evict_orphans,
1025            &config.storage.backend,
1026        )
1027    }
1028
1029    /// Create a new store with an explicit local backend and size limit.
1030    pub fn new_with_backend<P: AsRef<Path>>(
1031        path: P,
1032        backend: hashtree_config::StorageBackend,
1033        max_size_bytes: u64,
1034    ) -> Result<Self> {
1035        Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
1036    }
1037
1038    /// Create a new store with optional S3 backend and the configured local storage limit.
1039    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
1040        let config = hashtree_config::Config::load_or_default();
1041        let max_size_bytes = config
1042            .storage
1043            .max_size_gb
1044            .saturating_mul(1024 * 1024 * 1024);
1045        Self::with_options_and_backend(
1046            path,
1047            s3_config,
1048            max_size_bytes,
1049            config.storage.evict_orphans,
1050            &config.storage.backend,
1051        )
1052    }
1053
1054    /// Create a new store with optional S3 backend and custom size limit.
1055    ///
1056    /// The raw local blob backend remains unbounded. `HashtreeStore` enforces
1057    /// `max_size_bytes` at the tree-management layer so eviction can honor pins,
1058    /// orphan handling, and local-only eviction when S3 is used as archive.
1059    pub fn with_options<P: AsRef<Path>>(
1060        path: P,
1061        s3_config: Option<&S3Config>,
1062        max_size_bytes: u64,
1063    ) -> Result<Self> {
1064        let config = hashtree_config::Config::load_or_default();
1065        Self::with_options_and_backend(
1066            path,
1067            s3_config,
1068            max_size_bytes,
1069            config.storage.evict_orphans,
1070            &config.storage.backend,
1071        )
1072    }
1073
    /// Open (or create) a store at `path` with every option explicit.
    ///
    /// Steps:
    /// - Creates `path` if missing.
    /// - Sizes the metadata LMDB map from `max_size_bytes` via
    ///   `lmdb_metadata_map_size_for_storage_budget`, then adjusts it with
    ///   `lmdb_map_size_for_existing_env`. NOTE(review): the adjustment presumably
    ///   keeps an existing env from being opened with a smaller map; confirm.
    /// - Opens the metadata env and creates its named databases.
    /// - Opens the local blob store under `path/blobs` according to `backend`.
    /// - Wraps the blob store in a `StorageRouter`. The router is S3-backed only when
    ///   the `s3` feature is enabled and `s3_config` is `Some`; otherwise the
    ///   config is ignored with a warning.
    ///
    /// # Errors
    /// Fails on directory creation, LMDB open/resize/transaction errors, blob store
    /// creation errors, or S3 router initialization errors.
    pub fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        evict_orphans: bool,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;
        let metadata_map_size = lmdb_map_size_for_existing_env(
            path,
            lmdb_metadata_map_size_for_storage_budget(max_size_bytes),
        )?;

        // SAFETY(review): heed marks opening an environment `unsafe`. Soundness
        // presumably relies on this path not being opened as a second env in this
        // process while this one is alive; confirm.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(metadata_map_size)
                // Only 10 named databases are created below; the blob store is opened
                // separately under `blobs/`, so the 11th slot is spare headroom.
                .max_dbs(11) // pins, pinned_refs, tracked_authors, blob_owners, pubkey_blobs, pubkey_blob_index, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Best-effort stale reader-slot cleanup; any error is ignored.
        let _ = env.clear_stale_readers();
        // An existing env may report a smaller map than requested; grow it.
        if env.info().map_size < metadata_map_size {
            // SAFETY: no transaction has been started on `env` in this function yet
            // (the first is the `write_txn` below). NOTE(review): this also assumes
            // no other handle to this env is active; confirm.
            unsafe { env.resize(metadata_map_size) }?;
        }

        // Create (or open existing) named databases in one write transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
        let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let pubkey_blob_index = env.create_database(&mut wtxn, Some("pubkey_blob_index"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Intentionally keep the raw blob backend unbounded here. HashtreeStore
        // owns quota policy above this layer, where it can coordinate eviction
        // with tree refs, blob ownership, pins, and S3 archival behavior.
        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                std::fs::create_dir_all(path.join("blobs"))?;
                // NOTE(review): presumably clears leftover filesystem-backend shard
                // directories before LMDB uses `blobs/`; confirm.
                remove_stale_fs_blob_shards(&path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
                // The LMDB map must be at least LMDB_BLOB_MIN_MAP_SIZE_BYTES even for tiny budgets.
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        // Create storage router with optional S3
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        // Without the `s3` feature, any provided S3 config is ignored with a warning.
        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            base_path: path.to_path_buf(),
            env,
            pins,
            pinned_refs,
            tracked_authors,
            blob_owners,
            pubkey_blobs,
            pubkey_blob_index,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
            evict_orphans,
            blob_access_update_gate: BlobAccessUpdateGate::default(),
            blob_access_update_inflight: Arc::new(AtomicBool::new(false)),
        })
    }
1190
1191    pub fn base_path(&self) -> &Path {
1192        &self.base_path
1193    }
1194
1195    /// Get the storage router
1196    pub fn router(&self) -> &StorageRouter {
1197        &self.router
1198    }
1199
1200    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
1201    /// All writes through this go to both LMDB and S3
1202    pub fn store_arc(&self) -> Arc<StorageRouter> {
1203        Arc::clone(&self.router)
1204    }
1205
1206    fn access_tracking_tree(&self) -> (HashTree<AccessRecordingStore>, AccessRecordingStore) {
1207        let access_store = AccessRecordingStore::new(self.store_arc());
1208        let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1209        (tree, access_store)
1210    }
1211
1212    pub fn record_blob_accesses<I>(&self, hashes: I)
1213    where
1214        I: IntoIterator<Item = Hash>,
1215    {
1216        let now = unix_timestamp_now();
1217        let mut due_hashes = self.blob_access_update_gate.due_hashes(hashes, now);
1218        if due_hashes.is_empty() {
1219            return;
1220        }
1221
1222        if self
1223            .blob_access_update_inflight
1224            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1225            .is_err()
1226        {
1227            return;
1228        }
1229
1230        if due_hashes.len() > ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT {
1231            due_hashes.truncate(ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT);
1232        }
1233
1234        let router = Arc::clone(&self.router);
1235        let inflight = Arc::clone(&self.blob_access_update_inflight);
1236        let spawn_result = std::thread::Builder::new()
1237            .name("blob-access-update".to_string())
1238            .spawn(move || {
1239                if let Err(err) = router.touch_many_accessed_sync(&due_hashes, now) {
1240                    tracing::debug!("Failed to update blob access metadata: {}", err);
1241                }
1242                inflight.store(false, Ordering::Release);
1243            });
1244        if let Err(err) = spawn_result {
1245            self.blob_access_update_inflight
1246                .store(false, Ordering::Release);
1247            tracing::debug!("Failed to spawn blob access metadata updater: {}", err);
1248        }
1249    }
1250
1251    fn record_blob_access_now(&self, hash: &Hash) {
1252        if let Err(err) = self.router.touch_accessed_sync(hash, unix_timestamp_now()) {
1253            tracing::debug!("Failed to update blob access metadata: {}", err);
1254        }
1255    }
1256
1257    pub fn blob_last_accessed_at(&self, hash: &Hash) -> Result<Option<u64>> {
1258        self.router
1259            .last_accessed_at_sync(hash)
1260            .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1261    }
1262
1263    pub fn blob_last_accessed_many(&self, hashes: &[Hash]) -> Result<Vec<(Hash, u64)>> {
1264        self.router
1265            .many_last_accessed_at_sync(hashes)
1266            .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1267    }
1268
1269    /// Get tree node by hash (raw bytes)
1270    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
1271        let (tree, access_store) = self.access_tracking_tree();
1272
1273        let result = sync_block_on(async {
1274            tree.get_tree_node(hash)
1275                .await
1276                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
1277        })?;
1278        if result.is_some() {
1279            self.record_blob_accesses(access_store.take_accessed_hashes());
1280        }
1281        Ok(result)
1282    }
1283
1284    /// Store a raw blob, returns SHA256 hash as hex.
1285    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
1286        let hash = sha256(data);
1287        let inserted = self
1288            .router
1289            .put_sync(hash, data)
1290            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1291        if !inserted {
1292            self.record_blob_access_now(&hash);
1293        }
1294        Ok(to_hex(&hash))
1295    }
1296
1297    /// Store an owned Blossom blob under the configured durable storage limit.
1298    pub fn put_owned_blob(&self, data: &[u8], pubkey: &[u8; 32]) -> Result<String> {
1299        let hash = sha256(data);
1300        if !self
1301            .router
1302            .exists(&hash)
1303            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))?
1304        {
1305            self.make_room_for_durable_blob(data.len() as u64)?;
1306            self.router
1307                .put_sync(hash, data)
1308                .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1309        } else {
1310            self.record_blob_access_now(&hash);
1311        }
1312        self.set_blob_owner_with_size(&hash, pubkey, data.len() as u64)?;
1313        Ok(to_hex(&hash))
1314    }
1315
1316    /// Store multiple owned Blossom blobs, batching raw blob and owner-index writes.
1317    pub fn put_owned_blobs(&self, items: &[(Hash, Vec<u8>)], pubkey: &[u8; 32]) -> Result<usize> {
1318        if items.is_empty() {
1319            return Ok(0);
1320        }
1321        let incoming_bytes = items
1322            .iter()
1323            .fold(0u64, |total, (_, data)| total.saturating_add(data.len() as u64));
1324        self.make_room_for_durable_blob(incoming_bytes)?;
1325        let inserted = self
1326            .router
1327            .put_many_sync(items)
1328            .map_err(|e| anyhow::anyhow!("Failed to store blob batch: {}", e))?;
1329
1330        let now = SystemTime::now()
1331            .duration_since(UNIX_EPOCH)
1332            .unwrap()
1333            .as_secs();
1334        let mut wtxn = self.env.write_txn()?;
1335        for (hash, data) in items {
1336            let owner_key = Self::blob_owner_key(hash, pubkey);
1337            self.blob_owners.put(&mut wtxn, &owner_key[..], &())?;
1338
1339            let index_key = Self::pubkey_blob_key(pubkey, hash);
1340            if self.pubkey_blob_index.get(&wtxn, &index_key[..])?.is_none() {
1341                let metadata = BlobMetadata {
1342                    sha256: to_hex(hash),
1343                    size: data.len() as u64,
1344                    mime_type: "application/octet-stream".to_string(),
1345                    uploaded: now,
1346                };
1347                self.pubkey_blob_index
1348                    .put(&mut wtxn, &index_key[..], &serde_json::to_vec(&metadata)?)?;
1349            }
1350        }
1351        wtxn.commit()?;
1352        Ok(inserted)
1353    }
1354
1355    /// Store an opportunistically cached blob.
1356    ///
1357    /// Unlike durable `put_blob` writes, this path may evict disposable orphaned
1358    /// blobs to make room under storage pressure. It intentionally avoids touching
1359    /// indexed trees, social-graph roots, explicit pins, and owned Blossom blobs.
1360    pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
1361        let hash = sha256(data);
1362        if self
1363            .router
1364            .exists(&hash)
1365            .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
1366        {
1367            self.record_blob_access_now(&hash);
1368            return Ok(to_hex(&hash));
1369        }
1370
1371        let incoming_bytes = data.len() as u64;
1372        let _ = self.make_room_for_cached_blob(incoming_bytes);
1373
1374        let mut retried_after_cleanup = false;
1375        loop {
1376            match self.router.put_sync(hash, data) {
1377                Ok(_) => return Ok(to_hex(&hash)),
1378                Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
1379                    let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
1380                    if freed == 0 {
1381                        return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
1382                    }
1383                    retried_after_cleanup = true;
1384                }
1385                Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
1386            }
1387        }
1388    }
1389
1390    /// Get a raw blob by SHA256 hash (raw bytes).
1391    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1392        let data = self
1393            .router
1394            .get_sync(hash)
1395            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?;
1396        if data.is_some() {
1397            self.record_blob_accesses(std::iter::once(*hash));
1398        }
1399        Ok(data)
1400    }
1401
1402    pub fn get_blob_range(
1403        &self,
1404        hash: &[u8; 32],
1405        start: u64,
1406        end_inclusive: u64,
1407    ) -> Result<Option<Vec<u8>>> {
1408        let data = self
1409            .router
1410            .get_range_sync(hash, start, end_inclusive)
1411            .map_err(|e| anyhow::anyhow!("Failed to get blob range: {}", e))?;
1412        if data.is_some() {
1413            self.record_blob_accesses(std::iter::once(*hash));
1414        }
1415        Ok(data)
1416    }
1417
1418    pub fn blob_size(&self, hash: &[u8; 32]) -> Result<Option<u64>> {
1419        self.router
1420            .blob_size_sync(hash)
1421            .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))
1422    }
1423
1424    /// Check if a blob exists by SHA256 hash (raw bytes).
1425    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
1426        self.router
1427            .exists(hash)
1428            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
1429    }
1430
1431    // === Blossom ownership tracking ===
1432    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
1433    // This allows efficient multi-owner tracking with O(1) lookups
1434
1435    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
1436    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
1437        let mut key = [0u8; 64];
1438        key[..32].copy_from_slice(sha256);
1439        key[32..].copy_from_slice(pubkey);
1440        key
1441    }
1442
1443    fn pubkey_blob_key(pubkey: &[u8; 32], sha256: &[u8; 32]) -> [u8; 64] {
1444        let mut key = [0u8; 64];
1445        key[..32].copy_from_slice(pubkey);
1446        key[32..].copy_from_slice(sha256);
1447        key
1448    }
1449
1450    /// Add an owner (pubkey) to a blob for Blossom protocol
1451    /// Multiple users can own the same blob - it's only deleted when all owners remove it
1452    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
1453        let size = self
1454            .router
1455            .blob_size_sync(sha256)
1456            .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))?
1457            .unwrap_or(0);
1458        self.set_blob_owner_with_size(sha256, pubkey, size)
1459    }
1460
1461    fn set_blob_owner_with_size(
1462        &self,
1463        sha256: &[u8; 32],
1464        pubkey: &[u8; 32],
1465        size: u64,
1466    ) -> Result<()> {
1467        let key = Self::blob_owner_key(sha256, pubkey);
1468        let index_key = Self::pubkey_blob_key(pubkey, sha256);
1469        let mut wtxn = self.env.write_txn()?;
1470
1471        // Add ownership entry (idempotent - put overwrites)
1472        self.blob_owners.put(&mut wtxn, &key[..], &())?;
1473
1474        if self.pubkey_blob_index.get(&wtxn, &index_key[..])?.is_none() {
1475            let now = SystemTime::now()
1476                .duration_since(UNIX_EPOCH)
1477                .unwrap()
1478                .as_secs();
1479            let metadata = BlobMetadata {
1480                sha256: to_hex(sha256),
1481                size,
1482                mime_type: "application/octet-stream".to_string(),
1483                uploaded: now,
1484            };
1485            self.pubkey_blob_index
1486                .put(&mut wtxn, &index_key[..], &serde_json::to_vec(&metadata)?)?;
1487        }
1488
1489        wtxn.commit()?;
1490        Ok(())
1491    }
1492
1493    /// Check if a pubkey owns a blob
1494    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1495        let key = Self::blob_owner_key(sha256, pubkey);
1496        let rtxn = self.env.read_txn()?;
1497        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1498    }
1499
1500    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
1501    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1502        let rtxn = self.env.read_txn()?;
1503
1504        let mut owners = Vec::new();
1505        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1506            let (key, _) = item?;
1507            if key.len() == 64 {
1508                // Extract pubkey from composite key (bytes 32-64)
1509                let mut pubkey = [0u8; 32];
1510                pubkey.copy_from_slice(&key[32..64]);
1511                owners.push(pubkey);
1512            }
1513        }
1514        Ok(owners)
1515    }
1516
1517    /// Check if blob has any owners
1518    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1519        let rtxn = self.env.read_txn()?;
1520
1521        // Just check if any entry exists with this prefix
1522        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1523            if item.is_ok() {
1524                return Ok(true);
1525            }
1526        }
1527        Ok(false)
1528    }
1529
1530    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
1531    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1532        Ok(self.get_blob_owners(sha256)?.into_iter().next())
1533    }
1534
1535    /// Remove a user's ownership of a blossom blob
1536    /// Only deletes the actual blob when no owners remain
1537    /// Returns true if the blob was actually deleted (no owners left)
1538    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1539        let key = Self::blob_owner_key(sha256, pubkey);
1540        let mut wtxn = self.env.write_txn()?;
1541
1542        // Remove this pubkey's ownership entry
1543        self.blob_owners.delete(&mut wtxn, &key[..])?;
1544        self.pubkey_blob_index
1545            .delete(&mut wtxn, &Self::pubkey_blob_key(pubkey, sha256)[..])?;
1546
1547        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
1548        let sha256_hex = to_hex(sha256);
1549
1550        // Remove from pubkey's blob list
1551        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1552            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1553                blobs.retain(|b| b.sha256 != sha256_hex);
1554                let blobs_json = serde_json::to_vec(&blobs)?;
1555                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1556            }
1557        }
1558
1559        // Check if any other owners remain (prefix scan)
1560        let mut has_other_owners = false;
1561        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1562            if item.is_ok() {
1563                has_other_owners = true;
1564                break;
1565            }
1566        }
1567
1568        if has_other_owners {
1569            wtxn.commit()?;
1570            tracing::debug!(
1571                "Removed {} from blob {} owners, other owners remain",
1572                &to_hex(pubkey)[..8],
1573                &sha256_hex[..8]
1574            );
1575            return Ok(false);
1576        }
1577
1578        // No owners left - delete the blob completely
1579        tracing::info!(
1580            "All owners removed from blob {}, deleting",
1581            &sha256_hex[..8]
1582        );
1583
1584        // Delete raw blob (by content hash) - this deletes from S3 too
1585        let _ = self.router.delete_sync(sha256);
1586
1587        wtxn.commit()?;
1588        Ok(true)
1589    }
1590
1591    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
1592    pub fn list_blobs_by_pubkey(
1593        &self,
1594        pubkey: &[u8; 32],
1595    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1596        let rtxn = self.env.read_txn()?;
1597
1598        let mut blobs: Vec<BlobMetadata> = self
1599            .pubkey_blobs
1600            .get(&rtxn, pubkey)?
1601            .and_then(|b| serde_json::from_slice(b).ok())
1602            .unwrap_or_default();
1603        let mut seen: HashSet<String> = blobs.iter().map(|blob| blob.sha256.clone()).collect();
1604
1605        for item in self.pubkey_blob_index.prefix_iter(&rtxn, pubkey)? {
1606            let (_, metadata_bytes) = item?;
1607            let metadata: BlobMetadata = match serde_json::from_slice(metadata_bytes) {
1608                Ok(metadata) => metadata,
1609                Err(_) => continue,
1610            };
1611            if seen.insert(metadata.sha256.clone()) {
1612                blobs.push(metadata);
1613            }
1614        }
1615
1616        Ok(blobs
1617            .into_iter()
1618            .map(|b| crate::server::blossom::BlobDescriptor {
1619                url: format!("/{}", b.sha256),
1620                sha256: b.sha256,
1621                size: b.size,
1622                mime_type: b.mime_type,
1623                uploaded: b.uploaded,
1624            })
1625            .collect())
1626    }
1627
1628    /// Get a single chunk/blob by hash (raw bytes)
1629    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1630        let data = self
1631            .router
1632            .get_sync(hash)
1633            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))?;
1634        if data.is_some() {
1635            self.record_blob_accesses(std::iter::once(*hash));
1636        }
1637        Ok(data)
1638    }
1639
1640    /// Get file content by hash (raw bytes)
1641    /// Returns raw bytes (caller handles decryption if needed)
1642    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1643        let (tree, access_store) = self.access_tracking_tree();
1644
1645        let result = sync_block_on(async {
1646            tree.read_file(hash)
1647                .await
1648                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1649        })?;
1650        if result.is_some() {
1651            self.record_blob_accesses(access_store.take_accessed_hashes());
1652        }
1653        Ok(result)
1654    }
1655
1656    /// Get file content by Cid (hash + optional decryption key as raw bytes)
1657    /// Handles decryption automatically if key is present
1658    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1659        let (tree, access_store) = self.access_tracking_tree();
1660
1661        let result = sync_block_on(async {
1662            tree.get(cid, None)
1663                .await
1664                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1665        })?;
1666        if result.is_some() {
1667            self.record_blob_accesses(access_store.take_accessed_hashes());
1668        }
1669        Ok(result)
1670    }
1671
1672    fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1673        let exists = self
1674            .router
1675            .exists(&cid.hash)
1676            .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1677        if !exists {
1678            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1679        }
1680        Ok(())
1681    }
1682
1683    /// Stream file content identified by Cid into a writer without buffering full file in memory.
1684    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1685        self.ensure_cid_exists(cid)?;
1686
1687        let (tree, access_store) = self.access_tracking_tree();
1688        let mut total_bytes = 0u64;
1689        let mut streamed_any_chunk = false;
1690
1691        sync_block_on(async {
1692            let mut stream = tree.get_stream(cid);
1693            while let Some(chunk) = stream.next().await {
1694                streamed_any_chunk = true;
1695                let chunk =
1696                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1697                writer
1698                    .write_all(&chunk)
1699                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1700                total_bytes += chunk.len() as u64;
1701            }
1702            Ok::<(), anyhow::Error>(())
1703        })?;
1704
1705        if !streamed_any_chunk {
1706            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1707        }
1708        self.record_blob_accesses(access_store.take_accessed_hashes());
1709
1710        writer
1711            .flush()
1712            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1713        Ok(total_bytes)
1714    }
1715
1716    /// Stream file content identified by Cid directly into a destination path.
1717    pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1718        self.ensure_cid_exists(cid)?;
1719
1720        let output_path = output_path.as_ref();
1721        if let Some(parent) = output_path.parent() {
1722            if !parent.as_os_str().is_empty() {
1723                std::fs::create_dir_all(parent).with_context(|| {
1724                    format!("Failed to create output directory {}", parent.display())
1725                })?;
1726            }
1727        }
1728
1729        let mut file = std::fs::File::create(output_path)
1730            .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1731        self.write_file_by_cid_to_writer(cid, &mut file)
1732    }
1733
1734    /// Stream a public (unencrypted) file by hash directly into a destination path.
1735    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1736        self.write_file_by_cid(&Cid::public(*hash), output_path)
1737    }
1738
1739    /// Resolve a path within a tree (returns Cid with key if encrypted)
1740    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1741        let (tree, access_store) = self.access_tracking_tree();
1742
1743        let result = sync_block_on(async {
1744            tree.resolve_path(cid, path)
1745                .await
1746                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1747        })?;
1748        if result.is_some() {
1749            self.record_blob_accesses(access_store.take_accessed_hashes());
1750        }
1751        Ok(result)
1752    }
1753
    /// Get chunk metadata for a file (chunk list, sizes, total size).
    ///
    /// Returns `Ok(None)` in three cases:
    /// - `hash` is not present in the store;
    /// - its tree node cannot be loaded;
    /// - it refers to a directory.
    ///
    /// A hash that is not a tree node is reported as a single unchunked blob
    /// (`is_chunked: false`, empty chunk lists). For a file tree node, only the root node's
    /// direct links are reported; nested tree nodes are not expanded here. A tree node with no
    /// links is reported with `is_chunked: false`.
    ///
    /// Blob accesses observed during the lookup are recorded only when `Some` is returned.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        // Wrap the backing store so the hashes read below can be collected and handed to
        // `record_blob_accesses` at the end.
        let access_store = AccessRecordingStore::new(self.store_arc());
        // NOTE(review): `.public()` presumably configures the tree for unencrypted content,
        // since this method only receives a bare hash (no key) — confirm.
        let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());

        let metadata: Result<Option<FileChunkMetadata>> = sync_block_on(async {
            // First check if the hash exists in the store at all
            // (either as a blob or tree node)
            let exists = access_store
                .has(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            // Get total size
            let total_size = tree
                .get_size(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            // Check if it's a tree (chunked) or blob
            let is_tree_node = tree
                .is_tree(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            if !is_tree_node {
                // Single blob, not chunked: report no chunk list.
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            // Get tree node to extract chunk info; a vanished node is reported as "not found".
            let node = match tree
                .get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
            {
                Some(n) => n,
                None => return Ok(None),
            };

            // Check if it's a directory (has named links)
            let is_directory = tree
                .is_directory(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None); // Not a file
            }

            // Extract chunk info from the root node's direct links (one level only).
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        });
        let metadata = metadata?;
        // Only count the reads as accesses when a file was actually found.
        if metadata.is_some() {
            self.record_blob_accesses(access_store.take_accessed_hashes());
        }
        Ok(metadata)
    }
1830
1831    /// Get byte range from file
1832    pub fn get_file_range(
1833        &self,
1834        hash: &[u8; 32],
1835        start: u64,
1836        end: Option<u64>,
1837    ) -> Result<Option<(Vec<u8>, u64)>> {
1838        let metadata = match self.get_file_chunk_metadata(hash)? {
1839            Some(m) => m,
1840            None => return Ok(None),
1841        };
1842
1843        if metadata.total_size == 0 {
1844            return Ok(Some((Vec::new(), 0)));
1845        }
1846
1847        if start >= metadata.total_size {
1848            return Ok(None);
1849        }
1850
1851        let end = end
1852            .unwrap_or(metadata.total_size - 1)
1853            .min(metadata.total_size - 1);
1854
1855        // For non-chunked files, load entire file
1856        if !metadata.is_chunked {
1857            let content = self.get_file(hash)?.unwrap_or_default();
1858            let range_content = if start < content.len() as u64 {
1859                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1860            } else {
1861                Vec::new()
1862            };
1863            return Ok(Some((range_content, metadata.total_size)));
1864        }
1865
1866        // For chunked files, load only needed chunks
1867        let mut result = Vec::new();
1868        let mut current_offset = 0u64;
1869
1870        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1871            let chunk_size = metadata.chunk_sizes[i];
1872            let chunk_end = current_offset + chunk_size - 1;
1873
1874            // Check if this chunk overlaps with requested range
1875            if chunk_end >= start && current_offset <= end {
1876                let chunk_content = match self.get_chunk(chunk_hash)? {
1877                    Some(content) => content,
1878                    None => {
1879                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1880                    }
1881                };
1882
1883                let chunk_read_start = if current_offset >= start {
1884                    0
1885                } else {
1886                    (start - current_offset) as usize
1887                };
1888
1889                let chunk_read_end = if chunk_end <= end {
1890                    chunk_size as usize - 1
1891                } else {
1892                    (end - current_offset) as usize
1893                };
1894
1895                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1896            }
1897
1898            current_offset += chunk_size;
1899
1900            if current_offset > end {
1901                break;
1902            }
1903        }
1904
1905        Ok(Some((result, metadata.total_size)))
1906    }
1907
1908    /// Stream file range as chunks using Arc for async/Send contexts
1909    pub fn stream_file_range_chunks_owned(
1910        self: Arc<Self>,
1911        hash: &[u8; 32],
1912        start: u64,
1913        end: u64,
1914    ) -> Result<Option<FileRangeChunksOwned>> {
1915        let metadata = match self.get_file_chunk_metadata(hash)? {
1916            Some(m) => m,
1917            None => return Ok(None),
1918        };
1919
1920        if metadata.total_size == 0 || start >= metadata.total_size {
1921            return Ok(None);
1922        }
1923
1924        let end = end.min(metadata.total_size - 1);
1925
1926        Ok(Some(FileRangeChunksOwned {
1927            store: self,
1928            metadata,
1929            start,
1930            end,
1931            current_chunk_idx: 0,
1932            current_offset: 0,
1933        }))
1934    }
1935
1936    /// Get directory structure by hash (raw bytes)
1937    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1938        let (tree, access_store) = self.access_tracking_tree();
1939
1940        let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
1941            // Check if it's a directory
1942            let is_dir = tree
1943                .is_directory(hash)
1944                .await
1945                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1946
1947            if !is_dir {
1948                return Ok(None);
1949            }
1950
1951            // Get directory entries (public Cid - no encryption key)
1952            let cid = hashtree_core::Cid::public(*hash);
1953            let tree_entries = tree
1954                .list_directory(&cid)
1955                .await
1956                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1957
1958            let entries: Vec<DirEntry> = tree_entries
1959                .into_iter()
1960                .map(|e| DirEntry {
1961                    name: e.name,
1962                    cid: to_hex(&e.hash),
1963                    is_directory: e.link_type.is_tree(),
1964                    size: e.size,
1965                })
1966                .collect();
1967
1968            Ok(Some(DirectoryListing {
1969                dir_name: String::new(),
1970                entries,
1971            }))
1972        });
1973        let listing = listing?;
1974        if listing.is_some() {
1975            self.record_blob_accesses(access_store.take_accessed_hashes());
1976        }
1977        Ok(listing)
1978    }
1979
1980    /// Get directory structure by CID, supporting encrypted directories.
1981    pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1982        let (tree, access_store) = self.access_tracking_tree();
1983        let cid = cid.clone();
1984
1985        let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
1986            let is_dir = tree
1987                .is_dir(&cid)
1988                .await
1989                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1990
1991            if !is_dir {
1992                return Ok(None);
1993            }
1994
1995            let tree_entries = tree
1996                .list_directory(&cid)
1997                .await
1998                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1999
2000            let entries: Vec<DirEntry> = tree_entries
2001                .into_iter()
2002                .map(|e| DirEntry {
2003                    name: e.name,
2004                    cid: Cid {
2005                        hash: e.hash,
2006                        key: e.key,
2007                    }
2008                    .to_string(),
2009                    is_directory: e.link_type.is_tree(),
2010                    size: e.size,
2011                })
2012                .collect();
2013
2014            Ok(Some(DirectoryListing {
2015                dir_name: String::new(),
2016                entries,
2017            }))
2018        });
2019        let listing = listing?;
2020        if listing.is_some() {
2021            self.record_blob_accesses(access_store.take_accessed_hashes());
2022        }
2023        Ok(listing)
2024    }
2025
    // === Pinned refs, tracked authors, and cached roots ===
2027
2028    /// Persist a mutable published ref that should stay subscribed.
2029    pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
2030        let mut wtxn = self.env.write_txn()?;
2031        self.pinned_refs.put(&mut wtxn, key, &())?;
2032        wtxn.commit()?;
2033        Ok(())
2034    }
2035
2036    /// Remove a mutable published ref from the live pinned set.
2037    pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
2038        let mut wtxn = self.env.write_txn()?;
2039        let removed = self.pinned_refs.delete(&mut wtxn, key)?;
2040        wtxn.commit()?;
2041        Ok(removed)
2042    }
2043
2044    /// List mutable published refs that should stay subscribed.
2045    pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
2046        let rtxn = self.env.read_txn()?;
2047        let mut refs = Vec::new();
2048
2049        for item in self.pinned_refs.iter(&rtxn)? {
2050            let (key, _) = item?;
2051            refs.push(key.to_string());
2052        }
2053
2054        refs.sort();
2055        Ok(refs)
2056    }
2057
2058    /// Persist an author whose published trees should stay mirrored.
2059    pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
2060        let mut wtxn = self.env.write_txn()?;
2061        let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
2062        self.tracked_authors.put(&mut wtxn, npub, &())?;
2063        wtxn.commit()?;
2064        Ok(inserted)
2065    }
2066
2067    /// Remove an author from the continuous mirror set.
2068    pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
2069        let mut wtxn = self.env.write_txn()?;
2070        let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
2071        wtxn.commit()?;
2072        Ok(removed)
2073    }
2074
2075    /// List authors whose published trees should stay mirrored.
2076    pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
2077        let rtxn = self.env.read_txn()?;
2078        let mut authors = Vec::new();
2079
2080        for item in self.tracked_authors.iter(&rtxn)? {
2081            let (npub, _) = item?;
2082            authors.push(npub.to_string());
2083        }
2084
2085        authors.sort();
2086        Ok(authors)
2087    }
2088
2089    /// Get cached root for a pubkey/tree_name pair
2090    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
2091        let key = format!("{}/{}", pubkey_hex, tree_name);
2092        let rtxn = self.env.read_txn()?;
2093        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
2094            let root: CachedRoot = rmp_serde::from_slice(bytes)
2095                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2096            Ok(Some(root))
2097        } else {
2098            Ok(None)
2099        }
2100    }
2101
2102    /// Set cached root for a pubkey/tree_name pair
2103    pub fn set_cached_root(
2104        &self,
2105        pubkey_hex: &str,
2106        tree_name: &str,
2107        hash: &str,
2108        key: Option<&str>,
2109        visibility: &str,
2110        updated_at: u64,
2111    ) -> Result<()> {
2112        let db_key = format!("{}/{}", pubkey_hex, tree_name);
2113        let root = CachedRoot {
2114            hash: hash.to_string(),
2115            key: key.map(|k| k.to_string()),
2116            updated_at,
2117            visibility: visibility.to_string(),
2118        };
2119        let bytes = rmp_serde::to_vec(&root)
2120            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
2121        let mut wtxn = self.env.write_txn()?;
2122        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
2123        wtxn.commit()?;
2124        Ok(())
2125    }
2126
2127    /// List all cached roots for a pubkey
2128    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
2129        let prefix = format!("{}/", pubkey_hex);
2130        let rtxn = self.env.read_txn()?;
2131        let mut results = Vec::new();
2132
2133        for item in self.cached_roots.iter(&rtxn)? {
2134            let (key, bytes) = item?;
2135            if key.starts_with(&prefix) {
2136                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
2137                let root: CachedRoot = rmp_serde::from_slice(bytes)
2138                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2139                results.push((tree_name.to_string(), root));
2140            }
2141        }
2142
2143        Ok(results)
2144    }
2145
2146    /// Delete a cached root
2147    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
2148        let key = format!("{}/{}", pubkey_hex, tree_name);
2149        let mut wtxn = self.env.write_txn()?;
2150        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
2151        wtxn.commit()?;
2152        Ok(deleted)
2153    }
2154}
2155
2156fn is_map_full_store_error(err: &StoreError) -> bool {
2157    let message = err.to_string();
2158    message.contains("MDB_MAP_FULL") || message.contains("MapFull")
2159}
2160
2161#[derive(Debug, Clone)]
2162pub struct FileChunkMetadata {
2163    pub total_size: u64,
2164    pub chunk_hashes: Vec<Hash>,
2165    pub chunk_sizes: Vec<u64>,
2166    pub is_chunked: bool,
2167}
2168
2169/// Owned iterator for async streaming
2170pub struct FileRangeChunksOwned {
2171    store: Arc<HashtreeStore>,
2172    metadata: FileChunkMetadata,
2173    start: u64,
2174    end: u64,
2175    current_chunk_idx: usize,
2176    current_offset: u64,
2177}
2178
2179impl Iterator for FileRangeChunksOwned {
2180    type Item = Result<Vec<u8>>;
2181
2182    fn next(&mut self) -> Option<Self::Item> {
2183        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2184            return None;
2185        }
2186
2187        if self.current_offset > self.end {
2188            return None;
2189        }
2190
2191        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2192        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2193        let chunk_end = self.current_offset + chunk_size - 1;
2194
2195        self.current_chunk_idx += 1;
2196
2197        if chunk_end < self.start || self.current_offset > self.end {
2198            self.current_offset += chunk_size;
2199            return self.next();
2200        }
2201
2202        let chunk_content = match self.store.get_chunk(chunk_hash) {
2203            Ok(Some(content)) => content,
2204            Ok(None) => {
2205                return Some(Err(anyhow::anyhow!(
2206                    "Chunk {} not found",
2207                    to_hex(chunk_hash)
2208                )));
2209            }
2210            Err(e) => {
2211                return Some(Err(e));
2212            }
2213        };
2214
2215        let chunk_read_start = if self.current_offset >= self.start {
2216            0
2217        } else {
2218            (self.start - self.current_offset) as usize
2219        };
2220
2221        let chunk_read_end = if chunk_end <= self.end {
2222            chunk_size as usize - 1
2223        } else {
2224            (self.end - self.current_offset) as usize
2225        };
2226
2227        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2228        self.current_offset += chunk_size;
2229
2230        Some(Ok(result))
2231    }
2232}
2233
/// Summary of a garbage-collection pass.
///
/// NOTE(review): populated outside this chunk; the field meanings below are inferred from
/// their names — confirm against the GC implementation.
#[derive(Debug)]
pub struct GcStats {
    /// Number of DAGs removed (presumably).
    pub deleted_dags: usize,
    /// Bytes reclaimed (presumably).
    pub freed_bytes: u64,
}
2239
/// One entry of a [`DirectoryListing`].
#[derive(Clone, Debug)]
pub struct DirEntry {
    /// Entry name within the directory.
    pub name: String,
    /// String form of the entry's CID (hex hash for public listings; `Cid`'s string form,
    /// including any key, for CID-based listings).
    pub cid: String,
    /// Whether the entry links to a tree (sub-directory).
    pub is_directory: bool,
    /// Size recorded for the entry's link.
    pub size: u64,
}
2247
2248#[derive(Debug, Clone)]
2249pub struct DirectoryListing {
2250    pub dir_name: String,
2251    pub entries: Vec<DirEntry>,
2252}
2253
2254/// Blob metadata for Blossom protocol
2255#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2256pub struct BlobMetadata {
2257    pub sha256: String,
2258    pub size: u64,
2259    pub mime_type: String,
2260    pub uploaded: u64,
2261}
2262
2263// Implement ContentStore trait for WebRTC data exchange
2264impl crate::webrtc::ContentStore for HashtreeStore {
2265    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2266        let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2267        self.get_chunk(&hash)
2268    }
2269}
2270
2271#[cfg(test)]
2272mod tests {
2273    use super::*;
2274    #[cfg(feature = "lmdb")]
2275    use tempfile::TempDir;
2276
2277    #[test]
2278    fn blob_access_update_gate_deduplicates_and_throttles() {
2279        let gate = BlobAccessUpdateGate::default();
2280        let first = sha256(b"first");
2281        let second = sha256(b"second");
2282
2283        assert_eq!(
2284            gate.due_hashes([first, first, second], 10),
2285            vec![first, second]
2286        );
2287        assert!(gate.due_hashes([first, second], 11).is_empty());
2288        assert_eq!(
2289            gate.due_hashes([second, first], 10 + ACCESS_UPDATE_INTERVAL_SECS),
2290            vec![second, first]
2291        );
2292    }
2293
2294    #[cfg(feature = "lmdb")]
2295    #[test]
2296    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
2297        let temp = TempDir::new()?;
2298        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
2299        let store = HashtreeStore::with_options_and_backend(
2300            temp.path(),
2301            None,
2302            requested,
2303            true,
2304            &StorageBackend::Lmdb,
2305        )?;
2306
2307        let map_size = match store.router.local.as_ref() {
2308            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
2309            LocalStore::Fs(_) => panic!("expected LMDB local store"),
2310        };
2311
2312        assert!(
2313            map_size >= requested,
2314            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
2315        );
2316
2317        drop(store);
2318        Ok(())
2319    }
2320
2321    #[cfg(feature = "lmdb")]
2322    #[test]
2323    fn hashtree_store_expands_metadata_lmdb_map_size_to_storage_budget() -> Result<()> {
2324        let temp = TempDir::new()?;
2325        let storage_budget = 256 * 1024 * 1024 * 1024u64;
2326        let expected = lmdb_metadata_map_size_for_storage_budget(storage_budget);
2327        let store = HashtreeStore::with_options_and_backend(
2328            temp.path(),
2329            None,
2330            storage_budget,
2331            true,
2332            &StorageBackend::Lmdb,
2333        )?;
2334
2335        let map_size = store.env.info().map_size as u64;
2336        assert!(
2337            map_size >= expected,
2338            "expected metadata LMDB map to grow to at least {expected} bytes, got {map_size}"
2339        );
2340
2341        drop(store);
2342        Ok(())
2343    }
2344
2345    #[cfg(feature = "lmdb")]
2346    #[test]
2347    fn local_store_can_override_lmdb_map_size() -> Result<()> {
2348        let temp = TempDir::new()?;
2349        let requested = 512 * 1024 * 1024u64;
2350        let store = LocalStore::new_with_lmdb_map_size(
2351            temp.path().join("lmdb-blobs"),
2352            &StorageBackend::Lmdb,
2353            Some(requested),
2354        )?;
2355
2356        let map_size = match store {
2357            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
2358            LocalStore::Fs(_) => panic!("expected LMDB local store"),
2359        };
2360
2361        assert!(
2362            map_size >= requested,
2363            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
2364        );
2365
2366        Ok(())
2367    }
2368
2369    #[cfg(feature = "lmdb")]
2370    #[test]
2371    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
2372        let temp = TempDir::new()?;
2373        let path = temp.path().join("lmdb-blobs");
2374        std::fs::create_dir_all(path.join("aa"))?;
2375        std::fs::create_dir_all(path.join("b2"))?;
2376        std::fs::create_dir_all(path.join("keep-me"))?;
2377        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
2378        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
2379        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;
2380
2381        let _store = LocalStore::new_with_lmdb_map_size(
2382            &path,
2383            &StorageBackend::Lmdb,
2384            Some(128 * 1024 * 1024),
2385        )?;
2386
2387        assert!(!path.join("aa").exists());
2388        assert!(!path.join("b2").exists());
2389        assert!(path.join("keep-me").exists());
2390        assert!(path.join("data.mdb").exists());
2391        assert!(path.join("lock.mdb").exists());
2392
2393        Ok(())
2394    }
2395
2396    #[cfg(feature = "lmdb")]
2397    #[test]
2398    fn duplicate_blossom_writes_refresh_blob_last_accessed() -> Result<()> {
2399        let temp = TempDir::new()?;
2400        let store = HashtreeStore::with_options_and_backend(
2401            temp.path(),
2402            None,
2403            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
2404            true,
2405            &StorageBackend::Lmdb,
2406        )?;
2407
2408        let data = b"cached blossom duplicate";
2409        let hash = sha256(data);
2410        store.put_cached_blob(data)?;
2411        store.router.touch_accessed_sync(&hash, 1)?;
2412        store.put_cached_blob(data)?;
2413        assert!(store.blob_last_accessed_at(&hash)?.unwrap_or(0) > 1);
2414
2415        let owned = b"owned blossom duplicate";
2416        let owned_hash = sha256(owned);
2417        let owner = [7u8; 32];
2418        store.put_owned_blob(owned, &owner)?;
2419        store.router.touch_accessed_sync(&owned_hash, 1)?;
2420        store.put_owned_blob(owned, &owner)?;
2421        assert!(store.blob_last_accessed_at(&owned_hash)?.unwrap_or(0) > 1);
2422        let owned_blobs = store.list_blobs_by_pubkey(&owner)?;
2423        assert_eq!(owned_blobs.len(), 1);
2424        assert_eq!(owned_blobs[0].sha256, to_hex(&owned_hash));
2425
2426        let batch = [
2427            (sha256(b"owned blossom batch 1"), b"owned blossom batch 1".to_vec()),
2428            (sha256(b"owned blossom batch 2"), b"owned blossom batch 2".to_vec()),
2429        ];
2430        store.put_owned_blobs(&batch, &owner)?;
2431        let owned_blobs = store.list_blobs_by_pubkey(&owner)?;
2432        assert_eq!(owned_blobs.len(), 3);
2433
2434        Ok(())
2435    }
2436
2437    #[cfg(feature = "lmdb")]
2438    #[test]
2439    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
2440        let temp = TempDir::new()?;
2441        let store = HashtreeStore::with_options_and_backend(
2442            temp.path(),
2443            None,
2444            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
2445            true,
2446            &StorageBackend::Lmdb,
2447        )?;
2448
2449        let old_bytes = b"old published root";
2450        let new_bytes = b"new published root";
2451        let old_root = sha256(old_bytes);
2452        let new_root = sha256(new_bytes);
2453
2454        store.put_blob(old_bytes)?;
2455        store.pin(&old_root)?;
2456        store.index_tree(
2457            &old_root,
2458            "owner",
2459            Some("playlist"),
2460            PRIORITY_OWN,
2461            Some("npub1owner/playlist"),
2462        )?;
2463
2464        assert!(store.is_pinned(&old_root)?);
2465        assert!(store.get_tree_meta(&old_root)?.is_some());
2466
2467        store.put_blob(new_bytes)?;
2468        store.pin(&new_root)?;
2469        store.index_tree(
2470            &new_root,
2471            "owner",
2472            Some("playlist"),
2473            PRIORITY_OWN,
2474            Some("npub1owner/playlist"),
2475        )?;
2476
2477        assert!(
2478            !store.is_pinned(&old_root)?,
2479            "superseded root should be unpinned when ref is replaced"
2480        );
2481        assert!(
2482            store.get_tree_meta(&old_root)?.is_none(),
2483            "superseded root metadata should be removed when ref is replaced"
2484        );
2485        assert!(store.is_pinned(&new_root)?);
2486        assert!(store.get_tree_meta(&new_root)?.is_some());
2487
2488        Ok(())
2489    }
2490
2491    #[test]
2492    fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
2493        let temp = TempDir::new()?;
2494        let store = HashtreeStore::with_options(temp.path(), None, 1024 * 1024)?;
2495
2496        store
2497            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;
2498        store
2499            .add_tracked_author("npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm")?;
2500        store
2501            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;
2502
2503        assert_eq!(
2504            store.list_tracked_authors()?,
2505            vec![
2506                "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm".to_string(),
2507                "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string(),
2508            ]
2509        );
2510        assert!(store.remove_tracked_author(
2511            "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm"
2512        )?);
2513        assert!(!store.remove_tracked_author(
2514            "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq"
2515        )?);
2516        assert_eq!(
2517            store.list_tracked_authors()?,
2518            vec!["npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string()]
2519        );
2520
2521        Ok(())
2522    }
2523
2524    #[cfg(feature = "s3")]
2525    #[test]
2526    fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
2527        let temp = tempfile::TempDir::new()?;
2528        let local = Arc::new(LocalStore::new(
2529            temp.path().join("blobs"),
2530            &StorageBackend::Fs,
2531        )?);
2532
2533        let outcome = std::panic::catch_unwind(|| {
2534            sync_block_on(async {
2535                let aws_config = aws_config::from_env()
2536                    .region(aws_sdk_s3::config::Region::new("auto"))
2537                    .load()
2538                    .await;
2539                let s3_client = aws_sdk_s3::Client::from_conf(
2540                    aws_sdk_s3::config::Builder::from(&aws_config)
2541                        .endpoint_url("http://127.0.0.1:9")
2542                        .force_path_style(true)
2543                        .build(),
2544                );
2545
2546                let router = StorageRouter {
2547                    local,
2548                    s3_client: Some(s3_client),
2549                    s3_bucket: Some("test-bucket".to_string()),
2550                    s3_prefix: String::new(),
2551                    sync_tx: None,
2552                };
2553                let hash = [0u8; 32];
2554
2555                let _ = Store::has(&router, &hash).await;
2556                let _ = Store::get(&router, &hash).await;
2557            });
2558        });
2559
2560        assert!(
2561            outcome.is_ok(),
2562            "S3-backed async store methods should not panic inside futures::block_on"
2563        );
2564
2565        Ok(())
2566    }
2567}