Skip to main content

hashtree_cli/
storage.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8    from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::collections::{HashMap, HashSet};
17#[cfg(feature = "s3")]
18use std::future::Future;
19use std::io::Write;
20use std::path::{Path, PathBuf};
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex};
23use std::time::{Instant, SystemTime, UNIX_EPOCH};
24
25mod upload;
26
27mod maintenance;
28mod retention;
29
/// Timeout in milliseconds for blocking S3 calls, used when
/// `HTREE_S3_SYNC_TIMEOUT_MS` is unset, unparsable, or zero.
#[cfg(feature = "s3")]
const DEFAULT_S3_SYNC_TIMEOUT_MS: u64 = 5_000;
/// Name of the environment variable that overrides the blocking S3 timeout.
#[cfg(feature = "s3")]
const S3_SYNC_TIMEOUT_MS_ENV: &str = "HTREE_S3_SYNC_TIMEOUT_MS";
34
35pub use maintenance::{
36    compact_lmdb_environments_under, CompactResult, R2ImportOptions, R2ImportResult, VerifyResult,
37};
38pub use retention::{OwnedBlobStats, PinnedItem, StorageByPriority, StorageStats, TreeMeta};
39
/// Priority levels for tree eviction
pub const PRIORITY_OTHER: u8 = 64;
pub const PRIORITY_FOLLOWED: u8 = 128;
pub const PRIORITY_OWN: u8 = 255;
// NOTE(review): not referenced in this part of the file; presumably the LMDB
// reader-slot limit passed when opening an environment — confirm at the open site.
const LMDB_MAX_READERS: u32 = 1024;
/// Lower clamp bound (64 MiB) for the metadata map size derived from a storage budget.
const LMDB_METADATA_MIN_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
/// Upper clamp bound (64 GiB) for the derived metadata map size; also the value
/// used when the storage budget is 0.
const LMDB_METADATA_MAX_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024 * 1024;
/// The storage budget is divided by this value to obtain the metadata map size
/// before clamping.
const LMDB_METADATA_STORAGE_RATIO_DIVISOR: u64 = 1024;
/// Minimum headroom (64 MiB) added on top of an existing `data.mdb` that is
/// already larger than the requested map size.
const LMDB_METADATA_REOPEN_HEADROOM_BYTES: u64 = 64 * 1024 * 1024;
// NOTE(review): not referenced in this part of the file; presumably a floor for
// blob-environment map sizes — confirm where it is used.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 16 * 1024 * 1024;
/// Seconds a hash stays suppressed in `BlobAccessUpdateGate` after it was reported due.
const ACCESS_UPDATE_INTERVAL_SECS: u64 = 300;
/// Entry count at which `BlobAccessUpdateGate` prunes expired entries (and clears
/// the map entirely if pruning does not bring it below this size).
const ACCESS_UPDATE_GATE_MAX_ENTRIES: usize = 4096;
/// Fallback returned by `access_update_background_batch_limit()` when the
/// environment variable is unset or unparsable.
const DEFAULT_ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT: usize = 64;
/// Environment variable read by `access_update_background_batch_limit()`.
const ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT_ENV: &str = "HTREE_ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT";
/// Environment variable read by `cached_blob_batch_existing_precheck()`.
const CACHED_BLOB_BATCH_EXISTING_PRECHECK_ENV: &str = "HTREE_CACHED_BLOB_BATCH_EXISTING_PRECHECK";
/// Environment variable read by `slow_owned_blob_batch_log_ms()`.
const SLOW_OWNED_BLOB_BATCH_LOG_MS_ENV: &str = "HTREE_SLOW_OWNED_BLOB_BATCH_LOG_MS";
/// Environment variable read by `slow_cached_blob_batch_log_ms()`.
const SLOW_CACHED_BLOB_BATCH_LOG_MS_ENV: &str = "HTREE_SLOW_CACHED_BLOB_BATCH_LOG_MS";
58
59fn slow_owned_blob_batch_log_ms() -> Option<u128> {
60    std::env::var(SLOW_OWNED_BLOB_BATCH_LOG_MS_ENV)
61        .ok()
62        .and_then(|value| value.parse::<u128>().ok())
63        .filter(|value| *value > 0)
64}
65
66fn slow_cached_blob_batch_log_ms() -> Option<u128> {
67    std::env::var(SLOW_CACHED_BLOB_BATCH_LOG_MS_ENV)
68        .ok()
69        .and_then(|value| value.parse::<u128>().ok())
70        .filter(|value| *value > 0)
71}
72
73fn access_update_background_batch_limit() -> usize {
74    std::env::var(ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT_ENV)
75        .ok()
76        .and_then(|value| value.parse::<usize>().ok())
77        .unwrap_or(DEFAULT_ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT)
78}
79
80fn cached_blob_batch_existing_precheck() -> bool {
81    std::env::var(CACHED_BLOB_BATCH_EXISTING_PRECHECK_ENV)
82        .is_ok_and(|value| value == "1" || value.eq_ignore_ascii_case("true"))
83}
84
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of an error.
fn unix_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
91
/// Cached root info from Nostr events.
///
/// Serializable via serde; all fields are stored as plain strings/integers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash (hex)
    pub hash: String,
    /// Optional decryption key (hex)
    pub key: Option<String>,
    /// Unix timestamp when this was cached (from event created_at)
    pub updated_at: u64,
    /// Visibility: "public", "link-visible", or "private"
    // NOTE(review): stored as a free-form string; nothing in this struct
    // restricts it to the three values listed above.
    pub visibility: String,
}
104
/// Storage statistics
///
/// Backend-independent copy of the `count` / `total_bytes` figures reported by
/// the wrapped blob store's own `stats()` call.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// `count` as reported by the backend.
    pub count: usize,
    /// `total_bytes` as reported by the backend.
    pub total_bytes: u64,
}
111
112#[derive(Default)]
113struct BlobAccessUpdateGate {
114    next_update_by_hash: Mutex<HashMap<Hash, u64>>,
115}
116
117impl BlobAccessUpdateGate {
118    fn due_hashes<I>(&self, hashes: I, now: u64) -> Vec<Hash>
119    where
120        I: IntoIterator<Item = Hash>,
121    {
122        let Ok(mut next_update_by_hash) = self.next_update_by_hash.try_lock() else {
123            return Vec::new();
124        };
125
126        if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
127            next_update_by_hash.retain(|_, next_update| *next_update > now);
128            if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
129                next_update_by_hash.clear();
130            }
131        }
132
133        let mut due = Vec::new();
134        let mut seen = HashSet::new();
135        for hash in hashes {
136            if !seen.insert(hash) {
137                continue;
138            }
139            if next_update_by_hash
140                .get(&hash)
141                .is_some_and(|next_update| now < *next_update)
142            {
143                continue;
144            }
145            next_update_by_hash.insert(hash, now.saturating_add(ACCESS_UPDATE_INTERVAL_SECS));
146            due.push(hash);
147        }
148        due
149    }
150}
151
/// Local blob store - wraps either FsBlobStore or LmdbBlobStore
///
/// The `Lmdb` variant only exists when the crate is built with the `lmdb`
/// feature.
pub enum LocalStore {
    /// Filesystem-backed blob store.
    Fs(FsBlobStore),
    /// LMDB-backed blob store.
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
158
159#[cfg(feature = "lmdb")]
/// Returns `true` when the last component of `path` is exactly two ASCII hex
/// digits (either case), i.e. it is named like a filesystem blob shard.
///
/// Only the name is inspected; whether the path actually is a directory is
/// left to the caller.
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name.len() == 2 && name.bytes().all(|byte| byte.is_ascii_hexdigit()),
        None => false,
    }
}
166
167fn lmdb_metadata_map_size_for_storage_budget(max_size_bytes: u64) -> u64 {
168    if max_size_bytes == 0 {
169        return LMDB_METADATA_MAX_MAP_SIZE_BYTES;
170    }
171
172    max_size_bytes
173        .saturating_div(LMDB_METADATA_STORAGE_RATIO_DIVISOR)
174        .clamp(
175            LMDB_METADATA_MIN_MAP_SIZE_BYTES,
176            LMDB_METADATA_MAX_MAP_SIZE_BYTES,
177        )
178}
179
180fn lmdb_map_size_for_existing_env(path: &Path, requested_bytes: u64) -> Result<usize> {
181    let existing_bytes = std::fs::metadata(path.join("data.mdb"))
182        .map(|metadata| metadata.len())
183        .unwrap_or(0);
184    let requested = if existing_bytes > requested_bytes {
185        let existing_headroom = existing_bytes
186            .saturating_div(10)
187            .max(LMDB_METADATA_REOPEN_HEADROOM_BYTES);
188        existing_bytes.saturating_add(existing_headroom)
189    } else {
190        requested_bytes
191    };
192    let requested = align_lmdb_map_size(requested);
193    usize::try_from(requested).context("LMDB map size exceeds usize")
194}
195
196fn align_lmdb_map_size(bytes: u64) -> u64 {
197    let page_size = (page_size::get() as u64).max(4096);
198    let remainder = bytes % page_size;
199    if remainder == 0 {
200        bytes
201    } else {
202        bytes.saturating_add(page_size - remainder)
203    }
204}
205
/// Deletes every direct child directory of `path` whose name looks like a
/// filesystem blob shard (see `is_fs_blob_shard_dir`), logging each removal.
///
/// # Errors
/// Any I/O failure while listing `path`, reading an entry, or removing a
/// directory is returned as `StoreError::Io`; removal stops at the first error.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let shard_path = entry.map_err(StoreError::Io)?.path();
        if !(shard_path.is_dir() && is_fs_blob_shard_dir(&shard_path)) {
            continue;
        }

        std::fs::remove_dir_all(&shard_path).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            shard_path.display()
        );
    }
    Ok(())
}
222
223impl LocalStore {
224    /// Create a new unbounded local store.
225    ///
226    /// Higher-level stores that need quota enforcement should manage eviction
227    /// above this layer so tree metadata, pins, and archival policies stay
228    /// coherent.
229    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
230        Self::new_unbounded(path, backend)
231    }
232
233    /// Create a new local store with an explicit LMDB logical size cap when using the LMDB backend.
234    ///
235    /// The requested size is used for both the LMDB map and the store's built-in
236    /// byte quota, so standalone blob envs can evict instead of growing forever.
237    pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
238        path: P,
239        backend: &StorageBackend,
240        _map_size_bytes: Option<u64>,
241    ) -> Result<Self, StoreError> {
242        match backend {
243            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
244            #[cfg(feature = "lmdb")]
245            StorageBackend::Lmdb => match _map_size_bytes {
246                Some(map_size_bytes) => {
247                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
248                    remove_stale_fs_blob_shards(path.as_ref())?;
249                    Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
250                        path,
251                        map_size_bytes,
252                    )?))
253                }
254                None => {
255                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
256                    remove_stale_fs_blob_shards(path.as_ref())?;
257                    Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
258                }
259            },
260            #[cfg(not(feature = "lmdb"))]
261            StorageBackend::Lmdb => {
262                tracing::warn!(
263                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
264                );
265                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
266            }
267        }
268    }
269
270    /// Create a new unbounded local store for a specific backend.
271    pub fn new_unbounded<P: AsRef<Path>>(
272        path: P,
273        backend: &StorageBackend,
274    ) -> Result<Self, StoreError> {
275        Self::new_with_lmdb_map_size(path, backend, None)
276    }
277
278    pub fn backend(&self) -> StorageBackend {
279        match self {
280            LocalStore::Fs(_) => StorageBackend::Fs,
281            #[cfg(feature = "lmdb")]
282            LocalStore::Lmdb(_) => StorageBackend::Lmdb,
283        }
284    }
285
286    /// Sync put operation
287    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
288        match self {
289            LocalStore::Fs(store) => store.put_sync(hash, data),
290            #[cfg(feature = "lmdb")]
291            LocalStore::Lmdb(store) => store.put_sync(hash, data),
292        }
293    }
294
295    /// Sync batch put operation.
296    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
297        match self {
298            LocalStore::Fs(store) => {
299                let mut inserted = 0usize;
300                for (hash, data) in items {
301                    if store.put_sync(*hash, data.as_slice())? {
302                        inserted += 1;
303                    }
304                }
305                Ok(inserted)
306            }
307            #[cfg(feature = "lmdb")]
308            LocalStore::Lmdb(store) => store.put_many_sync(items),
309        }
310    }
311
312    /// Sync get operation
313    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
314        match self {
315            LocalStore::Fs(store) => store.get_sync(hash),
316            #[cfg(feature = "lmdb")]
317            LocalStore::Lmdb(store) => store.get_sync(hash),
318        }
319    }
320
321    pub fn get_range_sync(
322        &self,
323        hash: &Hash,
324        start: u64,
325        end_inclusive: u64,
326    ) -> Result<Option<Vec<u8>>, StoreError> {
327        match self {
328            LocalStore::Fs(store) => store.get_range_sync(hash, start, end_inclusive),
329            #[cfg(feature = "lmdb")]
330            LocalStore::Lmdb(store) => store.get_range_sync(hash, start, end_inclusive),
331        }
332    }
333
334    pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
335        match self {
336            LocalStore::Fs(store) => store.blob_size_sync(hash),
337            #[cfg(feature = "lmdb")]
338            LocalStore::Lmdb(store) => store.blob_size_sync(hash),
339        }
340    }
341
342    pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
343        match self {
344            LocalStore::Fs(store) => store.touch_accessed_sync(hash, now),
345            #[cfg(feature = "lmdb")]
346            LocalStore::Lmdb(store) => store.touch_accessed_sync(hash, now),
347        }
348    }
349
350    pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
351        match self {
352            LocalStore::Fs(store) => store.touch_many_accessed_sync(hashes, now),
353            #[cfg(feature = "lmdb")]
354            LocalStore::Lmdb(store) => store.touch_many_accessed_sync(hashes, now),
355        }
356    }
357
358    pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
359        match self {
360            LocalStore::Fs(store) => store.last_accessed_at_sync(hash),
361            #[cfg(feature = "lmdb")]
362            LocalStore::Lmdb(store) => store.last_accessed_at_sync(hash),
363        }
364    }
365
366    pub fn many_last_accessed_at_sync(
367        &self,
368        hashes: &[Hash],
369    ) -> Result<Vec<(Hash, u64)>, StoreError> {
370        match self {
371            LocalStore::Fs(store) => store.many_last_accessed_at_sync(hashes),
372            #[cfg(feature = "lmdb")]
373            LocalStore::Lmdb(store) => store.many_last_accessed_at_sync(hashes),
374        }
375    }
376
377    /// Check if hash exists
378    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
379        match self {
380            LocalStore::Fs(store) => Ok(store.exists(hash)),
381            #[cfg(feature = "lmdb")]
382            LocalStore::Lmdb(store) => store.exists(hash),
383        }
384    }
385
386    /// Mark which sorted hashes already exist in local storage.
387    pub fn existing_hashes_in_sorted_candidates(
388        &self,
389        sorted_hashes: &[Hash],
390    ) -> Result<Vec<bool>, StoreError> {
391        match self {
392            LocalStore::Fs(store) => Ok(sorted_hashes
393                .iter()
394                .map(|hash| store.exists(hash))
395                .collect()),
396            #[cfg(feature = "lmdb")]
397            LocalStore::Lmdb(store) => store.existing_hashes_in_sorted_candidates(sorted_hashes),
398        }
399    }
400
401    /// Sync delete operation
402    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
403        match self {
404            LocalStore::Fs(store) => store.delete_sync(hash),
405            #[cfg(feature = "lmdb")]
406            LocalStore::Lmdb(store) => store.delete_sync(hash),
407        }
408    }
409
410    /// Get storage statistics
411    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
412        match self {
413            LocalStore::Fs(store) => {
414                let stats = store.stats()?;
415                Ok(LocalStoreStats {
416                    count: stats.count,
417                    total_bytes: stats.total_bytes,
418                })
419            }
420            #[cfg(feature = "lmdb")]
421            LocalStore::Lmdb(store) => {
422                let stats = store.stats()?;
423                Ok(LocalStoreStats {
424                    count: stats.count,
425                    total_bytes: stats.total_bytes,
426                })
427            }
428        }
429    }
430
431    /// List all hashes in the store
432    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
433        match self {
434            LocalStore::Fs(store) => store.list(),
435            #[cfg(feature = "lmdb")]
436            LocalStore::Lmdb(store) => store.list(),
437        }
438    }
439}
440
441#[async_trait]
442impl Store for LocalStore {
443    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
444        self.put_sync(hash, &data)
445    }
446
447    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
448        self.put_many_sync(&items)
449    }
450
451    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
452        self.get_sync(hash)
453    }
454
455    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
456        self.exists(hash)
457    }
458
459    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
460        self.delete_sync(hash)
461    }
462}
463
464#[cfg(feature = "s3")]
465use tokio::sync::mpsc;
466
467use crate::config::S3Config;
468
/// Message for background S3 sync
///
/// Sent over the router's unbounded channel and consumed by the background
/// task spawned in `StorageRouter::with_s3`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` to the object key derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object key derived from `hash`.
    Delete { hash: Hash },
}
475
/// Storage router - local store primary with optional S3 backup
///
/// Write path: local first (fast), then queue S3 upload (non-blocking)
/// Read path: local first, fall back to S3 if miss
///
/// All S3-related fields exist only when the crate is built with the `s3`
/// feature, and are `None`/empty when the router was created with `new`.
pub struct StorageRouter {
    /// Primary local store (always used)
    local: Arc<LocalStore>,
    /// Optional S3 client for backup
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Bucket used for S3 reads; set together with `s3_client`.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Prefix prepended to `<hex hash>.bin` when building object keys.
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to send uploads to background task
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
494
495impl StorageRouter {
496    #[cfg(feature = "s3")]
497    fn s3_sync_timeout() -> std::time::Duration {
498        let millis = std::env::var(S3_SYNC_TIMEOUT_MS_ENV)
499            .ok()
500            .and_then(|value| value.parse::<u64>().ok())
501            .filter(|value| *value > 0)
502            .unwrap_or(DEFAULT_S3_SYNC_TIMEOUT_MS);
503        std::time::Duration::from_millis(millis)
504    }
505
506    #[cfg(feature = "s3")]
507    fn s3_sync_timeout_error(timeout: std::time::Duration) -> StoreError {
508        StoreError::Other(format!(
509            "S3 sync operation timed out after {}ms",
510            timeout.as_millis()
511        ))
512    }
513
514    #[cfg(feature = "s3")]
515    fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
516    where
517        F: Future<Output = T> + Send + 'static,
518        T: Send + 'static,
519    {
520        let timeout = Self::s3_sync_timeout();
521        if tokio::runtime::Handle::try_current().is_ok() {
522            return std::thread::Builder::new()
523                .name("storage-s3-sync".to_string())
524                .spawn(move || {
525                    let runtime = tokio::runtime::Builder::new_current_thread()
526                        .enable_all()
527                        .build()
528                        .map_err(|err| {
529                            StoreError::Other(format!("build storage s3 sync runtime: {err}"))
530                        })?;
531                    runtime.block_on(async move {
532                        tokio::time::timeout(timeout, future)
533                            .await
534                            .map_err(|_| Self::s3_sync_timeout_error(timeout))
535                    })
536                })
537                .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
538                .join()
539                .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()))?;
540        }
541
542        let runtime = tokio::runtime::Builder::new_current_thread()
543            .enable_all()
544            .build()
545            .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
546        runtime.block_on(async move {
547            tokio::time::timeout(timeout, future)
548                .await
549                .map_err(|_| Self::s3_sync_timeout_error(timeout))
550        })
551    }
552
553    /// Create router with local storage only
554    pub fn new(local: Arc<LocalStore>) -> Self {
555        Self {
556            local,
557            #[cfg(feature = "s3")]
558            s3_client: None,
559            #[cfg(feature = "s3")]
560            s3_bucket: None,
561            #[cfg(feature = "s3")]
562            s3_prefix: String::new(),
563            #[cfg(feature = "s3")]
564            sync_tx: None,
565        }
566    }
567
568    /// Create router with local storage + S3 backup
569    #[cfg(feature = "s3")]
570    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
571        use aws_sdk_s3::Client as S3Client;
572
573        // Build AWS config
574        let mut aws_config_loader = aws_config::from_env();
575        aws_config_loader =
576            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
577        let aws_config = aws_config_loader.load().await;
578
579        // Build S3 client with custom endpoint
580        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
581        s3_config_builder = s3_config_builder
582            .endpoint_url(&config.endpoint)
583            .force_path_style(true);
584
585        let s3_client = S3Client::from_conf(s3_config_builder.build());
586        let bucket = config.bucket.clone();
587        let prefix = config.prefix.clone().unwrap_or_default();
588
589        // Create background sync channel
590        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
591
592        // Spawn background sync task with bounded concurrent uploads
593        let sync_client = s3_client.clone();
594        let sync_bucket = bucket.clone();
595        let sync_prefix = prefix.clone();
596
597        tokio::spawn(async move {
598            use aws_sdk_s3::primitives::ByteStream;
599
600            tracing::info!("S3 background sync task started");
601
602            // Keep S3 writes parallel, but avoid dispatch failures during mirror backfill bursts.
603            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(8));
604            let client = std::sync::Arc::new(sync_client);
605            let bucket = std::sync::Arc::new(sync_bucket);
606            let prefix = std::sync::Arc::new(sync_prefix);
607
608            while let Some(msg) = sync_rx.recv().await {
609                let client = client.clone();
610                let bucket = bucket.clone();
611                let prefix = prefix.clone();
612                let semaphore = semaphore.clone();
613
614                // Spawn each upload with semaphore-bounded concurrency
615                tokio::spawn(async move {
616                    // Acquire permit before uploading
617                    let _permit = semaphore.acquire().await;
618
619                    match msg {
620                        S3SyncMessage::Upload { hash, data } => {
621                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
622                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
623
624                            let mut attempt = 1u8;
625                            loop {
626                                match client
627                                    .put_object()
628                                    .bucket(bucket.as_str())
629                                    .key(&key)
630                                    .body(ByteStream::from(data.clone()))
631                                    .send()
632                                    .await
633                                {
634                                    Ok(_) => {
635                                        tracing::debug!("S3 upload succeeded: {}", &key);
636                                        break;
637                                    }
638                                    Err(e) if attempt < 3 => {
639                                        tracing::warn!(
640                                            "S3 upload retrying {}: attempt={} error={}",
641                                            &key,
642                                            attempt,
643                                            e
644                                        );
645                                        tokio::time::sleep(std::time::Duration::from_millis(
646                                            250 * u64::from(attempt),
647                                        ))
648                                        .await;
649                                        attempt += 1;
650                                    }
651                                    Err(e) => {
652                                        tracing::error!(
653                                            "S3 upload failed {} after {} attempts: {}",
654                                            &key,
655                                            attempt,
656                                            e
657                                        );
658                                        break;
659                                    }
660                                }
661                            }
662                        }
663                        S3SyncMessage::Delete { hash } => {
664                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
665                            tracing::debug!("S3 deleting {}", &key);
666
667                            let mut attempt = 1u8;
668                            loop {
669                                match client
670                                    .delete_object()
671                                    .bucket(bucket.as_str())
672                                    .key(&key)
673                                    .send()
674                                    .await
675                                {
676                                    Ok(_) => break,
677                                    Err(e) if attempt < 3 => {
678                                        tracing::warn!(
679                                            "S3 delete retrying {}: attempt={} error={}",
680                                            &key,
681                                            attempt,
682                                            e
683                                        );
684                                        tokio::time::sleep(std::time::Duration::from_millis(
685                                            250 * u64::from(attempt),
686                                        ))
687                                        .await;
688                                        attempt += 1;
689                                    }
690                                    Err(e) => {
691                                        tracing::error!(
692                                            "S3 delete failed {} after {} attempts: {}",
693                                            &key,
694                                            attempt,
695                                            e
696                                        );
697                                        break;
698                                    }
699                                }
700                            }
701                        }
702                    }
703                });
704            }
705        });
706
707        tracing::info!(
708            "S3 storage initialized: bucket={}, prefix={}",
709            bucket,
710            prefix
711        );
712
713        Ok(Self {
714            local,
715            s3_client: Some(s3_client),
716            s3_bucket: Some(bucket),
717            s3_prefix: prefix,
718            sync_tx: Some(sync_tx),
719        })
720    }
721
722    /// Store data - writes to LMDB, queues S3 upload in background
723    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
724        // Always write to local first
725        let is_new = self.local.put_sync(hash, data)?;
726
727        // Queue S3 upload only for newly inserted blobs.
728        // Existing local blobs were already persisted or are handled by explicit repair/push flows.
729        #[cfg(feature = "s3")]
730        if is_new {
731            if let Some(ref tx) = self.sync_tx {
732                tracing::debug!(
733                    "Queueing S3 upload for {} ({} bytes)",
734                    crate::storage::to_hex(&hash)[..16].to_string(),
735                    data.len(),
736                );
737                if let Err(e) = tx.send(S3SyncMessage::Upload {
738                    hash,
739                    data: data.to_vec(),
740                }) {
741                    tracing::error!("Failed to queue S3 upload: {}", e);
742                }
743            }
744        }
745
746        Ok(is_new)
747    }
748
749    /// Store multiple blobs with a single local batch write when supported.
750    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
751        #[cfg(feature = "s3")]
752        let pending_uploads = if self.sync_tx.is_some() {
753            let mut pending = Vec::new();
754            for (hash, data) in items {
755                if !self.local.exists(hash)? {
756                    pending.push((*hash, data.clone()));
757                }
758            }
759            pending
760        } else {
761            Vec::new()
762        };
763
764        let inserted = self.local.put_many_sync(items)?;
765
766        #[cfg(feature = "s3")]
767        if let Some(ref tx) = self.sync_tx {
768            for (hash, data) in pending_uploads {
769                if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
770                    tracing::error!("Failed to queue S3 upload: {}", e);
771                }
772            }
773        }
774
775        Ok(inserted)
776    }
777
778    /// Get data - tries LMDB first, falls back to S3
779    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
780        // Try local first
781        if let Some(data) = self.local.get_sync(hash)? {
782            return Ok(Some(data));
783        }
784
785        // Fall back to S3 if configured
786        #[cfg(feature = "s3")]
787        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
788            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
789            let client = client.clone();
790            let bucket = bucket.clone();
791
792            match Self::run_s3_future_sync(async move {
793                client.get_object().bucket(bucket).key(key).send().await
794            }) {
795                Ok(Ok(output)) => {
796                    match Self::run_s3_future_sync(async move { output.body.collect().await }) {
797                        Ok(Ok(body)) => {
798                            let data = body.into_bytes().to_vec();
799                            // Cache locally for future reads
800                            let _ = self.local.put_sync(*hash, &data);
801                            return Ok(Some(data));
802                        }
803                        Ok(Err(err)) => {
804                            tracing::warn!("S3 body collect failed: {}", err);
805                        }
806                        Err(err) => {
807                            tracing::warn!("S3 body collect runtime failed: {}", err);
808                        }
809                    }
810                }
811                Ok(Err(err)) => {
812                    let service_err = err.into_service_error();
813                    if !service_err.is_no_such_key() {
814                        tracing::warn!("S3 get failed: {}", service_err);
815                    }
816                }
817                Err(err) => {
818                    tracing::warn!("S3 get runtime failed: {}", err);
819                }
820            }
821        }
822
823        Ok(None)
824    }
825
826    pub fn get_range_sync(
827        &self,
828        hash: &Hash,
829        start: u64,
830        end_inclusive: u64,
831    ) -> Result<Option<Vec<u8>>, StoreError> {
832        self.local.get_range_sync(hash, start, end_inclusive)
833    }
834
835    pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
836        self.local.blob_size_sync(hash)
837    }
838
839    pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
840        self.local.touch_accessed_sync(hash, now)
841    }
842
843    pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
844        self.local.touch_many_accessed_sync(hashes, now)
845    }
846
847    pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
848        self.local.last_accessed_at_sync(hash)
849    }
850
851    pub fn many_last_accessed_at_sync(
852        &self,
853        hashes: &[Hash],
854    ) -> Result<Vec<(Hash, u64)>, StoreError> {
855        self.local.many_last_accessed_at_sync(hashes)
856    }
857
858    /// Check if hash exists
859    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
860        // Check local first
861        if self.local.exists(hash)? {
862            return Ok(true);
863        }
864
865        // Check S3 if configured
866        #[cfg(feature = "s3")]
867        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
868            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
869            let client = client.clone();
870            let bucket = bucket.clone();
871
872            match Self::run_s3_future_sync(async move {
873                client.head_object().bucket(bucket).key(&key).send().await
874            }) {
875                Ok(Ok(_)) => return Ok(true),
876                Ok(Err(err)) => {
877                    let service_err = err.into_service_error();
878                    if !service_err.is_not_found() {
879                        tracing::warn!("S3 head failed: {}", service_err);
880                    }
881                }
882                Err(err) => {
883                    tracing::warn!("S3 head runtime failed: {}", err);
884                }
885            }
886        }
887
888        Ok(false)
889    }
890
891    /// Delete data from both local and S3 stores
892    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
893        let deleted = self.local.delete_sync(hash)?;
894
895        // Queue S3 delete if configured
896        #[cfg(feature = "s3")]
897        if let Some(ref tx) = self.sync_tx {
898            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
899        }
900
901        Ok(deleted)
902    }
903
904    /// Delete data from local store only (don't propagate to S3)
905    /// Used for eviction where we want to keep S3 as archive
906    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
907        self.local.delete_sync(hash)
908    }
909
910    /// Get stats from local store
911    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
912        self.local.stats()
913    }
914
915    /// List all hashes from local store
916    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
917        self.local.list()
918    }
919
920    /// Mark which sorted candidate hashes already exist in local storage.
921    pub fn existing_local_hashes_in_sorted_candidates(
922        &self,
923        sorted_hashes: &[Hash],
924    ) -> Result<Vec<bool>, StoreError> {
925        self.local
926            .existing_hashes_in_sorted_candidates(sorted_hashes)
927    }
928
929    /// Get the underlying local store for HashTree operations
930    pub fn local_store(&self) -> Arc<LocalStore> {
931        Arc::clone(&self.local)
932    }
933}
934
/// Store wrapper that forwards every operation to a `StorageRouter` and
/// remembers the hashes of reads that returned data.
///
/// Cloning yields a handle that shares both the router and the recorded set,
/// because both fields are `Arc`s.
#[derive(Clone)]
struct AccessRecordingStore {
    /// Router that actually serves all store operations.
    inner: Arc<StorageRouter>,
    /// Hashes recorded by successful reads since the set was last drained.
    accessed: Arc<Mutex<HashSet<Hash>>>,
}
940
941impl AccessRecordingStore {
942    fn new(inner: Arc<StorageRouter>) -> Self {
943        Self {
944            inner,
945            accessed: Arc::new(Mutex::new(HashSet::new())),
946        }
947    }
948
949    fn take_accessed_hashes(&self) -> Vec<Hash> {
950        let Ok(mut accessed) = self.accessed.lock() else {
951            return Vec::new();
952        };
953        accessed.drain().collect()
954    }
955
956    fn record_access(&self, hash: &Hash) {
957        let Ok(mut accessed) = self.accessed.lock() else {
958            return;
959        };
960        accessed.insert(*hash);
961    }
962}
963
964#[async_trait]
965impl Store for AccessRecordingStore {
966    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
967        self.inner.put(hash, data).await
968    }
969
970    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
971        self.inner.put_many(items).await
972    }
973
974    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
975        let data = self.inner.get(hash).await?;
976        if data.is_some() {
977            self.record_access(hash);
978        }
979        Ok(data)
980    }
981
982    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
983        self.inner.has(hash).await
984    }
985
986    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
987        self.inner.delete(hash).await
988    }
989}
990
991// Implement async Store trait for StorageRouter so it can be used directly with HashTree
992// This ensures all writes go through S3 sync
993#[async_trait]
994impl Store for StorageRouter {
995    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
996        self.put_sync(hash, &data)
997    }
998
999    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
1000        self.put_many_sync(&items)
1001    }
1002
1003    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1004        self.get_sync(hash)
1005    }
1006
1007    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1008        self.exists(hash)
1009    }
1010
1011    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1012        self.delete_sync(hash)
1013    }
1014}
1015
/// Tree-aware store: an LMDB metadata environment (pins, ownership, tree
/// bookkeeping) combined with a `StorageRouter` that holds the blob bytes.
pub struct HashtreeStore {
    /// Directory this store was opened at; holds the metadata environment and
    /// the `blobs` subdirectory.
    base_path: PathBuf,
    /// LMDB environment that owns all metadata databases below.
    env: heed::Env,
    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
    pins: Database<Bytes, Unit>,
    /// Mutable published refs that should stay subscribed and keep following updates
    pinned_refs: Database<Str, Unit>,
    /// Authors whose hashtree publications should be mirrored continuously
    tracked_authors: Database<Str, Unit>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Pubkey listing index: pubkey (32 bytes) ++ sha256 (32 bytes) -> BlobMetadata JSON
    pubkey_blob_index: Database<Bytes, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles LMDB + optional S3 (Arc for sharing with HashTree)
    router: Arc<StorageRouter>,
    /// Maximum storage size in bytes (from config)
    max_size_bytes: u64,
    /// Whether quota enforcement may delete local blobs not tracked by any indexed tree.
    evict_orphans: bool,
    /// Best-effort in-memory throttle for blob access metadata writes.
    blob_access_update_gate: BlobAccessUpdateGate,
    /// Keeps access-time maintenance out of foreground blob reads.
    /// Set while a background access-update thread is running, so at most
    /// one runs at a time.
    blob_access_update_inflight: Arc<AtomicBool>,
}
1050
1051impl HashtreeStore {
1052    /// Create a new store with the configured local storage limit.
1053    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
1054        let config = hashtree_config::Config::load_or_default();
1055        let max_size_bytes = config
1056            .storage
1057            .max_size_gb
1058            .saturating_mul(1024 * 1024 * 1024);
1059        Self::with_options_and_backend(
1060            path,
1061            None,
1062            max_size_bytes,
1063            config.storage.evict_orphans,
1064            &config.storage.backend,
1065        )
1066    }
1067
1068    /// Create a new store with an explicit local backend and size limit.
1069    pub fn new_with_backend<P: AsRef<Path>>(
1070        path: P,
1071        backend: hashtree_config::StorageBackend,
1072        max_size_bytes: u64,
1073    ) -> Result<Self> {
1074        Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
1075    }
1076
1077    /// Create a new store with optional S3 backend and the configured local storage limit.
1078    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
1079        let config = hashtree_config::Config::load_or_default();
1080        let max_size_bytes = config
1081            .storage
1082            .max_size_gb
1083            .saturating_mul(1024 * 1024 * 1024);
1084        Self::with_options_and_backend(
1085            path,
1086            s3_config,
1087            max_size_bytes,
1088            config.storage.evict_orphans,
1089            &config.storage.backend,
1090        )
1091    }
1092
1093    /// Create a new store with optional S3 backend and custom size limit.
1094    ///
1095    /// The raw local blob backend remains unbounded. `HashtreeStore` enforces
1096    /// `max_size_bytes` at the tree-management layer so eviction can honor pins,
1097    /// orphan handling, and local-only eviction when S3 is used as archive.
1098    pub fn with_options<P: AsRef<Path>>(
1099        path: P,
1100        s3_config: Option<&S3Config>,
1101        max_size_bytes: u64,
1102    ) -> Result<Self> {
1103        let config = hashtree_config::Config::load_or_default();
1104        Self::with_options_and_backend(
1105            path,
1106            s3_config,
1107            max_size_bytes,
1108            config.storage.evict_orphans,
1109            &config.storage.backend,
1110        )
1111    }
1112
1113    pub fn with_options_and_backend<P: AsRef<Path>>(
1114        path: P,
1115        s3_config: Option<&S3Config>,
1116        max_size_bytes: u64,
1117        evict_orphans: bool,
1118        backend: &hashtree_config::StorageBackend,
1119    ) -> Result<Self> {
1120        let path = path.as_ref();
1121        std::fs::create_dir_all(path)?;
1122        let metadata_map_size = lmdb_map_size_for_existing_env(
1123            path,
1124            lmdb_metadata_map_size_for_storage_budget(max_size_bytes),
1125        )?;
1126
1127        let env = unsafe {
1128            EnvOpenOptions::new()
1129                .map_size(metadata_map_size)
1130                .max_dbs(11) // pins, pinned_refs, tracked_authors, blob_owners, pubkey_blobs, pubkey_blob_index, tree_meta, blob_trees, tree_refs, cached_roots, blobs
1131                .max_readers(LMDB_MAX_READERS)
1132                .open(path)?
1133        };
1134        let _ = env.clear_stale_readers();
1135        if env.info().map_size < metadata_map_size {
1136            unsafe { env.resize(metadata_map_size) }?;
1137        }
1138
1139        let mut wtxn = env.write_txn()?;
1140        let pins = env.create_database(&mut wtxn, Some("pins"))?;
1141        let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
1142        let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
1143        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
1144        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
1145        let pubkey_blob_index = env.create_database(&mut wtxn, Some("pubkey_blob_index"))?;
1146        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
1147        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
1148        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
1149        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
1150        wtxn.commit()?;
1151
1152        // Intentionally keep the raw blob backend unbounded here. HashtreeStore
1153        // owns quota policy above this layer, where it can coordinate eviction
1154        // with tree refs, blob ownership, pins, and S3 archival behavior.
1155        let local_store = Arc::new(match backend {
1156            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
1157                FsBlobStore::new(path.join("blobs"))
1158                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
1159            ),
1160            #[cfg(feature = "lmdb")]
1161            hashtree_config::StorageBackend::Lmdb => {
1162                std::fs::create_dir_all(path.join("blobs"))?;
1163                remove_stale_fs_blob_shards(&path.join("blobs"))
1164                    .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
1165                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
1166                let map_size = usize::try_from(requested_map_size)
1167                    .context("LMDB blob map size exceeds usize")?;
1168                LocalStore::Lmdb(
1169                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
1170                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
1171                )
1172            }
1173            #[cfg(not(feature = "lmdb"))]
1174            hashtree_config::StorageBackend::Lmdb => {
1175                tracing::warn!(
1176                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
1177                );
1178                LocalStore::Fs(
1179                    FsBlobStore::new(path.join("blobs"))
1180                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
1181                )
1182            }
1183        });
1184
1185        // Create storage router with optional S3
1186        #[cfg(feature = "s3")]
1187        let router = Arc::new(if let Some(s3_cfg) = s3_config {
1188            tracing::info!(
1189                "Initializing S3 storage backend: bucket={}, endpoint={}",
1190                s3_cfg.bucket,
1191                s3_cfg.endpoint
1192            );
1193
1194            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
1195        } else {
1196            StorageRouter::new(local_store)
1197        });
1198
1199        #[cfg(not(feature = "s3"))]
1200        let router = Arc::new({
1201            if s3_config.is_some() {
1202                tracing::warn!(
1203                    "S3 config provided but S3 feature not enabled. Using local storage only."
1204                );
1205            }
1206            StorageRouter::new(local_store)
1207        });
1208
1209        Ok(Self {
1210            base_path: path.to_path_buf(),
1211            env,
1212            pins,
1213            pinned_refs,
1214            tracked_authors,
1215            blob_owners,
1216            pubkey_blobs,
1217            pubkey_blob_index,
1218            tree_meta,
1219            blob_trees,
1220            tree_refs,
1221            cached_roots,
1222            router,
1223            max_size_bytes,
1224            evict_orphans,
1225            blob_access_update_gate: BlobAccessUpdateGate::default(),
1226            blob_access_update_inflight: Arc::new(AtomicBool::new(false)),
1227        })
1228    }
1229
1230    pub fn base_path(&self) -> &Path {
1231        &self.base_path
1232    }
1233
1234    /// Get the storage router
1235    pub fn router(&self) -> &StorageRouter {
1236        &self.router
1237    }
1238
1239    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
1240    /// All writes through this go to both LMDB and S3
1241    pub fn store_arc(&self) -> Arc<StorageRouter> {
1242        Arc::clone(&self.router)
1243    }
1244
1245    fn access_tracking_tree(&self) -> (HashTree<AccessRecordingStore>, AccessRecordingStore) {
1246        let access_store = AccessRecordingStore::new(self.store_arc());
1247        let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1248        (tree, access_store)
1249    }
1250
1251    pub fn record_blob_accesses<I>(&self, hashes: I)
1252    where
1253        I: IntoIterator<Item = Hash>,
1254    {
1255        let access_update_batch_limit = access_update_background_batch_limit();
1256        if access_update_batch_limit == 0 {
1257            return;
1258        }
1259
1260        let now = unix_timestamp_now();
1261        let mut due_hashes = self.blob_access_update_gate.due_hashes(hashes, now);
1262        if due_hashes.is_empty() {
1263            return;
1264        }
1265
1266        if self
1267            .blob_access_update_inflight
1268            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1269            .is_err()
1270        {
1271            return;
1272        }
1273
1274        if due_hashes.len() > access_update_batch_limit {
1275            due_hashes.truncate(access_update_batch_limit);
1276        }
1277
1278        let router = Arc::clone(&self.router);
1279        let inflight = Arc::clone(&self.blob_access_update_inflight);
1280        let spawn_result = std::thread::Builder::new()
1281            .name("blob-access-update".to_string())
1282            .spawn(move || {
1283                if let Err(err) = router.touch_many_accessed_sync(&due_hashes, now) {
1284                    tracing::debug!("Failed to update blob access metadata: {}", err);
1285                }
1286                inflight.store(false, Ordering::Release);
1287            });
1288        if let Err(err) = spawn_result {
1289            self.blob_access_update_inflight
1290                .store(false, Ordering::Release);
1291            tracing::debug!("Failed to spawn blob access metadata updater: {}", err);
1292        }
1293    }
1294
1295    fn record_blob_access_now(&self, hash: &Hash) {
1296        if let Err(err) = self.router.touch_accessed_sync(hash, unix_timestamp_now()) {
1297            tracing::debug!("Failed to update blob access metadata: {}", err);
1298        }
1299    }
1300
1301    pub fn blob_last_accessed_at(&self, hash: &Hash) -> Result<Option<u64>> {
1302        self.router
1303            .last_accessed_at_sync(hash)
1304            .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1305    }
1306
1307    pub fn blob_last_accessed_many(&self, hashes: &[Hash]) -> Result<Vec<(Hash, u64)>> {
1308        self.router
1309            .many_last_accessed_at_sync(hashes)
1310            .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1311    }
1312
1313    /// Get tree node by hash (raw bytes)
1314    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
1315        let (tree, access_store) = self.access_tracking_tree();
1316
1317        let result = sync_block_on(async {
1318            tree.get_tree_node(hash)
1319                .await
1320                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
1321        })?;
1322        if result.is_some() {
1323            self.record_blob_accesses(access_store.take_accessed_hashes());
1324        }
1325        Ok(result)
1326    }
1327
1328    /// Store a raw blob, returns SHA256 hash as hex.
1329    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
1330        let hash = sha256(data);
1331        let inserted = self
1332            .router
1333            .put_sync(hash, data)
1334            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1335        if !inserted {
1336            self.record_blob_access_now(&hash);
1337        }
1338        Ok(to_hex(&hash))
1339    }
1340
1341    /// Store an owned Blossom blob under the configured durable storage limit.
1342    pub fn put_owned_blob(&self, data: &[u8], pubkey: &[u8; 32]) -> Result<String> {
1343        let hash = sha256(data);
1344        if !self
1345            .router
1346            .exists(&hash)
1347            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))?
1348        {
1349            self.make_room_for_durable_blob(data.len() as u64)?;
1350            self.router
1351                .put_sync(hash, data)
1352                .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1353        } else {
1354            self.record_blob_access_now(&hash);
1355        }
1356        self.set_blob_owner_with_size(&hash, pubkey, data.len() as u64)?;
1357        Ok(to_hex(&hash))
1358    }
1359
1360    /// Store multiple owned Blossom blobs, batching raw blob and owner-index writes.
1361    pub fn put_owned_blobs(&self, items: &[(Hash, Vec<u8>)], pubkey: &[u8; 32]) -> Result<usize> {
1362        let started_at = Instant::now();
1363        let slow_log_ms = slow_owned_blob_batch_log_ms();
1364        if items.is_empty() {
1365            return Ok(0);
1366        }
1367        let incoming_bytes = items.iter().fold(0u64, |total, (_, data)| {
1368            total.saturating_add(data.len() as u64)
1369        });
1370        let count = items.len();
1371        let room_started = Instant::now();
1372        self.make_room_for_durable_blob(incoming_bytes)?;
1373        let make_room_ms = room_started.elapsed().as_millis();
1374        let raw_started = Instant::now();
1375        let inserted = self
1376            .router
1377            .put_many_sync(items)
1378            .map_err(|e| anyhow::anyhow!("Failed to store blob batch: {}", e))?;
1379        let raw_write_ms = raw_started.elapsed().as_millis();
1380
1381        let now = SystemTime::now()
1382            .duration_since(UNIX_EPOCH)
1383            .unwrap()
1384            .as_secs();
1385        let owner_started = Instant::now();
1386        let mut wtxn = self.env.write_txn()?;
1387        for (hash, data) in items {
1388            let owner_key = Self::blob_owner_key(hash, pubkey);
1389            self.blob_owners.put(&mut wtxn, &owner_key[..], &())?;
1390
1391            let index_key = Self::pubkey_blob_key(pubkey, hash);
1392            if self.pubkey_blob_index.get(&wtxn, &index_key[..])?.is_none() {
1393                let metadata = BlobMetadata {
1394                    sha256: to_hex(hash),
1395                    size: data.len() as u64,
1396                    mime_type: "application/octet-stream".to_string(),
1397                    uploaded: now,
1398                };
1399                self.pubkey_blob_index.put(
1400                    &mut wtxn,
1401                    &index_key[..],
1402                    &serde_json::to_vec(&metadata)?,
1403                )?;
1404            }
1405        }
1406        wtxn.commit()?;
1407        let owner_index_ms = owner_started.elapsed().as_millis();
1408        let total_ms = started_at.elapsed().as_millis();
1409        if slow_log_ms.is_some_and(|threshold| total_ms >= threshold) {
1410            tracing::warn!(
1411                blobs = count,
1412                inserted,
1413                incoming_bytes,
1414                total_ms,
1415                make_room_ms,
1416                raw_write_ms,
1417                owner_index_ms,
1418                "slow owned Blossom blob batch write"
1419            );
1420        }
1421        Ok(inserted)
1422    }
1423
1424    /// Store an opportunistically cached blob.
1425    ///
1426    /// Unlike durable `put_blob` writes, this path may evict disposable orphaned
1427    /// blobs to make room under storage pressure. It intentionally avoids touching
1428    /// indexed trees, social-graph roots, explicit pins, and owned Blossom blobs.
1429    pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
1430        let hash = sha256(data);
1431        if self
1432            .router
1433            .exists(&hash)
1434            .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
1435        {
1436            self.record_blob_access_now(&hash);
1437            return Ok(to_hex(&hash));
1438        }
1439
1440        let incoming_bytes = data.len() as u64;
1441        let _ = self.make_room_for_cached_blob(incoming_bytes);
1442
1443        let mut retried_after_cleanup = false;
1444        loop {
1445            match self.router.put_sync(hash, data) {
1446                Ok(_) => return Ok(to_hex(&hash)),
1447                Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
1448                    let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
1449                    if freed == 0 {
1450                        return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
1451                    }
1452                    retried_after_cleanup = true;
1453                }
1454                Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
1455            }
1456        }
1457    }
1458
1459    /// Store multiple opportunistically cached blobs in one raw storage batch.
1460    pub fn put_cached_blobs(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize> {
1461        let started_at = Instant::now();
1462        let slow_log_ms = slow_cached_blob_batch_log_ms();
1463        if items.is_empty() {
1464            return Ok(0);
1465        }
1466
1467        let missing_items;
1468        let write_items: &[(Hash, Vec<u8>)] = if cached_blob_batch_existing_precheck() {
1469            let mut sorted_hashes: Vec<Hash> = items.iter().map(|(hash, _)| *hash).collect();
1470            sorted_hashes.sort_unstable();
1471            sorted_hashes.dedup();
1472            let existing = self
1473                .router
1474                .existing_local_hashes_in_sorted_candidates(&sorted_hashes)
1475                .map_err(|e| anyhow::anyhow!("Failed to check cached blob batch: {}", e))?;
1476            let existing_hashes: HashSet<Hash> = sorted_hashes
1477                .into_iter()
1478                .zip(existing)
1479                .filter_map(|(hash, exists)| exists.then_some(hash))
1480                .collect();
1481            missing_items = items
1482                .iter()
1483                .filter(|(hash, _)| !existing_hashes.contains(hash))
1484                .cloned()
1485                .collect::<Vec<_>>();
1486            if missing_items.is_empty() {
1487                return Ok(0);
1488            }
1489            missing_items.as_slice()
1490        } else {
1491            items
1492        };
1493
1494        let incoming_bytes = write_items.iter().fold(0u64, |total, (_, data)| {
1495            total.saturating_add(data.len() as u64)
1496        });
1497        let room_started = Instant::now();
1498        let _ = self.make_room_for_cached_blob(incoming_bytes);
1499        let make_room_ms = room_started.elapsed().as_millis();
1500
1501        let mut retried_after_cleanup = false;
1502        loop {
1503            let raw_started = Instant::now();
1504            match self.router.put_many_sync(write_items) {
1505                Ok(inserted) => {
1506                    let raw_write_ms = raw_started.elapsed().as_millis();
1507                    let total_ms = started_at.elapsed().as_millis();
1508                    if slow_log_ms.is_some_and(|threshold| total_ms >= threshold) {
1509                        tracing::warn!(
1510                            blobs = write_items.len(),
1511                            inserted,
1512                            incoming_bytes,
1513                            total_ms,
1514                            make_room_ms,
1515                            raw_write_ms,
1516                            "slow cached Blossom blob batch write"
1517                        );
1518                    }
1519                    return Ok(inserted);
1520                }
1521                Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
1522                    let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
1523                    if freed == 0 {
1524                        return Err(anyhow::anyhow!(
1525                            "Failed to store cached blob batch: {}",
1526                            err
1527                        ));
1528                    }
1529                    retried_after_cleanup = true;
1530                }
1531                Err(err) => {
1532                    return Err(anyhow::anyhow!(
1533                        "Failed to store cached blob batch: {}",
1534                        err
1535                    ));
1536                }
1537            }
1538        }
1539    }
1540
1541    /// Get a raw blob by SHA256 hash (raw bytes).
1542    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1543        let data = self
1544            .router
1545            .get_sync(hash)
1546            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?;
1547        if data.is_some() {
1548            self.record_blob_accesses(std::iter::once(*hash));
1549        }
1550        Ok(data)
1551    }
1552
1553    pub fn get_blob_range(
1554        &self,
1555        hash: &[u8; 32],
1556        start: u64,
1557        end_inclusive: u64,
1558    ) -> Result<Option<Vec<u8>>> {
1559        let data = self
1560            .router
1561            .get_range_sync(hash, start, end_inclusive)
1562            .map_err(|e| anyhow::anyhow!("Failed to get blob range: {}", e))?;
1563        if data.is_some() {
1564            self.record_blob_accesses(std::iter::once(*hash));
1565        }
1566        Ok(data)
1567    }
1568
1569    pub fn blob_size(&self, hash: &[u8; 32]) -> Result<Option<u64>> {
1570        self.router
1571            .blob_size_sync(hash)
1572            .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))
1573    }
1574
1575    /// Check if a blob exists by SHA256 hash (raw bytes).
1576    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
1577        self.router
1578            .exists(hash)
1579            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
1580    }
1581
    // === Blossom ownership tracking ===
    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
    // This allows multi-owner tracking: checking one (blob, owner) pair is a single
    // point lookup, and all owners of a blob share the same sha256 key prefix.
1585
1586    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
1587    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
1588        let mut key = [0u8; 64];
1589        key[..32].copy_from_slice(sha256);
1590        key[32..].copy_from_slice(pubkey);
1591        key
1592    }
1593
1594    fn pubkey_blob_key(pubkey: &[u8; 32], sha256: &[u8; 32]) -> [u8; 64] {
1595        let mut key = [0u8; 64];
1596        key[..32].copy_from_slice(pubkey);
1597        key[32..].copy_from_slice(sha256);
1598        key
1599    }
1600
1601    /// Add an owner (pubkey) to a blob for Blossom protocol
1602    /// Multiple users can own the same blob - it's only deleted when all owners remove it
1603    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
1604        let size = self
1605            .router
1606            .blob_size_sync(sha256)
1607            .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))?
1608            .unwrap_or(0);
1609        self.set_blob_owner_with_size(sha256, pubkey, size)
1610    }
1611
1612    fn set_blob_owner_with_size(
1613        &self,
1614        sha256: &[u8; 32],
1615        pubkey: &[u8; 32],
1616        size: u64,
1617    ) -> Result<()> {
1618        let key = Self::blob_owner_key(sha256, pubkey);
1619        let index_key = Self::pubkey_blob_key(pubkey, sha256);
1620        let mut wtxn = self.env.write_txn()?;
1621
1622        // Add ownership entry (idempotent - put overwrites)
1623        self.blob_owners.put(&mut wtxn, &key[..], &())?;
1624
1625        if self.pubkey_blob_index.get(&wtxn, &index_key[..])?.is_none() {
1626            let now = SystemTime::now()
1627                .duration_since(UNIX_EPOCH)
1628                .unwrap()
1629                .as_secs();
1630            let metadata = BlobMetadata {
1631                sha256: to_hex(sha256),
1632                size,
1633                mime_type: "application/octet-stream".to_string(),
1634                uploaded: now,
1635            };
1636            self.pubkey_blob_index.put(
1637                &mut wtxn,
1638                &index_key[..],
1639                &serde_json::to_vec(&metadata)?,
1640            )?;
1641        }
1642
1643        wtxn.commit()?;
1644        Ok(())
1645    }
1646
1647    /// Check if a pubkey owns a blob
1648    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1649        let key = Self::blob_owner_key(sha256, pubkey);
1650        let rtxn = self.env.read_txn()?;
1651        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1652    }
1653
1654    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
1655    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1656        let rtxn = self.env.read_txn()?;
1657
1658        let mut owners = Vec::new();
1659        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1660            let (key, _) = item?;
1661            if key.len() == 64 {
1662                // Extract pubkey from composite key (bytes 32-64)
1663                let mut pubkey = [0u8; 32];
1664                pubkey.copy_from_slice(&key[32..64]);
1665                owners.push(pubkey);
1666            }
1667        }
1668        Ok(owners)
1669    }
1670
1671    /// Check if blob has any owners
1672    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1673        let rtxn = self.env.read_txn()?;
1674
1675        // Just check if any entry exists with this prefix
1676        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1677            if item.is_ok() {
1678                return Ok(true);
1679            }
1680        }
1681        Ok(false)
1682    }
1683
1684    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
1685    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1686        Ok(self.get_blob_owners(sha256)?.into_iter().next())
1687    }
1688
1689    /// Remove a user's ownership of a blossom blob
1690    /// Only deletes the actual blob when no owners remain
1691    /// Returns true if the blob was actually deleted (no owners left)
1692    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1693        let key = Self::blob_owner_key(sha256, pubkey);
1694        let mut wtxn = self.env.write_txn()?;
1695
1696        // Remove this pubkey's ownership entry
1697        self.blob_owners.delete(&mut wtxn, &key[..])?;
1698        self.pubkey_blob_index
1699            .delete(&mut wtxn, &Self::pubkey_blob_key(pubkey, sha256)[..])?;
1700
1701        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
1702        let sha256_hex = to_hex(sha256);
1703
1704        // Remove from pubkey's blob list
1705        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1706            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1707                blobs.retain(|b| b.sha256 != sha256_hex);
1708                let blobs_json = serde_json::to_vec(&blobs)?;
1709                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1710            }
1711        }
1712
1713        // Check if any other owners remain (prefix scan)
1714        let mut has_other_owners = false;
1715        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1716            if item.is_ok() {
1717                has_other_owners = true;
1718                break;
1719            }
1720        }
1721
1722        if has_other_owners {
1723            wtxn.commit()?;
1724            tracing::debug!(
1725                "Removed {} from blob {} owners, other owners remain",
1726                &to_hex(pubkey)[..8],
1727                &sha256_hex[..8]
1728            );
1729            return Ok(false);
1730        }
1731
1732        // No owners left - delete the blob completely
1733        tracing::info!(
1734            "All owners removed from blob {}, deleting",
1735            &sha256_hex[..8]
1736        );
1737
1738        // Delete raw blob (by content hash) - this deletes from S3 too
1739        let _ = self.router.delete_sync(sha256);
1740
1741        wtxn.commit()?;
1742        Ok(true)
1743    }
1744
1745    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
1746    pub fn list_blobs_by_pubkey(
1747        &self,
1748        pubkey: &[u8; 32],
1749    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1750        let rtxn = self.env.read_txn()?;
1751
1752        let mut blobs: Vec<BlobMetadata> = self
1753            .pubkey_blobs
1754            .get(&rtxn, pubkey)?
1755            .and_then(|b| serde_json::from_slice(b).ok())
1756            .unwrap_or_default();
1757        let mut seen: HashSet<String> = blobs.iter().map(|blob| blob.sha256.clone()).collect();
1758
1759        for item in self.pubkey_blob_index.prefix_iter(&rtxn, pubkey)? {
1760            let (_, metadata_bytes) = item?;
1761            let metadata: BlobMetadata = match serde_json::from_slice(metadata_bytes) {
1762                Ok(metadata) => metadata,
1763                Err(_) => continue,
1764            };
1765            if seen.insert(metadata.sha256.clone()) {
1766                blobs.push(metadata);
1767            }
1768        }
1769
1770        Ok(blobs
1771            .into_iter()
1772            .map(|b| crate::server::blossom::BlobDescriptor {
1773                url: format!("/{}", b.sha256),
1774                sha256: b.sha256,
1775                size: b.size,
1776                mime_type: b.mime_type,
1777                uploaded: b.uploaded,
1778            })
1779            .collect())
1780    }
1781
1782    /// Get a single chunk/blob by hash (raw bytes)
1783    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1784        let data = self
1785            .router
1786            .get_sync(hash)
1787            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))?;
1788        if data.is_some() {
1789            self.record_blob_accesses(std::iter::once(*hash));
1790        }
1791        Ok(data)
1792    }
1793
1794    /// Get file content by hash (raw bytes)
1795    /// Returns raw bytes (caller handles decryption if needed)
1796    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1797        let (tree, access_store) = self.access_tracking_tree();
1798
1799        let result = sync_block_on(async {
1800            tree.read_file(hash)
1801                .await
1802                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1803        })?;
1804        if result.is_some() {
1805            self.record_blob_accesses(access_store.take_accessed_hashes());
1806        }
1807        Ok(result)
1808    }
1809
1810    /// Get file content by Cid (hash + optional decryption key as raw bytes)
1811    /// Handles decryption automatically if key is present
1812    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1813        let (tree, access_store) = self.access_tracking_tree();
1814
1815        let result = sync_block_on(async {
1816            tree.get(cid, None)
1817                .await
1818                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1819        })?;
1820        if result.is_some() {
1821            self.record_blob_accesses(access_store.take_accessed_hashes());
1822        }
1823        Ok(result)
1824    }
1825
1826    fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1827        let exists = self
1828            .router
1829            .exists(&cid.hash)
1830            .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1831        if !exists {
1832            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1833        }
1834        Ok(())
1835    }
1836
1837    /// Stream file content identified by Cid into a writer without buffering full file in memory.
1838    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1839        self.ensure_cid_exists(cid)?;
1840
1841        let (tree, access_store) = self.access_tracking_tree();
1842        let mut total_bytes = 0u64;
1843        let mut streamed_any_chunk = false;
1844
1845        sync_block_on(async {
1846            let mut stream = tree.get_stream(cid);
1847            while let Some(chunk) = stream.next().await {
1848                streamed_any_chunk = true;
1849                let chunk =
1850                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1851                writer
1852                    .write_all(&chunk)
1853                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1854                total_bytes += chunk.len() as u64;
1855            }
1856            Ok::<(), anyhow::Error>(())
1857        })?;
1858
1859        if !streamed_any_chunk {
1860            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1861        }
1862        self.record_blob_accesses(access_store.take_accessed_hashes());
1863
1864        writer
1865            .flush()
1866            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1867        Ok(total_bytes)
1868    }
1869
1870    /// Stream file content identified by Cid directly into a destination path.
1871    pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1872        self.ensure_cid_exists(cid)?;
1873
1874        let output_path = output_path.as_ref();
1875        if let Some(parent) = output_path.parent() {
1876            if !parent.as_os_str().is_empty() {
1877                std::fs::create_dir_all(parent).with_context(|| {
1878                    format!("Failed to create output directory {}", parent.display())
1879                })?;
1880            }
1881        }
1882
1883        let mut file = std::fs::File::create(output_path)
1884            .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1885        self.write_file_by_cid_to_writer(cid, &mut file)
1886    }
1887
1888    /// Stream a public (unencrypted) file by hash directly into a destination path.
1889    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1890        self.write_file_by_cid(&Cid::public(*hash), output_path)
1891    }
1892
1893    /// Resolve a path within a tree (returns Cid with key if encrypted)
1894    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1895        let (tree, access_store) = self.access_tracking_tree();
1896
1897        let result = sync_block_on(async {
1898            tree.resolve_path(cid, path)
1899                .await
1900                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1901        })?;
1902        if result.is_some() {
1903            self.record_blob_accesses(access_store.take_accessed_hashes());
1904        }
1905        Ok(result)
1906    }
1907
1908    /// Get chunk metadata for a file (chunk list, sizes, total size)
1909    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
1910        let access_store = AccessRecordingStore::new(self.store_arc());
1911        let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1912
1913        let metadata: Result<Option<FileChunkMetadata>> = sync_block_on(async {
1914            // First check if the hash exists in the store at all
1915            // (either as a blob or tree node)
1916            let exists = access_store
1917                .has(hash)
1918                .await
1919                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;
1920
1921            if !exists {
1922                return Ok(None);
1923            }
1924
1925            // Get total size
1926            let total_size = tree
1927                .get_size(hash)
1928                .await
1929                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;
1930
1931            // Check if it's a tree (chunked) or blob
1932            let is_tree_node = tree
1933                .is_tree(hash)
1934                .await
1935                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1936
1937            if !is_tree_node {
1938                // Single blob, not chunked
1939                return Ok(Some(FileChunkMetadata {
1940                    total_size,
1941                    chunk_hashes: vec![],
1942                    chunk_sizes: vec![],
1943                    is_chunked: false,
1944                }));
1945            }
1946
1947            // Get tree node to extract chunk info
1948            let node = match tree
1949                .get_tree_node(hash)
1950                .await
1951                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1952            {
1953                Some(n) => n,
1954                None => return Ok(None),
1955            };
1956
1957            // Check if it's a directory (has named links)
1958            let is_directory = tree
1959                .is_directory(hash)
1960                .await
1961                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1962
1963            if is_directory {
1964                return Ok(None); // Not a file
1965            }
1966
1967            // Extract chunk info from links
1968            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
1969            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();
1970
1971            Ok(Some(FileChunkMetadata {
1972                total_size,
1973                chunk_hashes,
1974                chunk_sizes,
1975                is_chunked: !node.links.is_empty(),
1976            }))
1977        });
1978        let metadata = metadata?;
1979        if metadata.is_some() {
1980            self.record_blob_accesses(access_store.take_accessed_hashes());
1981        }
1982        Ok(metadata)
1983    }
1984
1985    /// Get byte range from file
1986    pub fn get_file_range(
1987        &self,
1988        hash: &[u8; 32],
1989        start: u64,
1990        end: Option<u64>,
1991    ) -> Result<Option<(Vec<u8>, u64)>> {
1992        let metadata = match self.get_file_chunk_metadata(hash)? {
1993            Some(m) => m,
1994            None => return Ok(None),
1995        };
1996
1997        if metadata.total_size == 0 {
1998            return Ok(Some((Vec::new(), 0)));
1999        }
2000
2001        if start >= metadata.total_size {
2002            return Ok(None);
2003        }
2004
2005        let end = end
2006            .unwrap_or(metadata.total_size - 1)
2007            .min(metadata.total_size - 1);
2008
2009        // For non-chunked files, load entire file
2010        if !metadata.is_chunked {
2011            let content = self.get_file(hash)?.unwrap_or_default();
2012            let range_content = if start < content.len() as u64 {
2013                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
2014            } else {
2015                Vec::new()
2016            };
2017            return Ok(Some((range_content, metadata.total_size)));
2018        }
2019
2020        // For chunked files, load only needed chunks
2021        let mut result = Vec::new();
2022        let mut current_offset = 0u64;
2023
2024        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
2025            let chunk_size = metadata.chunk_sizes[i];
2026            let chunk_end = current_offset + chunk_size - 1;
2027
2028            // Check if this chunk overlaps with requested range
2029            if chunk_end >= start && current_offset <= end {
2030                let chunk_content = match self.get_chunk(chunk_hash)? {
2031                    Some(content) => content,
2032                    None => {
2033                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
2034                    }
2035                };
2036
2037                let chunk_read_start = if current_offset >= start {
2038                    0
2039                } else {
2040                    (start - current_offset) as usize
2041                };
2042
2043                let chunk_read_end = if chunk_end <= end {
2044                    chunk_size as usize - 1
2045                } else {
2046                    (end - current_offset) as usize
2047                };
2048
2049                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
2050            }
2051
2052            current_offset += chunk_size;
2053
2054            if current_offset > end {
2055                break;
2056            }
2057        }
2058
2059        Ok(Some((result, metadata.total_size)))
2060    }
2061
2062    /// Stream file range as chunks using Arc for async/Send contexts
2063    pub fn stream_file_range_chunks_owned(
2064        self: Arc<Self>,
2065        hash: &[u8; 32],
2066        start: u64,
2067        end: u64,
2068    ) -> Result<Option<FileRangeChunksOwned>> {
2069        let metadata = match self.get_file_chunk_metadata(hash)? {
2070            Some(m) => m,
2071            None => return Ok(None),
2072        };
2073
2074        if metadata.total_size == 0 || start >= metadata.total_size {
2075            return Ok(None);
2076        }
2077
2078        let end = end.min(metadata.total_size - 1);
2079
2080        Ok(Some(FileRangeChunksOwned {
2081            store: self,
2082            metadata,
2083            start,
2084            end,
2085            current_chunk_idx: 0,
2086            current_offset: 0,
2087        }))
2088    }
2089
2090    /// Get directory structure by hash (raw bytes)
2091    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
2092        let (tree, access_store) = self.access_tracking_tree();
2093
2094        let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
2095            // Check if it's a directory
2096            let is_dir = tree
2097                .is_directory(hash)
2098                .await
2099                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
2100
2101            if !is_dir {
2102                return Ok(None);
2103            }
2104
2105            // Get directory entries (public Cid - no encryption key)
2106            let cid = hashtree_core::Cid::public(*hash);
2107            let tree_entries = tree
2108                .list_directory(&cid)
2109                .await
2110                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
2111
2112            let entries: Vec<DirEntry> = tree_entries
2113                .into_iter()
2114                .map(|e| DirEntry {
2115                    name: e.name,
2116                    cid: to_hex(&e.hash),
2117                    is_directory: e.link_type.is_tree(),
2118                    size: e.size,
2119                })
2120                .collect();
2121
2122            Ok(Some(DirectoryListing {
2123                dir_name: String::new(),
2124                entries,
2125            }))
2126        });
2127        let listing = listing?;
2128        if listing.is_some() {
2129            self.record_blob_accesses(access_store.take_accessed_hashes());
2130        }
2131        Ok(listing)
2132    }
2133
2134    /// Get directory structure by CID, supporting encrypted directories.
2135    pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
2136        let (tree, access_store) = self.access_tracking_tree();
2137        let cid = cid.clone();
2138
2139        let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
2140            let is_dir = tree
2141                .is_dir(&cid)
2142                .await
2143                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
2144
2145            if !is_dir {
2146                return Ok(None);
2147            }
2148
2149            let tree_entries = tree
2150                .list_directory(&cid)
2151                .await
2152                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
2153
2154            let entries: Vec<DirEntry> = tree_entries
2155                .into_iter()
2156                .map(|e| DirEntry {
2157                    name: e.name,
2158                    cid: Cid {
2159                        hash: e.hash,
2160                        key: e.key,
2161                    }
2162                    .to_string(),
2163                    is_directory: e.link_type.is_tree(),
2164                    size: e.size,
2165                })
2166                .collect();
2167
2168            Ok(Some(DirectoryListing {
2169                dir_name: String::new(),
2170                entries,
2171            }))
2172        });
2173        let listing = listing?;
2174        if listing.is_some() {
2175            self.record_blob_accesses(access_store.take_accessed_hashes());
2176        }
2177        Ok(listing)
2178    }
2179
    // === Pinned refs, tracked authors, and cached roots ===
2181
2182    /// Persist a mutable published ref that should stay subscribed.
2183    pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
2184        let mut wtxn = self.env.write_txn()?;
2185        self.pinned_refs.put(&mut wtxn, key, &())?;
2186        wtxn.commit()?;
2187        Ok(())
2188    }
2189
2190    /// Remove a mutable published ref from the live pinned set.
2191    pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
2192        let mut wtxn = self.env.write_txn()?;
2193        let removed = self.pinned_refs.delete(&mut wtxn, key)?;
2194        wtxn.commit()?;
2195        Ok(removed)
2196    }
2197
2198    /// List mutable published refs that should stay subscribed.
2199    pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
2200        let rtxn = self.env.read_txn()?;
2201        let mut refs = Vec::new();
2202
2203        for item in self.pinned_refs.iter(&rtxn)? {
2204            let (key, _) = item?;
2205            refs.push(key.to_string());
2206        }
2207
2208        refs.sort();
2209        Ok(refs)
2210    }
2211
2212    /// Persist an author whose published trees should stay mirrored.
2213    pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
2214        let mut wtxn = self.env.write_txn()?;
2215        let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
2216        self.tracked_authors.put(&mut wtxn, npub, &())?;
2217        wtxn.commit()?;
2218        Ok(inserted)
2219    }
2220
2221    /// Remove an author from the continuous mirror set.
2222    pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
2223        let mut wtxn = self.env.write_txn()?;
2224        let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
2225        wtxn.commit()?;
2226        Ok(removed)
2227    }
2228
2229    /// List authors whose published trees should stay mirrored.
2230    pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
2231        let rtxn = self.env.read_txn()?;
2232        let mut authors = Vec::new();
2233
2234        for item in self.tracked_authors.iter(&rtxn)? {
2235            let (npub, _) = item?;
2236            authors.push(npub.to_string());
2237        }
2238
2239        authors.sort();
2240        Ok(authors)
2241    }
2242
2243    /// Get cached root for a pubkey/tree_name pair
2244    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
2245        let key = format!("{}/{}", pubkey_hex, tree_name);
2246        let rtxn = self.env.read_txn()?;
2247        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
2248            let root: CachedRoot = rmp_serde::from_slice(bytes)
2249                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2250            Ok(Some(root))
2251        } else {
2252            Ok(None)
2253        }
2254    }
2255
2256    /// Set cached root for a pubkey/tree_name pair
2257    pub fn set_cached_root(
2258        &self,
2259        pubkey_hex: &str,
2260        tree_name: &str,
2261        hash: &str,
2262        key: Option<&str>,
2263        visibility: &str,
2264        updated_at: u64,
2265    ) -> Result<()> {
2266        let db_key = format!("{}/{}", pubkey_hex, tree_name);
2267        let root = CachedRoot {
2268            hash: hash.to_string(),
2269            key: key.map(|k| k.to_string()),
2270            updated_at,
2271            visibility: visibility.to_string(),
2272        };
2273        let bytes = rmp_serde::to_vec(&root)
2274            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
2275        let mut wtxn = self.env.write_txn()?;
2276        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
2277        wtxn.commit()?;
2278        Ok(())
2279    }
2280
2281    /// List all cached roots for a pubkey
2282    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
2283        let prefix = format!("{}/", pubkey_hex);
2284        let rtxn = self.env.read_txn()?;
2285        let mut results = Vec::new();
2286
2287        for item in self.cached_roots.iter(&rtxn)? {
2288            let (key, bytes) = item?;
2289            if key.starts_with(&prefix) {
2290                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
2291                let root: CachedRoot = rmp_serde::from_slice(bytes)
2292                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2293                results.push((tree_name.to_string(), root));
2294            }
2295        }
2296
2297        Ok(results)
2298    }
2299
2300    /// Delete a cached root
2301    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
2302        let key = format!("{}/{}", pubkey_hex, tree_name);
2303        let mut wtxn = self.env.write_txn()?;
2304        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
2305        wtxn.commit()?;
2306        Ok(deleted)
2307    }
2308}
2309
2310fn is_map_full_store_error(err: &StoreError) -> bool {
2311    let message = err.to_string();
2312    message.contains("MDB_MAP_FULL") || message.contains("MapFull")
2313}
2314
2315#[derive(Debug, Clone)]
2316pub struct FileChunkMetadata {
2317    pub total_size: u64,
2318    pub chunk_hashes: Vec<Hash>,
2319    pub chunk_sizes: Vec<u64>,
2320    pub is_chunked: bool,
2321}
2322
2323/// Owned iterator for async streaming
2324pub struct FileRangeChunksOwned {
2325    store: Arc<HashtreeStore>,
2326    metadata: FileChunkMetadata,
2327    start: u64,
2328    end: u64,
2329    current_chunk_idx: usize,
2330    current_offset: u64,
2331}
2332
2333impl Iterator for FileRangeChunksOwned {
2334    type Item = Result<Vec<u8>>;
2335
2336    fn next(&mut self) -> Option<Self::Item> {
2337        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2338            return None;
2339        }
2340
2341        if self.current_offset > self.end {
2342            return None;
2343        }
2344
2345        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2346        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2347        let chunk_end = self.current_offset + chunk_size - 1;
2348
2349        self.current_chunk_idx += 1;
2350
2351        if chunk_end < self.start || self.current_offset > self.end {
2352            self.current_offset += chunk_size;
2353            return self.next();
2354        }
2355
2356        let chunk_content = match self.store.get_chunk(chunk_hash) {
2357            Ok(Some(content)) => content,
2358            Ok(None) => {
2359                return Some(Err(anyhow::anyhow!(
2360                    "Chunk {} not found",
2361                    to_hex(chunk_hash)
2362                )));
2363            }
2364            Err(e) => {
2365                return Some(Err(e));
2366            }
2367        };
2368
2369        let chunk_read_start = if self.current_offset >= self.start {
2370            0
2371        } else {
2372            (self.start - self.current_offset) as usize
2373        };
2374
2375        let chunk_read_end = if chunk_end <= self.end {
2376            chunk_size as usize - 1
2377        } else {
2378            (self.end - self.current_offset) as usize
2379        };
2380
2381        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2382        self.current_offset += chunk_size;
2383
2384        Some(Ok(result))
2385    }
2386}
2387
/// Counters describing a garbage-collection run.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GcStats {
    /// Number of DAGs deleted.
    pub deleted_dags: usize,
    /// Number of bytes freed.
    pub freed_bytes: u64,
}
2393
/// One entry of a directory listing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DirEntry {
    /// Entry name within its directory.
    pub name: String,
    /// Content identifier of the entry, as a string.
    pub cid: String,
    /// Whether the entry is itself a directory/tree.
    pub is_directory: bool,
    /// Size recorded for the entry.
    pub size: u64,
}
2401
2402#[derive(Debug, Clone)]
2403pub struct DirectoryListing {
2404    pub dir_name: String,
2405    pub entries: Vec<DirEntry>,
2406}
2407
2408/// Blob metadata for Blossom protocol
2409#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2410pub struct BlobMetadata {
2411    pub sha256: String,
2412    pub size: u64,
2413    pub mime_type: String,
2414    pub uploaded: u64,
2415}
2416
2417// Implement ContentStore trait for WebRTC data exchange
2418impl crate::webrtc::ContentStore for HashtreeStore {
2419    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2420        let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2421        self.get_chunk(&hash)
2422    }
2423}
2424
2425#[cfg(test)]
2426mod tests {
2427    use super::*;
2428    #[cfg(feature = "lmdb")]
2429    use tempfile::TempDir;
2430
2431    #[test]
2432    fn blob_access_update_gate_deduplicates_and_throttles() {
2433        let gate = BlobAccessUpdateGate::default();
2434        let first = sha256(b"first");
2435        let second = sha256(b"second");
2436
2437        assert_eq!(
2438            gate.due_hashes([first, first, second], 10),
2439            vec![first, second]
2440        );
2441        assert!(gate.due_hashes([first, second], 11).is_empty());
2442        assert_eq!(
2443            gate.due_hashes([second, first], 10 + ACCESS_UPDATE_INTERVAL_SECS),
2444            vec![second, first]
2445        );
2446    }
2447
2448    #[cfg(feature = "lmdb")]
2449    #[test]
2450    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
2451        let temp = TempDir::new()?;
2452        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
2453        let store = HashtreeStore::with_options_and_backend(
2454            temp.path(),
2455            None,
2456            requested,
2457            true,
2458            &StorageBackend::Lmdb,
2459        )?;
2460
2461        let map_size = match store.router.local.as_ref() {
2462            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
2463            LocalStore::Fs(_) => panic!("expected LMDB local store"),
2464        };
2465
2466        assert!(
2467            map_size >= requested,
2468            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
2469        );
2470
2471        drop(store);
2472        Ok(())
2473    }
2474
2475    #[cfg(feature = "lmdb")]
2476    #[test]
2477    fn hashtree_store_expands_metadata_lmdb_map_size_to_storage_budget() -> Result<()> {
2478        let temp = TempDir::new()?;
2479        let storage_budget = 256 * 1024 * 1024 * 1024u64;
2480        let expected = lmdb_metadata_map_size_for_storage_budget(storage_budget);
2481        let store = HashtreeStore::with_options_and_backend(
2482            temp.path(),
2483            None,
2484            storage_budget,
2485            true,
2486            &StorageBackend::Lmdb,
2487        )?;
2488
2489        let map_size = store.env.info().map_size as u64;
2490        assert!(
2491            map_size >= expected,
2492            "expected metadata LMDB map to grow to at least {expected} bytes, got {map_size}"
2493        );
2494
2495        drop(store);
2496        Ok(())
2497    }
2498
2499    #[cfg(feature = "lmdb")]
2500    #[test]
2501    fn lmdb_map_size_for_existing_env_keeps_matching_requested_size() -> Result<()> {
2502        let temp = TempDir::new()?;
2503        let requested = LMDB_METADATA_MIN_MAP_SIZE_BYTES;
2504        std::fs::File::create(temp.path().join("data.mdb"))?.set_len(requested)?;
2505
2506        let map_size = lmdb_map_size_for_existing_env(temp.path(), requested)? as u64;
2507
2508        assert_eq!(map_size, align_lmdb_map_size(requested));
2509        Ok(())
2510    }
2511
2512    #[cfg(feature = "lmdb")]
2513    #[test]
2514    fn lmdb_map_size_for_existing_env_adds_headroom_when_existing_is_larger() -> Result<()> {
2515        let temp = TempDir::new()?;
2516        let requested = LMDB_METADATA_MIN_MAP_SIZE_BYTES;
2517        let existing = requested + 4096;
2518        std::fs::File::create(temp.path().join("data.mdb"))?.set_len(existing)?;
2519
2520        let map_size = lmdb_map_size_for_existing_env(temp.path(), requested)? as u64;
2521        let expected = align_lmdb_map_size(existing + LMDB_METADATA_REOPEN_HEADROOM_BYTES);
2522
2523        assert_eq!(map_size, expected);
2524        Ok(())
2525    }
2526
2527    #[cfg(feature = "lmdb")]
2528    #[test]
2529    fn local_store_can_override_lmdb_map_size() -> Result<()> {
2530        let temp = TempDir::new()?;
2531        let requested = 512 * 1024 * 1024u64;
2532        let store = LocalStore::new_with_lmdb_map_size(
2533            temp.path().join("lmdb-blobs"),
2534            &StorageBackend::Lmdb,
2535            Some(requested),
2536        )?;
2537
2538        let map_size = match store {
2539            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
2540            LocalStore::Fs(_) => panic!("expected LMDB local store"),
2541        };
2542
2543        assert!(
2544            map_size >= requested,
2545            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
2546        );
2547
2548        Ok(())
2549    }
2550
2551    #[cfg(feature = "lmdb")]
2552    #[test]
2553    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
2554        let temp = TempDir::new()?;
2555        let path = temp.path().join("lmdb-blobs");
2556        std::fs::create_dir_all(path.join("aa"))?;
2557        std::fs::create_dir_all(path.join("b2"))?;
2558        std::fs::create_dir_all(path.join("keep-me"))?;
2559        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
2560        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
2561        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;
2562
2563        let _store = LocalStore::new_with_lmdb_map_size(
2564            &path,
2565            &StorageBackend::Lmdb,
2566            Some(128 * 1024 * 1024),
2567        )?;
2568
2569        assert!(!path.join("aa").exists());
2570        assert!(!path.join("b2").exists());
2571        assert!(path.join("keep-me").exists());
2572        assert!(path.join("data.mdb").exists());
2573        assert!(path.join("lock.mdb").exists());
2574
2575        Ok(())
2576    }
2577
2578    #[cfg(feature = "lmdb")]
2579    #[test]
2580    fn duplicate_blossom_writes_refresh_blob_last_accessed() -> Result<()> {
2581        let temp = TempDir::new()?;
2582        let store = HashtreeStore::with_options_and_backend(
2583            temp.path(),
2584            None,
2585            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
2586            true,
2587            &StorageBackend::Lmdb,
2588        )?;
2589
2590        let data = b"cached blossom duplicate";
2591        let hash = sha256(data);
2592        store.put_cached_blob(data)?;
2593        store.router.touch_accessed_sync(&hash, 1)?;
2594        store.put_cached_blob(data)?;
2595        assert!(store.blob_last_accessed_at(&hash)?.unwrap_or(0) > 1);
2596
2597        let cached_batch = [
2598            (
2599                sha256(b"cached blossom batch 1"),
2600                b"cached blossom batch 1".to_vec(),
2601            ),
2602            (
2603                sha256(b"cached blossom batch 2"),
2604                b"cached blossom batch 2".to_vec(),
2605            ),
2606        ];
2607        assert_eq!(store.put_cached_blobs(&cached_batch)?, 2);
2608        assert_eq!(store.put_cached_blobs(&cached_batch)?, 0);
2609        assert_eq!(
2610            store.get_blob(&cached_batch[0].0)?.as_deref(),
2611            Some(cached_batch[0].1.as_slice())
2612        );
2613
2614        let owned = b"owned blossom duplicate";
2615        let owned_hash = sha256(owned);
2616        let owner = [7u8; 32];
2617        store.put_owned_blob(owned, &owner)?;
2618        store.router.touch_accessed_sync(&owned_hash, 1)?;
2619        store.put_owned_blob(owned, &owner)?;
2620        assert!(store.blob_last_accessed_at(&owned_hash)?.unwrap_or(0) > 1);
2621        let owned_blobs = store.list_blobs_by_pubkey(&owner)?;
2622        assert_eq!(owned_blobs.len(), 1);
2623        assert_eq!(owned_blobs[0].sha256, to_hex(&owned_hash));
2624
2625        let batch = [
2626            (
2627                sha256(b"owned blossom batch 1"),
2628                b"owned blossom batch 1".to_vec(),
2629            ),
2630            (
2631                sha256(b"owned blossom batch 2"),
2632                b"owned blossom batch 2".to_vec(),
2633            ),
2634        ];
2635        store.put_owned_blobs(&batch, &owner)?;
2636        let owned_blobs = store.list_blobs_by_pubkey(&owner)?;
2637        assert_eq!(owned_blobs.len(), 3);
2638
2639        Ok(())
2640    }
2641
2642    #[cfg(feature = "lmdb")]
2643    #[test]
2644    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
2645        let temp = TempDir::new()?;
2646        let store = HashtreeStore::with_options_and_backend(
2647            temp.path(),
2648            None,
2649            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
2650            true,
2651            &StorageBackend::Lmdb,
2652        )?;
2653
2654        let old_bytes = b"old published root";
2655        let new_bytes = b"new published root";
2656        let old_root = sha256(old_bytes);
2657        let new_root = sha256(new_bytes);
2658
2659        store.put_blob(old_bytes)?;
2660        store.pin(&old_root)?;
2661        store.index_tree(
2662            &old_root,
2663            "owner",
2664            Some("playlist"),
2665            PRIORITY_OWN,
2666            Some("npub1owner/playlist"),
2667        )?;
2668
2669        assert!(store.is_pinned(&old_root)?);
2670        assert!(store.get_tree_meta(&old_root)?.is_some());
2671
2672        store.put_blob(new_bytes)?;
2673        store.pin(&new_root)?;
2674        store.index_tree(
2675            &new_root,
2676            "owner",
2677            Some("playlist"),
2678            PRIORITY_OWN,
2679            Some("npub1owner/playlist"),
2680        )?;
2681
2682        assert!(
2683            !store.is_pinned(&old_root)?,
2684            "superseded root should be unpinned when ref is replaced"
2685        );
2686        assert!(
2687            store.get_tree_meta(&old_root)?.is_none(),
2688            "superseded root metadata should be removed when ref is replaced"
2689        );
2690        assert!(store.is_pinned(&new_root)?);
2691        assert!(store.get_tree_meta(&new_root)?.is_some());
2692
2693        Ok(())
2694    }
2695
2696    #[test]
2697    fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
2698        let temp = TempDir::new()?;
2699        let store = HashtreeStore::with_options(temp.path(), None, 1024 * 1024)?;
2700
2701        store
2702            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;
2703        store
2704            .add_tracked_author("npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm")?;
2705        store
2706            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;
2707
2708        assert_eq!(
2709            store.list_tracked_authors()?,
2710            vec![
2711                "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm".to_string(),
2712                "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string(),
2713            ]
2714        );
2715        assert!(store.remove_tracked_author(
2716            "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm"
2717        )?);
2718        assert!(!store.remove_tracked_author(
2719            "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq"
2720        )?);
2721        assert_eq!(
2722            store.list_tracked_authors()?,
2723            vec!["npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string()]
2724        );
2725
2726        Ok(())
2727    }
2728
2729    #[cfg(feature = "s3")]
2730    #[test]
2731    fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
2732        let temp = tempfile::TempDir::new()?;
2733        let local = Arc::new(LocalStore::new(
2734            temp.path().join("blobs"),
2735            &StorageBackend::Fs,
2736        )?);
2737
2738        let outcome = std::panic::catch_unwind(|| {
2739            sync_block_on(async {
2740                let aws_config = aws_config::from_env()
2741                    .region(aws_sdk_s3::config::Region::new("auto"))
2742                    .load()
2743                    .await;
2744                let s3_client = aws_sdk_s3::Client::from_conf(
2745                    aws_sdk_s3::config::Builder::from(&aws_config)
2746                        .endpoint_url("http://127.0.0.1:9")
2747                        .force_path_style(true)
2748                        .build(),
2749                );
2750
2751                let router = StorageRouter {
2752                    local,
2753                    s3_client: Some(s3_client),
2754                    s3_bucket: Some("test-bucket".to_string()),
2755                    s3_prefix: String::new(),
2756                    sync_tx: None,
2757                };
2758                let hash = [0u8; 32];
2759
2760                let _ = Store::has(&router, &hash).await;
2761                let _ = Store::get(&router, &hash).await;
2762            });
2763        });
2764
2765        assert!(
2766            outcome.is_ok(),
2767            "S3-backed async store methods should not panic inside futures::block_on"
2768        );
2769
2770        Ok(())
2771    }
2772}