Skip to main content

hashtree_cli/
storage.rs

1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8    from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::collections::{HashMap, HashSet};
17#[cfg(feature = "s3")]
18use std::future::Future;
19use std::io::Write;
20use std::path::{Path, PathBuf};
21use std::sync::{Arc, Mutex};
22use std::time::{SystemTime, UNIX_EPOCH};
23
24mod upload;
25
26mod maintenance;
27mod retention;
28
29pub use maintenance::{
30    compact_lmdb_environments_under, CompactResult, R2ImportOptions, R2ImportResult, VerifyResult,
31};
32pub use retention::{OwnedBlobStats, PinnedItem, StorageByPriority, StorageStats, TreeMeta};
33
/// Tree eviction priority level for trees that are neither own nor followed.
pub const PRIORITY_OTHER: u8 = 1 << 6;
/// Tree eviction priority level for followed trees.
pub const PRIORITY_FOLLOWED: u8 = 1 << 7;
/// Tree eviction priority level for own trees (the largest `u8`).
pub const PRIORITY_OWN: u8 = u8::MAX;
// Upper bound on concurrent LMDB readers (presumably fed to `EnvOpenOptions::max_readers`
// elsewhere in this module — confirm).
const LMDB_MAX_READERS: u32 = 1 << 10;
// Smallest map size requested for LMDB metadata environments (1 MiB).
const LMDB_METADATA_MIN_MAP_SIZE_BYTES: u64 = 1 << 20;
// Smallest map size requested for LMDB blob environments (16 MiB).
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 16 << 20;
// Minimum spacing, in seconds, between last-access writes for the same blob (5 minutes).
const ACCESS_UPDATE_INTERVAL_SECS: u64 = 5 * 60;
// Size at which `BlobAccessUpdateGate` prunes its per-hash schedule.
const ACCESS_UPDATE_GATE_MAX_ENTRIES: usize = 1 << 12;
44
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` instead of an error.
fn unix_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
51
52/// Cached root info from Nostr events.
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct CachedRoot {
55    /// Root hash (hex)
56    pub hash: String,
57    /// Optional decryption key (hex)
58    pub key: Option<String>,
59    /// Unix timestamp when this was cached (from event created_at)
60    pub updated_at: u64,
61    /// Visibility: "public", "link-visible", or "private"
62    pub visibility: String,
63}
64
/// Aggregate statistics reported by a [`LocalStore`].
///
/// `Default` yields an empty store (zero blobs, zero bytes); `PartialEq`/`Eq` allow
/// comparing snapshots directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LocalStoreStats {
    /// Number of blobs held.
    pub count: usize,
    /// Sum of blob sizes in bytes.
    pub total_bytes: u64,
}
71
72#[derive(Default)]
73struct BlobAccessUpdateGate {
74    next_update_by_hash: Mutex<HashMap<Hash, u64>>,
75}
76
77impl BlobAccessUpdateGate {
78    fn due_hashes<I>(&self, hashes: I, now: u64) -> Vec<Hash>
79    where
80        I: IntoIterator<Item = Hash>,
81    {
82        let Ok(mut next_update_by_hash) = self.next_update_by_hash.try_lock() else {
83            return Vec::new();
84        };
85
86        if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
87            next_update_by_hash.retain(|_, next_update| *next_update > now);
88            if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
89                next_update_by_hash.clear();
90            }
91        }
92
93        let mut due = Vec::new();
94        let mut seen = HashSet::new();
95        for hash in hashes {
96            if !seen.insert(hash) {
97                continue;
98            }
99            if next_update_by_hash
100                .get(&hash)
101                .is_some_and(|next_update| now < *next_update)
102            {
103                continue;
104            }
105            next_update_by_hash.insert(hash, now.saturating_add(ACCESS_UPDATE_INTERVAL_SECS));
106            due.push(hash);
107        }
108        due
109    }
110}
111
112/// Local blob store - wraps either FsBlobStore or LmdbBlobStore
113pub enum LocalStore {
114    Fs(FsBlobStore),
115    #[cfg(feature = "lmdb")]
116    Lmdb(LmdbBlobStore),
117}
118
/// Whether `path`'s final component looks like a filesystem blob shard directory:
/// exactly two ASCII hex digits (either case).
///
/// Returns `false` when the path has no final component or it is not valid UTF-8.
#[cfg(feature = "lmdb")]
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    match path.file_name().and_then(|component| component.to_str()) {
        Some(name) => name.len() == 2 && name.bytes().all(|byte| byte.is_ascii_hexdigit()),
        None => false,
    }
}
126
127fn lmdb_map_size_for_existing_env(path: &Path, minimum_bytes: u64) -> Result<usize> {
128    let existing_bytes = std::fs::metadata(path.join("data.mdb"))
129        .map(|metadata| metadata.len())
130        .unwrap_or(0);
131    let requested = align_lmdb_map_size(minimum_bytes.max(existing_bytes));
132    usize::try_from(requested).context("LMDB map size exceeds usize")
133}
134
135fn align_lmdb_map_size(bytes: u64) -> u64 {
136    let page_size = (page_size::get() as u64).max(4096);
137    let remainder = bytes % page_size;
138    if remainder == 0 {
139        bytes
140    } else {
141        bytes.saturating_add(page_size - remainder)
142    }
143}
144
/// Deletes filesystem blob shard directories (two-hex-digit names) directly under `path`.
///
/// Used when an LMDB store takes over a directory previously used by the filesystem
/// store. Each removal is logged at info level.
///
/// # Errors
/// Returns `StoreError::Io` on the first directory-listing or removal failure; shards
/// already removed stay removed.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for dir_entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let candidate = dir_entry.map_err(StoreError::Io)?.path();
        if !candidate.is_dir() || !is_fs_blob_shard_dir(&candidate) {
            continue;
        }
        std::fs::remove_dir_all(&candidate).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            candidate.display()
        );
    }
    Ok(())
}
161
162impl LocalStore {
163    /// Create a new unbounded local store.
164    ///
165    /// Higher-level stores that need quota enforcement should manage eviction
166    /// above this layer so tree metadata, pins, and archival policies stay
167    /// coherent.
168    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
169        Self::new_unbounded(path, backend)
170    }
171
172    /// Create a new local store with an explicit LMDB logical size cap when using the LMDB backend.
173    ///
174    /// The requested size is used for both the LMDB map and the store's built-in
175    /// byte quota, so standalone blob envs can evict instead of growing forever.
176    pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
177        path: P,
178        backend: &StorageBackend,
179        _map_size_bytes: Option<u64>,
180    ) -> Result<Self, StoreError> {
181        match backend {
182            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
183            #[cfg(feature = "lmdb")]
184            StorageBackend::Lmdb => match _map_size_bytes {
185                Some(map_size_bytes) => {
186                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
187                    remove_stale_fs_blob_shards(path.as_ref())?;
188                    Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
189                        path,
190                        map_size_bytes,
191                    )?))
192                }
193                None => {
194                    std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
195                    remove_stale_fs_blob_shards(path.as_ref())?;
196                    Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
197                }
198            },
199            #[cfg(not(feature = "lmdb"))]
200            StorageBackend::Lmdb => {
201                tracing::warn!(
202                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
203                );
204                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
205            }
206        }
207    }
208
209    /// Create a new unbounded local store for a specific backend.
210    pub fn new_unbounded<P: AsRef<Path>>(
211        path: P,
212        backend: &StorageBackend,
213    ) -> Result<Self, StoreError> {
214        Self::new_with_lmdb_map_size(path, backend, None)
215    }
216
217    pub fn backend(&self) -> StorageBackend {
218        match self {
219            LocalStore::Fs(_) => StorageBackend::Fs,
220            #[cfg(feature = "lmdb")]
221            LocalStore::Lmdb(_) => StorageBackend::Lmdb,
222        }
223    }
224
225    /// Sync put operation
226    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
227        match self {
228            LocalStore::Fs(store) => store.put_sync(hash, data),
229            #[cfg(feature = "lmdb")]
230            LocalStore::Lmdb(store) => store.put_sync(hash, data),
231        }
232    }
233
234    /// Sync batch put operation.
235    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
236        match self {
237            LocalStore::Fs(store) => {
238                let mut inserted = 0usize;
239                for (hash, data) in items {
240                    if store.put_sync(*hash, data.as_slice())? {
241                        inserted += 1;
242                    }
243                }
244                Ok(inserted)
245            }
246            #[cfg(feature = "lmdb")]
247            LocalStore::Lmdb(store) => store.put_many_sync(items),
248        }
249    }
250
251    /// Sync get operation
252    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
253        match self {
254            LocalStore::Fs(store) => store.get_sync(hash),
255            #[cfg(feature = "lmdb")]
256            LocalStore::Lmdb(store) => store.get_sync(hash),
257        }
258    }
259
260    pub fn get_range_sync(
261        &self,
262        hash: &Hash,
263        start: u64,
264        end_inclusive: u64,
265    ) -> Result<Option<Vec<u8>>, StoreError> {
266        match self {
267            LocalStore::Fs(store) => store.get_range_sync(hash, start, end_inclusive),
268            #[cfg(feature = "lmdb")]
269            LocalStore::Lmdb(store) => store.get_range_sync(hash, start, end_inclusive),
270        }
271    }
272
273    pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
274        match self {
275            LocalStore::Fs(store) => store.blob_size_sync(hash),
276            #[cfg(feature = "lmdb")]
277            LocalStore::Lmdb(store) => store.blob_size_sync(hash),
278        }
279    }
280
281    pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
282        match self {
283            LocalStore::Fs(store) => store.touch_accessed_sync(hash, now),
284            #[cfg(feature = "lmdb")]
285            LocalStore::Lmdb(store) => store.touch_accessed_sync(hash, now),
286        }
287    }
288
289    pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
290        match self {
291            LocalStore::Fs(store) => store.touch_many_accessed_sync(hashes, now),
292            #[cfg(feature = "lmdb")]
293            LocalStore::Lmdb(store) => store.touch_many_accessed_sync(hashes, now),
294        }
295    }
296
297    pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
298        match self {
299            LocalStore::Fs(store) => store.last_accessed_at_sync(hash),
300            #[cfg(feature = "lmdb")]
301            LocalStore::Lmdb(store) => store.last_accessed_at_sync(hash),
302        }
303    }
304
305    pub fn many_last_accessed_at_sync(
306        &self,
307        hashes: &[Hash],
308    ) -> Result<Vec<(Hash, u64)>, StoreError> {
309        match self {
310            LocalStore::Fs(store) => store.many_last_accessed_at_sync(hashes),
311            #[cfg(feature = "lmdb")]
312            LocalStore::Lmdb(store) => store.many_last_accessed_at_sync(hashes),
313        }
314    }
315
316    /// Check if hash exists
317    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
318        match self {
319            LocalStore::Fs(store) => Ok(store.exists(hash)),
320            #[cfg(feature = "lmdb")]
321            LocalStore::Lmdb(store) => store.exists(hash),
322        }
323    }
324
325    /// Mark which sorted hashes already exist in local storage.
326    pub fn existing_hashes_in_sorted_candidates(
327        &self,
328        sorted_hashes: &[Hash],
329    ) -> Result<Vec<bool>, StoreError> {
330        match self {
331            LocalStore::Fs(store) => Ok(sorted_hashes
332                .iter()
333                .map(|hash| store.exists(hash))
334                .collect()),
335            #[cfg(feature = "lmdb")]
336            LocalStore::Lmdb(store) => store.existing_hashes_in_sorted_candidates(sorted_hashes),
337        }
338    }
339
340    /// Sync delete operation
341    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
342        match self {
343            LocalStore::Fs(store) => store.delete_sync(hash),
344            #[cfg(feature = "lmdb")]
345            LocalStore::Lmdb(store) => store.delete_sync(hash),
346        }
347    }
348
349    /// Get storage statistics
350    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
351        match self {
352            LocalStore::Fs(store) => {
353                let stats = store.stats()?;
354                Ok(LocalStoreStats {
355                    count: stats.count,
356                    total_bytes: stats.total_bytes,
357                })
358            }
359            #[cfg(feature = "lmdb")]
360            LocalStore::Lmdb(store) => {
361                let stats = store.stats()?;
362                Ok(LocalStoreStats {
363                    count: stats.count,
364                    total_bytes: stats.total_bytes,
365                })
366            }
367        }
368    }
369
370    /// List all hashes in the store
371    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
372        match self {
373            LocalStore::Fs(store) => store.list(),
374            #[cfg(feature = "lmdb")]
375            LocalStore::Lmdb(store) => store.list(),
376        }
377    }
378}
379
380#[async_trait]
381impl Store for LocalStore {
382    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
383        self.put_sync(hash, &data)
384    }
385
386    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
387        self.put_many_sync(&items)
388    }
389
390    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
391        self.get_sync(hash)
392    }
393
394    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
395        self.exists(hash)
396    }
397
398    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
399        self.delete_sync(hash)
400    }
401}
402
403#[cfg(feature = "s3")]
404use tokio::sync::mpsc;
405
406use crate::config::S3Config;
407
/// Work item for the background S3 mirroring task.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` to the object key derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object whose key is derived from `hash`.
    Delete { hash: Hash },
}
414
415/// Storage router - local store primary with optional S3 backup
416///
417/// Write path: local first (fast), then queue S3 upload (non-blocking)
418/// Read path: local first, fall back to S3 if miss
419pub struct StorageRouter {
420    /// Primary local store (always used)
421    local: Arc<LocalStore>,
422    /// Optional S3 client for backup
423    #[cfg(feature = "s3")]
424    s3_client: Option<aws_sdk_s3::Client>,
425    #[cfg(feature = "s3")]
426    s3_bucket: Option<String>,
427    #[cfg(feature = "s3")]
428    s3_prefix: String,
429    /// Channel to send uploads to background task
430    #[cfg(feature = "s3")]
431    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
432}
433
434impl StorageRouter {
435    #[cfg(feature = "s3")]
436    fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
437    where
438        F: Future<Output = T> + Send + 'static,
439        T: Send + 'static,
440    {
441        if tokio::runtime::Handle::try_current().is_ok() {
442            return std::thread::Builder::new()
443                .name("storage-s3-sync".to_string())
444                .spawn(move || {
445                    let runtime = tokio::runtime::Builder::new_current_thread()
446                        .enable_all()
447                        .build()
448                        .map_err(|err| {
449                            StoreError::Other(format!("build storage s3 sync runtime: {err}"))
450                        })?;
451                    Ok(runtime.block_on(future))
452                })
453                .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
454                .join()
455                .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()))?;
456        }
457
458        let runtime = tokio::runtime::Builder::new_current_thread()
459            .enable_all()
460            .build()
461            .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
462        Ok(runtime.block_on(future))
463    }
464
465    /// Create router with local storage only
466    pub fn new(local: Arc<LocalStore>) -> Self {
467        Self {
468            local,
469            #[cfg(feature = "s3")]
470            s3_client: None,
471            #[cfg(feature = "s3")]
472            s3_bucket: None,
473            #[cfg(feature = "s3")]
474            s3_prefix: String::new(),
475            #[cfg(feature = "s3")]
476            sync_tx: None,
477        }
478    }
479
480    /// Create router with local storage + S3 backup
481    #[cfg(feature = "s3")]
482    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
483        use aws_sdk_s3::Client as S3Client;
484
485        // Build AWS config
486        let mut aws_config_loader = aws_config::from_env();
487        aws_config_loader =
488            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
489        let aws_config = aws_config_loader.load().await;
490
491        // Build S3 client with custom endpoint
492        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
493        s3_config_builder = s3_config_builder
494            .endpoint_url(&config.endpoint)
495            .force_path_style(true);
496
497        let s3_client = S3Client::from_conf(s3_config_builder.build());
498        let bucket = config.bucket.clone();
499        let prefix = config.prefix.clone().unwrap_or_default();
500
501        // Create background sync channel
502        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
503
504        // Spawn background sync task with bounded concurrent uploads
505        let sync_client = s3_client.clone();
506        let sync_bucket = bucket.clone();
507        let sync_prefix = prefix.clone();
508
509        tokio::spawn(async move {
510            use aws_sdk_s3::primitives::ByteStream;
511
512            tracing::info!("S3 background sync task started");
513
514            // Keep S3 writes parallel, but avoid dispatch failures during mirror backfill bursts.
515            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(8));
516            let client = std::sync::Arc::new(sync_client);
517            let bucket = std::sync::Arc::new(sync_bucket);
518            let prefix = std::sync::Arc::new(sync_prefix);
519
520            while let Some(msg) = sync_rx.recv().await {
521                let client = client.clone();
522                let bucket = bucket.clone();
523                let prefix = prefix.clone();
524                let semaphore = semaphore.clone();
525
526                // Spawn each upload with semaphore-bounded concurrency
527                tokio::spawn(async move {
528                    // Acquire permit before uploading
529                    let _permit = semaphore.acquire().await;
530
531                    match msg {
532                        S3SyncMessage::Upload { hash, data } => {
533                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
534                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
535
536                            let mut attempt = 1u8;
537                            loop {
538                                match client
539                                    .put_object()
540                                    .bucket(bucket.as_str())
541                                    .key(&key)
542                                    .body(ByteStream::from(data.clone()))
543                                    .send()
544                                    .await
545                                {
546                                    Ok(_) => {
547                                        tracing::debug!("S3 upload succeeded: {}", &key);
548                                        break;
549                                    }
550                                    Err(e) if attempt < 3 => {
551                                        tracing::warn!(
552                                            "S3 upload retrying {}: attempt={} error={}",
553                                            &key,
554                                            attempt,
555                                            e
556                                        );
557                                        tokio::time::sleep(std::time::Duration::from_millis(
558                                            250 * u64::from(attempt),
559                                        ))
560                                        .await;
561                                        attempt += 1;
562                                    }
563                                    Err(e) => {
564                                        tracing::error!(
565                                            "S3 upload failed {} after {} attempts: {}",
566                                            &key,
567                                            attempt,
568                                            e
569                                        );
570                                        break;
571                                    }
572                                }
573                            }
574                        }
575                        S3SyncMessage::Delete { hash } => {
576                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
577                            tracing::debug!("S3 deleting {}", &key);
578
579                            let mut attempt = 1u8;
580                            loop {
581                                match client
582                                    .delete_object()
583                                    .bucket(bucket.as_str())
584                                    .key(&key)
585                                    .send()
586                                    .await
587                                {
588                                    Ok(_) => break,
589                                    Err(e) if attempt < 3 => {
590                                        tracing::warn!(
591                                            "S3 delete retrying {}: attempt={} error={}",
592                                            &key,
593                                            attempt,
594                                            e
595                                        );
596                                        tokio::time::sleep(std::time::Duration::from_millis(
597                                            250 * u64::from(attempt),
598                                        ))
599                                        .await;
600                                        attempt += 1;
601                                    }
602                                    Err(e) => {
603                                        tracing::error!(
604                                            "S3 delete failed {} after {} attempts: {}",
605                                            &key,
606                                            attempt,
607                                            e
608                                        );
609                                        break;
610                                    }
611                                }
612                            }
613                        }
614                    }
615                });
616            }
617        });
618
619        tracing::info!(
620            "S3 storage initialized: bucket={}, prefix={}",
621            bucket,
622            prefix
623        );
624
625        Ok(Self {
626            local,
627            s3_client: Some(s3_client),
628            s3_bucket: Some(bucket),
629            s3_prefix: prefix,
630            sync_tx: Some(sync_tx),
631        })
632    }
633
634    /// Store data - writes to LMDB, queues S3 upload in background
635    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
636        // Always write to local first
637        let is_new = self.local.put_sync(hash, data)?;
638
639        // Queue S3 upload only for newly inserted blobs.
640        // Existing local blobs were already persisted or are handled by explicit repair/push flows.
641        #[cfg(feature = "s3")]
642        if is_new {
643            if let Some(ref tx) = self.sync_tx {
644                tracing::debug!(
645                    "Queueing S3 upload for {} ({} bytes)",
646                    crate::storage::to_hex(&hash)[..16].to_string(),
647                    data.len(),
648                );
649                if let Err(e) = tx.send(S3SyncMessage::Upload {
650                    hash,
651                    data: data.to_vec(),
652                }) {
653                    tracing::error!("Failed to queue S3 upload: {}", e);
654                }
655            }
656        }
657
658        Ok(is_new)
659    }
660
661    /// Store multiple blobs with a single local batch write when supported.
662    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
663        #[cfg(feature = "s3")]
664        let pending_uploads = if self.sync_tx.is_some() {
665            let mut pending = Vec::new();
666            for (hash, data) in items {
667                if !self.local.exists(hash)? {
668                    pending.push((*hash, data.clone()));
669                }
670            }
671            pending
672        } else {
673            Vec::new()
674        };
675
676        let inserted = self.local.put_many_sync(items)?;
677
678        #[cfg(feature = "s3")]
679        if let Some(ref tx) = self.sync_tx {
680            for (hash, data) in pending_uploads {
681                if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
682                    tracing::error!("Failed to queue S3 upload: {}", e);
683                }
684            }
685        }
686
687        Ok(inserted)
688    }
689
690    /// Get data - tries LMDB first, falls back to S3
691    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
692        // Try local first
693        if let Some(data) = self.local.get_sync(hash)? {
694            return Ok(Some(data));
695        }
696
697        // Fall back to S3 if configured
698        #[cfg(feature = "s3")]
699        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
700            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
701            let client = client.clone();
702            let bucket = bucket.clone();
703
704            match Self::run_s3_future_sync(async move {
705                client.get_object().bucket(bucket).key(key).send().await
706            }) {
707                Ok(Ok(output)) => {
708                    match Self::run_s3_future_sync(async move { output.body.collect().await }) {
709                        Ok(Ok(body)) => {
710                            let data = body.into_bytes().to_vec();
711                            // Cache locally for future reads
712                            let _ = self.local.put_sync(*hash, &data);
713                            return Ok(Some(data));
714                        }
715                        Ok(Err(err)) => {
716                            tracing::warn!("S3 body collect failed: {}", err);
717                        }
718                        Err(err) => {
719                            tracing::warn!("S3 body collect runtime failed: {}", err);
720                        }
721                    }
722                }
723                Ok(Err(err)) => {
724                    let service_err = err.into_service_error();
725                    if !service_err.is_no_such_key() {
726                        tracing::warn!("S3 get failed: {}", service_err);
727                    }
728                }
729                Err(err) => {
730                    tracing::warn!("S3 get runtime failed: {}", err);
731                }
732            }
733        }
734
735        Ok(None)
736    }
737
738    pub fn get_range_sync(
739        &self,
740        hash: &Hash,
741        start: u64,
742        end_inclusive: u64,
743    ) -> Result<Option<Vec<u8>>, StoreError> {
744        self.local.get_range_sync(hash, start, end_inclusive)
745    }
746
747    pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
748        self.local.blob_size_sync(hash)
749    }
750
751    pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
752        self.local.touch_accessed_sync(hash, now)
753    }
754
755    pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
756        self.local.touch_many_accessed_sync(hashes, now)
757    }
758
759    pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
760        self.local.last_accessed_at_sync(hash)
761    }
762
763    pub fn many_last_accessed_at_sync(
764        &self,
765        hashes: &[Hash],
766    ) -> Result<Vec<(Hash, u64)>, StoreError> {
767        self.local.many_last_accessed_at_sync(hashes)
768    }
769
770    /// Check if hash exists
771    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
772        // Check local first
773        if self.local.exists(hash)? {
774            return Ok(true);
775        }
776
777        // Check S3 if configured
778        #[cfg(feature = "s3")]
779        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
780            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
781            let client = client.clone();
782            let bucket = bucket.clone();
783
784            match Self::run_s3_future_sync(async move {
785                client.head_object().bucket(bucket).key(&key).send().await
786            }) {
787                Ok(Ok(_)) => return Ok(true),
788                Ok(Err(err)) => {
789                    let service_err = err.into_service_error();
790                    if !service_err.is_not_found() {
791                        tracing::warn!("S3 head failed: {}", service_err);
792                    }
793                }
794                Err(err) => {
795                    tracing::warn!("S3 head runtime failed: {}", err);
796                }
797            }
798        }
799
800        Ok(false)
801    }
802
803    /// Delete data from both local and S3 stores
804    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
805        let deleted = self.local.delete_sync(hash)?;
806
807        // Queue S3 delete if configured
808        #[cfg(feature = "s3")]
809        if let Some(ref tx) = self.sync_tx {
810            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
811        }
812
813        Ok(deleted)
814    }
815
816    /// Delete data from local store only (don't propagate to S3)
817    /// Used for eviction where we want to keep S3 as archive
818    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
819        self.local.delete_sync(hash)
820    }
821
822    /// Get stats from local store
823    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
824        self.local.stats()
825    }
826
827    /// List all hashes from local store
828    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
829        self.local.list()
830    }
831
832    /// Get the underlying local store for HashTree operations
833    pub fn local_store(&self) -> Arc<LocalStore> {
834        Arc::clone(&self.local)
835    }
836}
837
838#[derive(Clone)]
839struct AccessRecordingStore {
840    inner: Arc<StorageRouter>,
841    accessed: Arc<Mutex<HashSet<Hash>>>,
842}
843
844impl AccessRecordingStore {
845    fn new(inner: Arc<StorageRouter>) -> Self {
846        Self {
847            inner,
848            accessed: Arc::new(Mutex::new(HashSet::new())),
849        }
850    }
851
852    fn take_accessed_hashes(&self) -> Vec<Hash> {
853        let Ok(mut accessed) = self.accessed.lock() else {
854            return Vec::new();
855        };
856        accessed.drain().collect()
857    }
858
859    fn record_access(&self, hash: &Hash) {
860        let Ok(mut accessed) = self.accessed.lock() else {
861            return;
862        };
863        accessed.insert(*hash);
864    }
865}
866
867#[async_trait]
868impl Store for AccessRecordingStore {
869    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
870        self.inner.put(hash, data).await
871    }
872
873    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
874        self.inner.put_many(items).await
875    }
876
877    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
878        let data = self.inner.get(hash).await?;
879        if data.is_some() {
880            self.record_access(hash);
881        }
882        Ok(data)
883    }
884
885    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
886        self.inner.has(hash).await
887    }
888
889    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
890        self.inner.delete(hash).await
891    }
892}
893
894// Implement async Store trait for StorageRouter so it can be used directly with HashTree
895// This ensures all writes go through S3 sync
896#[async_trait]
897impl Store for StorageRouter {
898    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
899        self.put_sync(hash, &data)
900    }
901
902    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
903        self.put_many_sync(&items)
904    }
905
906    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
907        self.get_sync(hash)
908    }
909
910    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
911        self.exists(hash)
912    }
913
914    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
915        self.delete_sync(hash)
916    }
917}
918
/// Hashtree storage rooted at one directory.
///
/// Metadata (pins, refs, ownership, tree metadata, cached roots) lives in an
/// LMDB environment at `base_path`; blob bytes go through a [`StorageRouter`]
/// (local blob store under `base_path/blobs`, optionally mirrored to S3).
/// Quota enforcement (`max_size_bytes`, `evict_orphans`) is applied at this
/// layer rather than in the raw blob backend.
pub struct HashtreeStore {
    /// Directory the store was opened at.
    base_path: PathBuf,
    /// LMDB environment holding all metadata databases below.
    env: heed::Env,
    /// Set of pinned hashes (32-byte raw hashes, prevents garbage collection)
    pins: Database<Bytes, Unit>,
    /// Mutable published refs that should stay subscribed and keep following updates
    pinned_refs: Database<Str, Unit>,
    /// Authors whose hashtree publications should be mirrored continuously
    tracked_authors: Database<Str, Unit>,
    /// Blob ownership: sha256 (32 bytes) ++ pubkey (32 bytes) -> () (composite key for multi-owner)
    blob_owners: Database<Bytes, Unit>,
    /// Maps pubkey (32 bytes) -> blob metadata JSON (for blossom list)
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Tree metadata for eviction: tree_root_hash (32 bytes) -> TreeMeta (msgpack)
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree mapping: blob_hash ++ tree_hash (64 bytes) -> ()
    blob_trees: Database<Bytes, Unit>,
    /// Tree refs: "npub/path" -> tree_root_hash (32 bytes) - for replacing old versions
    tree_refs: Database<Str, Bytes>,
    /// Cached roots from Nostr: "pubkey_hex/tree_name" -> CachedRoot (msgpack)
    cached_roots: Database<Str, Bytes>,
    /// Storage router - handles LMDB + optional S3 (Arc for sharing with HashTree)
    router: Arc<StorageRouter>,
    /// Maximum storage size in bytes (from config)
    max_size_bytes: u64,
    /// Whether quota enforcement may delete local blobs not tracked by any indexed tree.
    evict_orphans: bool,
    /// Best-effort in-memory throttle for blob access metadata writes.
    blob_access_update_gate: BlobAccessUpdateGate,
}
949
950impl HashtreeStore {
951    /// Create a new store with the configured local storage limit.
952    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
953        let config = hashtree_config::Config::load_or_default();
954        let max_size_bytes = config
955            .storage
956            .max_size_gb
957            .saturating_mul(1024 * 1024 * 1024);
958        Self::with_options_and_backend(
959            path,
960            None,
961            max_size_bytes,
962            config.storage.evict_orphans,
963            &config.storage.backend,
964        )
965    }
966
967    /// Create a new store with an explicit local backend and size limit.
968    pub fn new_with_backend<P: AsRef<Path>>(
969        path: P,
970        backend: hashtree_config::StorageBackend,
971        max_size_bytes: u64,
972    ) -> Result<Self> {
973        Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
974    }
975
976    /// Create a new store with optional S3 backend and the configured local storage limit.
977    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
978        let config = hashtree_config::Config::load_or_default();
979        let max_size_bytes = config
980            .storage
981            .max_size_gb
982            .saturating_mul(1024 * 1024 * 1024);
983        Self::with_options_and_backend(
984            path,
985            s3_config,
986            max_size_bytes,
987            config.storage.evict_orphans,
988            &config.storage.backend,
989        )
990    }
991
992    /// Create a new store with optional S3 backend and custom size limit.
993    ///
994    /// The raw local blob backend remains unbounded. `HashtreeStore` enforces
995    /// `max_size_bytes` at the tree-management layer so eviction can honor pins,
996    /// orphan handling, and local-only eviction when S3 is used as archive.
997    pub fn with_options<P: AsRef<Path>>(
998        path: P,
999        s3_config: Option<&S3Config>,
1000        max_size_bytes: u64,
1001    ) -> Result<Self> {
1002        let config = hashtree_config::Config::load_or_default();
1003        Self::with_options_and_backend(
1004            path,
1005            s3_config,
1006            max_size_bytes,
1007            config.storage.evict_orphans,
1008            &config.storage.backend,
1009        )
1010    }
1011
    /// Open (or create) a store at `path` with every option given explicitly.
    ///
    /// Creates `path` if needed, opens the metadata LMDB environment there with
    /// its named databases, builds the local blob store under `path/blobs` for
    /// `backend`, and wraps it in a [`StorageRouter`] that also targets S3 when
    /// `s3_config` is given and the `s3` feature is enabled.
    ///
    /// `max_size_bytes` is the quota enforced by `HashtreeStore`; for the LMDB
    /// blob backend it also sizes the LMDB map (at least
    /// `LMDB_BLOB_MIN_MAP_SIZE_BYTES`). `evict_orphans` controls whether quota
    /// enforcement may delete blobs not tracked by any indexed tree.
    ///
    /// # Errors
    /// Returns an error if directories cannot be created, the LMDB environment
    /// or its databases cannot be opened, the blob store cannot be created, or
    /// S3 initialization fails.
    pub fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        evict_orphans: bool,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;
        // NOTE(review): `lmdb_map_size_for_existing_env` is defined elsewhere;
        // presumably it keeps an existing env's map size when that is larger
        // than the minimum — confirm.
        let metadata_map_size =
            lmdb_map_size_for_existing_env(path, LMDB_METADATA_MIN_MAP_SIZE_BYTES)?;

        // SAFETY: heed marks `EnvOpenOptions::open` unsafe because the caller
        // must ensure the same LMDB environment is not opened more than once in
        // this process and its memory-mapped files are not modified by other
        // means while open. NOTE(review): nothing here enforces that; it relies
        // on callers not opening two stores over the same `path`.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(metadata_map_size)
                // NOTE(review): only 9 named databases are created below; the
                // "blobs" entry in this list is not opened in this environment.
                .max_dbs(10) // pins, pinned_refs, tracked_authors, blob_owners, pubkey_blobs, tree_meta, blob_trees, tree_refs, cached_roots, blobs
                .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Best-effort: clear stale reader-table entries; a failure is ignored.
        let _ = env.clear_stale_readers();

        // Create (or open) every metadata database in a single write transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
        let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Intentionally keep the raw blob backend unbounded here. HashtreeStore
        // owns quota policy above this layer, where it can coordinate eviction
        // with tree refs, blob ownership, pins, and S3 archival behavior.
        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                std::fs::create_dir_all(path.join("blobs"))?;
                // Remove leftovers of the filesystem backend's shard layout
                // before opening LMDB in the same directory.
                remove_stale_fs_blob_shards(&path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
                // The LMDB map size is sized from the quota, with a floor.
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        // Create storage router with optional S3
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            // S3 setup is async; drive it to completion on this thread.
            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        // Without the `s3` feature any S3 config is ignored (with a warning).
        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            base_path: path.to_path_buf(),
            env,
            pins,
            pinned_refs,
            tracked_authors,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
            evict_orphans,
            blob_access_update_gate: BlobAccessUpdateGate::default(),
        })
    }
1120
1121    pub fn base_path(&self) -> &Path {
1122        &self.base_path
1123    }
1124
1125    /// Get the storage router
1126    pub fn router(&self) -> &StorageRouter {
1127        &self.router
1128    }
1129
1130    /// Get the storage router as Arc (for use with HashTree which needs Arc<dyn Store>)
1131    /// All writes through this go to both LMDB and S3
1132    pub fn store_arc(&self) -> Arc<StorageRouter> {
1133        Arc::clone(&self.router)
1134    }
1135
1136    fn access_tracking_tree(&self) -> (HashTree<AccessRecordingStore>, AccessRecordingStore) {
1137        let access_store = AccessRecordingStore::new(self.store_arc());
1138        let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1139        (tree, access_store)
1140    }
1141
1142    pub fn record_blob_accesses<I>(&self, hashes: I)
1143    where
1144        I: IntoIterator<Item = Hash>,
1145    {
1146        let now = unix_timestamp_now();
1147        let due_hashes = self.blob_access_update_gate.due_hashes(hashes, now);
1148        if due_hashes.is_empty() {
1149            return;
1150        }
1151
1152        if let Err(err) = self.router.touch_many_accessed_sync(&due_hashes, now) {
1153            tracing::debug!("Failed to update blob access metadata: {}", err);
1154        }
1155    }
1156
1157    fn record_blob_access_now(&self, hash: &Hash) {
1158        if let Err(err) = self.router.touch_accessed_sync(hash, unix_timestamp_now()) {
1159            tracing::debug!("Failed to update blob access metadata: {}", err);
1160        }
1161    }
1162
1163    pub fn blob_last_accessed_at(&self, hash: &Hash) -> Result<Option<u64>> {
1164        self.router
1165            .last_accessed_at_sync(hash)
1166            .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1167    }
1168
1169    pub fn blob_last_accessed_many(&self, hashes: &[Hash]) -> Result<Vec<(Hash, u64)>> {
1170        self.router
1171            .many_last_accessed_at_sync(hashes)
1172            .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1173    }
1174
1175    /// Get tree node by hash (raw bytes)
1176    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
1177        let (tree, access_store) = self.access_tracking_tree();
1178
1179        let result = sync_block_on(async {
1180            tree.get_tree_node(hash)
1181                .await
1182                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
1183        })?;
1184        if result.is_some() {
1185            self.record_blob_accesses(access_store.take_accessed_hashes());
1186        }
1187        Ok(result)
1188    }
1189
1190    /// Store a raw blob, returns SHA256 hash as hex.
1191    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
1192        let hash = sha256(data);
1193        let inserted = self
1194            .router
1195            .put_sync(hash, data)
1196            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1197        if !inserted {
1198            self.record_blob_access_now(&hash);
1199        }
1200        Ok(to_hex(&hash))
1201    }
1202
1203    /// Store an owned Blossom blob under the configured durable storage limit.
1204    pub fn put_owned_blob(&self, data: &[u8], pubkey: &[u8; 32]) -> Result<String> {
1205        let hash = sha256(data);
1206        if !self
1207            .router
1208            .exists(&hash)
1209            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))?
1210        {
1211            self.make_room_for_durable_blob(data.len() as u64)?;
1212            self.router
1213                .put_sync(hash, data)
1214                .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1215        } else {
1216            self.record_blob_access_now(&hash);
1217        }
1218        self.set_blob_owner(&hash, pubkey)?;
1219        Ok(to_hex(&hash))
1220    }
1221
1222    /// Store an opportunistically cached blob.
1223    ///
1224    /// Unlike durable `put_blob` writes, this path may evict disposable orphaned
1225    /// blobs to make room under storage pressure. It intentionally avoids touching
1226    /// indexed trees, social-graph roots, explicit pins, and owned Blossom blobs.
1227    pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
1228        let hash = sha256(data);
1229        if self
1230            .router
1231            .exists(&hash)
1232            .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
1233        {
1234            self.record_blob_access_now(&hash);
1235            return Ok(to_hex(&hash));
1236        }
1237
1238        let incoming_bytes = data.len() as u64;
1239        let _ = self.make_room_for_cached_blob(incoming_bytes);
1240
1241        let mut retried_after_cleanup = false;
1242        loop {
1243            match self.router.put_sync(hash, data) {
1244                Ok(_) => return Ok(to_hex(&hash)),
1245                Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
1246                    let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
1247                    if freed == 0 {
1248                        return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
1249                    }
1250                    retried_after_cleanup = true;
1251                }
1252                Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
1253            }
1254        }
1255    }
1256
1257    /// Get a raw blob by SHA256 hash (raw bytes).
1258    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1259        let data = self
1260            .router
1261            .get_sync(hash)
1262            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?;
1263        if data.is_some() {
1264            self.record_blob_accesses(std::iter::once(*hash));
1265        }
1266        Ok(data)
1267    }
1268
1269    pub fn get_blob_range(
1270        &self,
1271        hash: &[u8; 32],
1272        start: u64,
1273        end_inclusive: u64,
1274    ) -> Result<Option<Vec<u8>>> {
1275        let data = self
1276            .router
1277            .get_range_sync(hash, start, end_inclusive)
1278            .map_err(|e| anyhow::anyhow!("Failed to get blob range: {}", e))?;
1279        if data.is_some() {
1280            self.record_blob_accesses(std::iter::once(*hash));
1281        }
1282        Ok(data)
1283    }
1284
1285    pub fn blob_size(&self, hash: &[u8; 32]) -> Result<Option<u64>> {
1286        self.router
1287            .blob_size_sync(hash)
1288            .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))
1289    }
1290
1291    /// Check if a blob exists by SHA256 hash (raw bytes).
1292    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
1293        self.router
1294            .exists(hash)
1295            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
1296    }
1297
1298    // === Blossom ownership tracking ===
1299    // Uses composite key: sha256 (32 bytes) ++ pubkey (32 bytes) -> ()
    // This allows efficient multi-owner tracking: a (blob, owner) check is a single
    // key lookup, and all owners of a blob are found with a prefix scan on the sha256.
1301
1302    /// Build composite key for blob_owners: sha256 ++ pubkey (64 bytes total)
1303    fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
1304        let mut key = [0u8; 64];
1305        key[..32].copy_from_slice(sha256);
1306        key[32..].copy_from_slice(pubkey);
1307        key
1308    }
1309
1310    /// Add an owner (pubkey) to a blob for Blossom protocol
1311    /// Multiple users can own the same blob - it's only deleted when all owners remove it
1312    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
1313        let key = Self::blob_owner_key(sha256, pubkey);
1314        let mut wtxn = self.env.write_txn()?;
1315
1316        // Add ownership entry (idempotent - put overwrites)
1317        self.blob_owners.put(&mut wtxn, &key[..], &())?;
1318
1319        // Convert sha256 to hex for BlobMetadata (which stores sha256 as hex string)
1320        let sha256_hex = to_hex(sha256);
1321
1322        // Get existing blobs for this pubkey (for /list endpoint)
1323        let mut blobs: Vec<BlobMetadata> = self
1324            .pubkey_blobs
1325            .get(&wtxn, pubkey)?
1326            .and_then(|b| serde_json::from_slice(b).ok())
1327            .unwrap_or_default();
1328
1329        // Check if blob already exists for this pubkey
1330        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
1331            let now = SystemTime::now()
1332                .duration_since(UNIX_EPOCH)
1333                .unwrap()
1334                .as_secs();
1335
1336            // Only metadata is needed here. Reading the full blob body makes
1337            // duplicate Blossom uploads compete with normal content serving.
1338            let size = self
1339                .router
1340                .blob_size_sync(sha256)
1341                .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))?
1342                .unwrap_or(0);
1343
1344            blobs.push(BlobMetadata {
1345                sha256: sha256_hex,
1346                size,
1347                mime_type: "application/octet-stream".to_string(),
1348                uploaded: now,
1349            });
1350
1351            let blobs_json = serde_json::to_vec(&blobs)?;
1352            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1353        }
1354
1355        wtxn.commit()?;
1356        Ok(())
1357    }
1358
1359    /// Check if a pubkey owns a blob
1360    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1361        let key = Self::blob_owner_key(sha256, pubkey);
1362        let rtxn = self.env.read_txn()?;
1363        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1364    }
1365
1366    /// Get all owners (pubkeys) of a blob via prefix scan (returns raw bytes)
1367    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1368        let rtxn = self.env.read_txn()?;
1369
1370        let mut owners = Vec::new();
1371        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1372            let (key, _) = item?;
1373            if key.len() == 64 {
1374                // Extract pubkey from composite key (bytes 32-64)
1375                let mut pubkey = [0u8; 32];
1376                pubkey.copy_from_slice(&key[32..64]);
1377                owners.push(pubkey);
1378            }
1379        }
1380        Ok(owners)
1381    }
1382
1383    /// Check if blob has any owners
1384    pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1385        let rtxn = self.env.read_txn()?;
1386
1387        // Just check if any entry exists with this prefix
1388        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1389            if item.is_ok() {
1390                return Ok(true);
1391            }
1392        }
1393        Ok(false)
1394    }
1395
1396    /// Get the first owner (pubkey) of a blob (for backwards compatibility)
1397    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1398        Ok(self.get_blob_owners(sha256)?.into_iter().next())
1399    }
1400
1401    /// Remove a user's ownership of a blossom blob
1402    /// Only deletes the actual blob when no owners remain
1403    /// Returns true if the blob was actually deleted (no owners left)
1404    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1405        let key = Self::blob_owner_key(sha256, pubkey);
1406        let mut wtxn = self.env.write_txn()?;
1407
1408        // Remove this pubkey's ownership entry
1409        self.blob_owners.delete(&mut wtxn, &key[..])?;
1410
1411        // Hex strings for logging and BlobMetadata (which stores sha256 as hex string)
1412        let sha256_hex = to_hex(sha256);
1413
1414        // Remove from pubkey's blob list
1415        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1416            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1417                blobs.retain(|b| b.sha256 != sha256_hex);
1418                let blobs_json = serde_json::to_vec(&blobs)?;
1419                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1420            }
1421        }
1422
1423        // Check if any other owners remain (prefix scan)
1424        let mut has_other_owners = false;
1425        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1426            if item.is_ok() {
1427                has_other_owners = true;
1428                break;
1429            }
1430        }
1431
1432        if has_other_owners {
1433            wtxn.commit()?;
1434            tracing::debug!(
1435                "Removed {} from blob {} owners, other owners remain",
1436                &to_hex(pubkey)[..8],
1437                &sha256_hex[..8]
1438            );
1439            return Ok(false);
1440        }
1441
1442        // No owners left - delete the blob completely
1443        tracing::info!(
1444            "All owners removed from blob {}, deleting",
1445            &sha256_hex[..8]
1446        );
1447
1448        // Delete raw blob (by content hash) - this deletes from S3 too
1449        let _ = self.router.delete_sync(sha256);
1450
1451        wtxn.commit()?;
1452        Ok(true)
1453    }
1454
1455    /// List all blobs owned by a pubkey (for Blossom /list endpoint)
1456    pub fn list_blobs_by_pubkey(
1457        &self,
1458        pubkey: &[u8; 32],
1459    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1460        let rtxn = self.env.read_txn()?;
1461
1462        let blobs: Vec<BlobMetadata> = self
1463            .pubkey_blobs
1464            .get(&rtxn, pubkey)?
1465            .and_then(|b| serde_json::from_slice(b).ok())
1466            .unwrap_or_default();
1467
1468        Ok(blobs
1469            .into_iter()
1470            .map(|b| crate::server::blossom::BlobDescriptor {
1471                url: format!("/{}", b.sha256),
1472                sha256: b.sha256,
1473                size: b.size,
1474                mime_type: b.mime_type,
1475                uploaded: b.uploaded,
1476            })
1477            .collect())
1478    }
1479
1480    /// Get a single chunk/blob by hash (raw bytes)
1481    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1482        let data = self
1483            .router
1484            .get_sync(hash)
1485            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))?;
1486        if data.is_some() {
1487            self.record_blob_accesses(std::iter::once(*hash));
1488        }
1489        Ok(data)
1490    }
1491
1492    /// Get file content by hash (raw bytes)
1493    /// Returns raw bytes (caller handles decryption if needed)
1494    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1495        let (tree, access_store) = self.access_tracking_tree();
1496
1497        let result = sync_block_on(async {
1498            tree.read_file(hash)
1499                .await
1500                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1501        })?;
1502        if result.is_some() {
1503            self.record_blob_accesses(access_store.take_accessed_hashes());
1504        }
1505        Ok(result)
1506    }
1507
1508    /// Get file content by Cid (hash + optional decryption key as raw bytes)
1509    /// Handles decryption automatically if key is present
1510    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1511        let (tree, access_store) = self.access_tracking_tree();
1512
1513        let result = sync_block_on(async {
1514            tree.get(cid, None)
1515                .await
1516                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1517        })?;
1518        if result.is_some() {
1519            self.record_blob_accesses(access_store.take_accessed_hashes());
1520        }
1521        Ok(result)
1522    }
1523
1524    fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1525        let exists = self
1526            .router
1527            .exists(&cid.hash)
1528            .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1529        if !exists {
1530            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1531        }
1532        Ok(())
1533    }
1534
1535    /// Stream file content identified by Cid into a writer without buffering full file in memory.
1536    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1537        self.ensure_cid_exists(cid)?;
1538
1539        let (tree, access_store) = self.access_tracking_tree();
1540        let mut total_bytes = 0u64;
1541        let mut streamed_any_chunk = false;
1542
1543        sync_block_on(async {
1544            let mut stream = tree.get_stream(cid);
1545            while let Some(chunk) = stream.next().await {
1546                streamed_any_chunk = true;
1547                let chunk =
1548                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1549                writer
1550                    .write_all(&chunk)
1551                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1552                total_bytes += chunk.len() as u64;
1553            }
1554            Ok::<(), anyhow::Error>(())
1555        })?;
1556
1557        if !streamed_any_chunk {
1558            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1559        }
1560        self.record_blob_accesses(access_store.take_accessed_hashes());
1561
1562        writer
1563            .flush()
1564            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1565        Ok(total_bytes)
1566    }
1567
1568    /// Stream file content identified by Cid directly into a destination path.
1569    pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1570        self.ensure_cid_exists(cid)?;
1571
1572        let output_path = output_path.as_ref();
1573        if let Some(parent) = output_path.parent() {
1574            if !parent.as_os_str().is_empty() {
1575                std::fs::create_dir_all(parent).with_context(|| {
1576                    format!("Failed to create output directory {}", parent.display())
1577                })?;
1578            }
1579        }
1580
1581        let mut file = std::fs::File::create(output_path)
1582            .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1583        self.write_file_by_cid_to_writer(cid, &mut file)
1584    }
1585
1586    /// Stream a public (unencrypted) file by hash directly into a destination path.
1587    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1588        self.write_file_by_cid(&Cid::public(*hash), output_path)
1589    }
1590
1591    /// Resolve a path within a tree (returns Cid with key if encrypted)
1592    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1593        let (tree, access_store) = self.access_tracking_tree();
1594
1595        let result = sync_block_on(async {
1596            tree.resolve_path(cid, path)
1597                .await
1598                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1599        })?;
1600        if result.is_some() {
1601            self.record_blob_accesses(access_store.take_accessed_hashes());
1602        }
1603        Ok(result)
1604    }
1605
1606    /// Get chunk metadata for a file (chunk list, sizes, total size)
1607    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
1608        let access_store = AccessRecordingStore::new(self.store_arc());
1609        let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1610
1611        let metadata: Result<Option<FileChunkMetadata>> = sync_block_on(async {
1612            // First check if the hash exists in the store at all
1613            // (either as a blob or tree node)
1614            let exists = access_store
1615                .has(hash)
1616                .await
1617                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;
1618
1619            if !exists {
1620                return Ok(None);
1621            }
1622
1623            // Get total size
1624            let total_size = tree
1625                .get_size(hash)
1626                .await
1627                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;
1628
1629            // Check if it's a tree (chunked) or blob
1630            let is_tree_node = tree
1631                .is_tree(hash)
1632                .await
1633                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1634
1635            if !is_tree_node {
1636                // Single blob, not chunked
1637                return Ok(Some(FileChunkMetadata {
1638                    total_size,
1639                    chunk_hashes: vec![],
1640                    chunk_sizes: vec![],
1641                    is_chunked: false,
1642                }));
1643            }
1644
1645            // Get tree node to extract chunk info
1646            let node = match tree
1647                .get_tree_node(hash)
1648                .await
1649                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1650            {
1651                Some(n) => n,
1652                None => return Ok(None),
1653            };
1654
1655            // Check if it's a directory (has named links)
1656            let is_directory = tree
1657                .is_directory(hash)
1658                .await
1659                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1660
1661            if is_directory {
1662                return Ok(None); // Not a file
1663            }
1664
1665            // Extract chunk info from links
1666            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
1667            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();
1668
1669            Ok(Some(FileChunkMetadata {
1670                total_size,
1671                chunk_hashes,
1672                chunk_sizes,
1673                is_chunked: !node.links.is_empty(),
1674            }))
1675        });
1676        let metadata = metadata?;
1677        if metadata.is_some() {
1678            self.record_blob_accesses(access_store.take_accessed_hashes());
1679        }
1680        Ok(metadata)
1681    }
1682
1683    /// Get byte range from file
1684    pub fn get_file_range(
1685        &self,
1686        hash: &[u8; 32],
1687        start: u64,
1688        end: Option<u64>,
1689    ) -> Result<Option<(Vec<u8>, u64)>> {
1690        let metadata = match self.get_file_chunk_metadata(hash)? {
1691            Some(m) => m,
1692            None => return Ok(None),
1693        };
1694
1695        if metadata.total_size == 0 {
1696            return Ok(Some((Vec::new(), 0)));
1697        }
1698
1699        if start >= metadata.total_size {
1700            return Ok(None);
1701        }
1702
1703        let end = end
1704            .unwrap_or(metadata.total_size - 1)
1705            .min(metadata.total_size - 1);
1706
1707        // For non-chunked files, load entire file
1708        if !metadata.is_chunked {
1709            let content = self.get_file(hash)?.unwrap_or_default();
1710            let range_content = if start < content.len() as u64 {
1711                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1712            } else {
1713                Vec::new()
1714            };
1715            return Ok(Some((range_content, metadata.total_size)));
1716        }
1717
1718        // For chunked files, load only needed chunks
1719        let mut result = Vec::new();
1720        let mut current_offset = 0u64;
1721
1722        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1723            let chunk_size = metadata.chunk_sizes[i];
1724            let chunk_end = current_offset + chunk_size - 1;
1725
1726            // Check if this chunk overlaps with requested range
1727            if chunk_end >= start && current_offset <= end {
1728                let chunk_content = match self.get_chunk(chunk_hash)? {
1729                    Some(content) => content,
1730                    None => {
1731                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1732                    }
1733                };
1734
1735                let chunk_read_start = if current_offset >= start {
1736                    0
1737                } else {
1738                    (start - current_offset) as usize
1739                };
1740
1741                let chunk_read_end = if chunk_end <= end {
1742                    chunk_size as usize - 1
1743                } else {
1744                    (end - current_offset) as usize
1745                };
1746
1747                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1748            }
1749
1750            current_offset += chunk_size;
1751
1752            if current_offset > end {
1753                break;
1754            }
1755        }
1756
1757        Ok(Some((result, metadata.total_size)))
1758    }
1759
1760    /// Stream file range as chunks using Arc for async/Send contexts
1761    pub fn stream_file_range_chunks_owned(
1762        self: Arc<Self>,
1763        hash: &[u8; 32],
1764        start: u64,
1765        end: u64,
1766    ) -> Result<Option<FileRangeChunksOwned>> {
1767        let metadata = match self.get_file_chunk_metadata(hash)? {
1768            Some(m) => m,
1769            None => return Ok(None),
1770        };
1771
1772        if metadata.total_size == 0 || start >= metadata.total_size {
1773            return Ok(None);
1774        }
1775
1776        let end = end.min(metadata.total_size - 1);
1777
1778        Ok(Some(FileRangeChunksOwned {
1779            store: self,
1780            metadata,
1781            start,
1782            end,
1783            current_chunk_idx: 0,
1784            current_offset: 0,
1785        }))
1786    }
1787
1788    /// Get directory structure by hash (raw bytes)
1789    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1790        let (tree, access_store) = self.access_tracking_tree();
1791
1792        let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
1793            // Check if it's a directory
1794            let is_dir = tree
1795                .is_directory(hash)
1796                .await
1797                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1798
1799            if !is_dir {
1800                return Ok(None);
1801            }
1802
1803            // Get directory entries (public Cid - no encryption key)
1804            let cid = hashtree_core::Cid::public(*hash);
1805            let tree_entries = tree
1806                .list_directory(&cid)
1807                .await
1808                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1809
1810            let entries: Vec<DirEntry> = tree_entries
1811                .into_iter()
1812                .map(|e| DirEntry {
1813                    name: e.name,
1814                    cid: to_hex(&e.hash),
1815                    is_directory: e.link_type.is_tree(),
1816                    size: e.size,
1817                })
1818                .collect();
1819
1820            Ok(Some(DirectoryListing {
1821                dir_name: String::new(),
1822                entries,
1823            }))
1824        });
1825        let listing = listing?;
1826        if listing.is_some() {
1827            self.record_blob_accesses(access_store.take_accessed_hashes());
1828        }
1829        Ok(listing)
1830    }
1831
1832    /// Get directory structure by CID, supporting encrypted directories.
1833    pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1834        let (tree, access_store) = self.access_tracking_tree();
1835        let cid = cid.clone();
1836
1837        let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
1838            let is_dir = tree
1839                .is_dir(&cid)
1840                .await
1841                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1842
1843            if !is_dir {
1844                return Ok(None);
1845            }
1846
1847            let tree_entries = tree
1848                .list_directory(&cid)
1849                .await
1850                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1851
1852            let entries: Vec<DirEntry> = tree_entries
1853                .into_iter()
1854                .map(|e| DirEntry {
1855                    name: e.name,
1856                    cid: Cid {
1857                        hash: e.hash,
1858                        key: e.key,
1859                    }
1860                    .to_string(),
1861                    is_directory: e.link_type.is_tree(),
1862                    size: e.size,
1863                })
1864                .collect();
1865
1866            Ok(Some(DirectoryListing {
1867                dir_name: String::new(),
1868                entries,
1869            }))
1870        });
1871        let listing = listing?;
1872        if listing.is_some() {
1873            self.record_blob_accesses(access_store.take_accessed_hashes());
1874        }
1875        Ok(listing)
1876    }
1877
    // === Pinned refs, tracked authors, and cached roots ===
1879
1880    /// Persist a mutable published ref that should stay subscribed.
1881    pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
1882        let mut wtxn = self.env.write_txn()?;
1883        self.pinned_refs.put(&mut wtxn, key, &())?;
1884        wtxn.commit()?;
1885        Ok(())
1886    }
1887
1888    /// Remove a mutable published ref from the live pinned set.
1889    pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
1890        let mut wtxn = self.env.write_txn()?;
1891        let removed = self.pinned_refs.delete(&mut wtxn, key)?;
1892        wtxn.commit()?;
1893        Ok(removed)
1894    }
1895
1896    /// List mutable published refs that should stay subscribed.
1897    pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
1898        let rtxn = self.env.read_txn()?;
1899        let mut refs = Vec::new();
1900
1901        for item in self.pinned_refs.iter(&rtxn)? {
1902            let (key, _) = item?;
1903            refs.push(key.to_string());
1904        }
1905
1906        refs.sort();
1907        Ok(refs)
1908    }
1909
1910    /// Persist an author whose published trees should stay mirrored.
1911    pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
1912        let mut wtxn = self.env.write_txn()?;
1913        let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
1914        self.tracked_authors.put(&mut wtxn, npub, &())?;
1915        wtxn.commit()?;
1916        Ok(inserted)
1917    }
1918
1919    /// Remove an author from the continuous mirror set.
1920    pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
1921        let mut wtxn = self.env.write_txn()?;
1922        let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
1923        wtxn.commit()?;
1924        Ok(removed)
1925    }
1926
1927    /// List authors whose published trees should stay mirrored.
1928    pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
1929        let rtxn = self.env.read_txn()?;
1930        let mut authors = Vec::new();
1931
1932        for item in self.tracked_authors.iter(&rtxn)? {
1933            let (npub, _) = item?;
1934            authors.push(npub.to_string());
1935        }
1936
1937        authors.sort();
1938        Ok(authors)
1939    }
1940
1941    /// Get cached root for a pubkey/tree_name pair
1942    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1943        let key = format!("{}/{}", pubkey_hex, tree_name);
1944        let rtxn = self.env.read_txn()?;
1945        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1946            let root: CachedRoot = rmp_serde::from_slice(bytes)
1947                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1948            Ok(Some(root))
1949        } else {
1950            Ok(None)
1951        }
1952    }
1953
1954    /// Set cached root for a pubkey/tree_name pair
1955    pub fn set_cached_root(
1956        &self,
1957        pubkey_hex: &str,
1958        tree_name: &str,
1959        hash: &str,
1960        key: Option<&str>,
1961        visibility: &str,
1962        updated_at: u64,
1963    ) -> Result<()> {
1964        let db_key = format!("{}/{}", pubkey_hex, tree_name);
1965        let root = CachedRoot {
1966            hash: hash.to_string(),
1967            key: key.map(|k| k.to_string()),
1968            updated_at,
1969            visibility: visibility.to_string(),
1970        };
1971        let bytes = rmp_serde::to_vec(&root)
1972            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1973        let mut wtxn = self.env.write_txn()?;
1974        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1975        wtxn.commit()?;
1976        Ok(())
1977    }
1978
1979    /// List all cached roots for a pubkey
1980    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1981        let prefix = format!("{}/", pubkey_hex);
1982        let rtxn = self.env.read_txn()?;
1983        let mut results = Vec::new();
1984
1985        for item in self.cached_roots.iter(&rtxn)? {
1986            let (key, bytes) = item?;
1987            if key.starts_with(&prefix) {
1988                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1989                let root: CachedRoot = rmp_serde::from_slice(bytes)
1990                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1991                results.push((tree_name.to_string(), root));
1992            }
1993        }
1994
1995        Ok(results)
1996    }
1997
1998    /// Delete a cached root
1999    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
2000        let key = format!("{}/{}", pubkey_hex, tree_name);
2001        let mut wtxn = self.env.write_txn()?;
2002        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
2003        wtxn.commit()?;
2004        Ok(deleted)
2005    }
2006}
2007
2008fn is_map_full_store_error(err: &StoreError) -> bool {
2009    let message = err.to_string();
2010    message.contains("MDB_MAP_FULL") || message.contains("MapFull")
2011}
2012
2013#[derive(Debug, Clone)]
2014pub struct FileChunkMetadata {
2015    pub total_size: u64,
2016    pub chunk_hashes: Vec<Hash>,
2017    pub chunk_sizes: Vec<u64>,
2018    pub is_chunked: bool,
2019}
2020
/// Owned iterator for async streaming
///
/// Lazily yields the bytes of a chunked file that fall within `start..=end`, one chunk at
/// a time. It holds an `Arc` of the store, so it does not borrow from the caller. It is
/// built by `stream_file_range_chunks_owned`.
pub struct FileRangeChunksOwned {
    // Store used to fetch chunk bytes on demand.
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    // First byte of the requested range (inclusive, absolute file offset).
    start: u64,
    // Last byte of the requested range (inclusive, absolute file offset).
    end: u64,
    // Index into `metadata.chunk_hashes` of the next chunk to examine.
    current_chunk_idx: usize,
    // Absolute file offset at which chunk `current_chunk_idx` begins.
    current_offset: u64,
}
2030
2031impl Iterator for FileRangeChunksOwned {
2032    type Item = Result<Vec<u8>>;
2033
2034    fn next(&mut self) -> Option<Self::Item> {
2035        if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2036            return None;
2037        }
2038
2039        if self.current_offset > self.end {
2040            return None;
2041        }
2042
2043        let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2044        let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2045        let chunk_end = self.current_offset + chunk_size - 1;
2046
2047        self.current_chunk_idx += 1;
2048
2049        if chunk_end < self.start || self.current_offset > self.end {
2050            self.current_offset += chunk_size;
2051            return self.next();
2052        }
2053
2054        let chunk_content = match self.store.get_chunk(chunk_hash) {
2055            Ok(Some(content)) => content,
2056            Ok(None) => {
2057                return Some(Err(anyhow::anyhow!(
2058                    "Chunk {} not found",
2059                    to_hex(chunk_hash)
2060                )));
2061            }
2062            Err(e) => {
2063                return Some(Err(e));
2064            }
2065        };
2066
2067        let chunk_read_start = if self.current_offset >= self.start {
2068            0
2069        } else {
2070            (self.start - self.current_offset) as usize
2071        };
2072
2073        let chunk_read_end = if chunk_end <= self.end {
2074            chunk_size as usize - 1
2075        } else {
2076            (self.end - self.current_offset) as usize
2077        };
2078
2079        let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2080        self.current_offset += chunk_size;
2081
2082        Some(Ok(result))
2083    }
2084}
2085
/// Summary counters for a garbage-collection pass.
#[derive(Debug)]
pub struct GcStats {
    // Number of DAGs removed; presumably whole trees/blobs. Confirm at the GC producer.
    pub deleted_dags: usize,
    // Space reclaimed; assumed to be in bytes (per the name). Confirm at the GC producer.
    pub freed_bytes: u64,
}
2091
/// One child of a directory listing.
#[derive(Clone, Debug)]
pub struct DirEntry {
    /// Entry name within its parent directory.
    pub name: String,
    /// Child identifier as text. `get_directory_listing` stores a bare hex hash;
    /// `get_directory_listing_by_cid` stores `Cid::to_string()`, which may include a key.
    pub cid: String,
    /// `true` when the link points at a tree node.
    pub is_directory: bool,
    /// Size recorded on the parent's link for this entry.
    pub size: u64,
}

/// Directory contents produced by the listing helpers.
#[derive(Clone, Debug)]
pub struct DirectoryListing {
    /// Directory name; the listing helpers leave this empty.
    pub dir_name: String,
    /// Child entries, in the order the tree returned them.
    pub entries: Vec<DirEntry>,
}
2105
/// Blob metadata for Blossom protocol
///
/// NOTE(review): if this is persisted with an order-sensitive encoder such as `rmp_serde`
/// (which this file uses for `CachedRoot`), reordering fields would break stored data.
/// Keep the field order stable.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    // Blob digest; presumably lowercase hex SHA-256. Confirm with the producer.
    pub sha256: String,
    // Blob size; presumably in bytes.
    pub size: u64,
    // MIME type string reported for the blob.
    pub mime_type: String,
    // Upload time; presumably a Unix timestamp in seconds. TODO confirm.
    pub uploaded: u64,
}
2114
2115// Implement ContentStore trait for WebRTC data exchange
2116impl crate::webrtc::ContentStore for HashtreeStore {
2117    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2118        let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2119        self.get_chunk(&hash)
2120    }
2121}
2122
#[cfg(test)]
mod tests {
    use super::*;
    // Deliberately not gated on the `lmdb` feature: the backend-independent
    // `tracked_authors_round_trip_sorted_and_deduplicated` test also needs `TempDir`. Gating
    // the import on `lmdb` broke test builds with that feature disabled.
    use tempfile::TempDir;

    #[test]
    fn blob_access_update_gate_deduplicates_and_throttles() {
        let gate = BlobAccessUpdateGate::default();
        let first = sha256(b"first");
        let second = sha256(b"second");

        assert_eq!(
            gate.due_hashes([first, first, second], 10),
            vec![first, second]
        );
        assert!(gate.due_hashes([first, second], 11).is_empty());
        assert_eq!(
            gate.due_hashes([second, first], 10 + ACCESS_UPDATE_INTERVAL_SECS),
            vec![second, first]
        );
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("lmdb-blobs");
        std::fs::create_dir_all(path.join("aa"))?;
        std::fs::create_dir_all(path.join("b2"))?;
        std::fs::create_dir_all(path.join("keep-me"))?;
        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;

        let _store = LocalStore::new_with_lmdb_map_size(
            &path,
            &StorageBackend::Lmdb,
            Some(128 * 1024 * 1024),
        )?;

        assert!(!path.join("aa").exists());
        assert!(!path.join("b2").exists());
        assert!(path.join("keep-me").exists());
        assert!(path.join("data.mdb").exists());
        assert!(path.join("lock.mdb").exists());

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn duplicate_blossom_writes_refresh_blob_last_accessed() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let data = b"cached blossom duplicate";
        let hash = sha256(data);
        store.put_cached_blob(data)?;
        store.router.touch_accessed_sync(&hash, 1)?;
        store.put_cached_blob(data)?;
        assert!(store.blob_last_accessed_at(&hash)?.unwrap_or(0) > 1);

        let owned = b"owned blossom duplicate";
        let owned_hash = sha256(owned);
        let owner = [7u8; 32];
        store.put_owned_blob(owned, &owner)?;
        store.router.touch_accessed_sync(&owned_hash, 1)?;
        store.put_owned_blob(owned, &owner)?;
        assert!(store.blob_last_accessed_at(&owned_hash)?.unwrap_or(0) > 1);

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let old_bytes = b"old published root";
        let new_bytes = b"new published root";
        let old_root = sha256(old_bytes);
        let new_root = sha256(new_bytes);

        store.put_blob(old_bytes)?;
        store.pin(&old_root)?;
        store.index_tree(
            &old_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(store.is_pinned(&old_root)?);
        assert!(store.get_tree_meta(&old_root)?.is_some());

        store.put_blob(new_bytes)?;
        store.pin(&new_root)?;
        store.index_tree(
            &new_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(
            !store.is_pinned(&old_root)?,
            "superseded root should be unpinned when ref is replaced"
        );
        assert!(
            store.get_tree_meta(&old_root)?.is_none(),
            "superseded root metadata should be removed when ref is replaced"
        );
        assert!(store.is_pinned(&new_root)?);
        assert!(store.get_tree_meta(&new_root)?.is_some());

        Ok(())
    }

    #[test]
    fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options(temp.path(), None, 1024 * 1024)?;

        let zed = "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk";
        let ay = "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm";
        let untracked = "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq";

        assert!(store.add_tracked_author(zed)?, "first insert should report new");
        assert!(store.add_tracked_author(ay)?, "first insert should report new");
        assert!(
            !store.add_tracked_author(zed)?,
            "re-adding an already tracked author should not report an insertion"
        );

        assert_eq!(
            store.list_tracked_authors()?,
            vec![ay.to_string(), zed.to_string()]
        );
        assert!(store.remove_tracked_author(ay)?);
        assert!(!store.remove_tracked_author(untracked)?);
        assert_eq!(store.list_tracked_authors()?, vec![zed.to_string()]);

        Ok(())
    }

    #[cfg(feature = "s3")]
    #[test]
    fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
        let temp = TempDir::new()?;
        let local = Arc::new(LocalStore::new(
            temp.path().join("blobs"),
            &StorageBackend::Fs,
        )?);

        let outcome = std::panic::catch_unwind(|| {
            sync_block_on(async {
                let aws_config = aws_config::from_env()
                    .region(aws_sdk_s3::config::Region::new("auto"))
                    .load()
                    .await;
                let s3_client = aws_sdk_s3::Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&aws_config)
                        .endpoint_url("http://127.0.0.1:9")
                        .force_path_style(true)
                        .build(),
                );

                let router = StorageRouter {
                    local,
                    s3_client: Some(s3_client),
                    s3_bucket: Some("test-bucket".to_string()),
                    s3_prefix: String::new(),
                    sync_tx: None,
                };
                let hash = [0u8; 32];

                let _ = Store::has(&router, &hash).await;
                let _ = Store::get(&router, &hash).await;
            });
        });

        assert!(
            outcome.is_ok(),
            "S3-backed async store methods should not panic inside futures::block_on"
        );

        Ok(())
    }
}