1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8 from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::collections::{HashMap, HashSet};
17#[cfg(feature = "s3")]
18use std::future::Future;
19use std::io::Write;
20use std::path::{Path, PathBuf};
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex};
23use std::time::{SystemTime, UNIX_EPOCH};
24
25mod upload;
26
27mod maintenance;
28mod retention;
29
/// Timeout, in milliseconds, applied to a blocking S3 call when the
/// `HTREE_S3_SYNC_TIMEOUT_MS` environment variable is unset, unparsable, or zero.
#[cfg(feature = "s3")]
const DEFAULT_S3_SYNC_TIMEOUT_MS: u64 = 5_000;
/// Name of the environment variable that overrides [`DEFAULT_S3_SYNC_TIMEOUT_MS`].
#[cfg(feature = "s3")]
const S3_SYNC_TIMEOUT_MS_ENV: &str = "HTREE_S3_SYNC_TIMEOUT_MS";
34
35pub use maintenance::{
36 compact_lmdb_environments_under, CompactResult, R2ImportOptions, R2ImportResult, VerifyResult,
37};
38pub use retention::{OwnedBlobStats, PinnedItem, StorageByPriority, StorageStats, TreeMeta};
39
/// Priority value for the "other" category.
/// NOTE(review): presumably content that is neither own nor followed — confirm
/// against the retention module.
pub const PRIORITY_OTHER: u8 = 64;
/// Priority value for the "followed" category (numerically between
/// [`PRIORITY_OTHER`] and [`PRIORITY_OWN`]).
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Priority value for the "own" category; the largest value a `u8` can hold.
pub const PRIORITY_OWN: u8 = 255;
/// Number of LMDB reader slots requested when opening the metadata environment.
const LMDB_MAX_READERS: u32 = 1024;
/// Lower clamp (64 MiB) for the metadata environment map size.
const LMDB_METADATA_MIN_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
/// Upper clamp (64 GiB) for the metadata environment map size; also the size
/// used when the storage budget is zero.
const LMDB_METADATA_MAX_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024 * 1024;
/// The metadata map size is the storage budget divided by this value, before clamping.
const LMDB_METADATA_STORAGE_RATIO_DIVISOR: u64 = 1024;
/// Smallest map size (16 MiB) requested for the LMDB blob store.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 16 * 1024 * 1024;
/// Seconds during which `BlobAccessUpdateGate` suppresses a hash after reporting it due.
const ACCESS_UPDATE_INTERVAL_SECS: u64 = 300;
/// Entry count at which `BlobAccessUpdateGate` prunes (and, if still full, clears) its table.
const ACCESS_UPDATE_GATE_MAX_ENTRIES: usize = 4096;
/// Maximum number of hashes handed to a single background access-time update.
const ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT: usize = 1024;
53
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn unix_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // A clock set before 1970 collapses to the zero duration.
        Err(_) => 0,
    }
}
60
/// Serializable record describing a cached tree root.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash as a string. NOTE(review): presumably hex-encoded — confirm with callers.
    pub hash: String,
    /// Optional key associated with the root.
    /// NOTE(review): presumably a decryption key — confirm with callers.
    pub key: Option<String>,
    /// Time of the last update. NOTE(review): presumably Unix seconds — confirm with callers.
    pub updated_at: u64,
    /// Visibility label of the root, stored as free-form text.
    pub visibility: String,
}
73
/// Backend-independent summary of a local blob store, as produced by `LocalStore::stats`.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Blob count reported by the backend.
    pub count: usize,
    /// Total byte size reported by the backend.
    pub total_bytes: u64,
}
80
81#[derive(Default)]
82struct BlobAccessUpdateGate {
83 next_update_by_hash: Mutex<HashMap<Hash, u64>>,
84}
85
86impl BlobAccessUpdateGate {
87 fn due_hashes<I>(&self, hashes: I, now: u64) -> Vec<Hash>
88 where
89 I: IntoIterator<Item = Hash>,
90 {
91 let Ok(mut next_update_by_hash) = self.next_update_by_hash.try_lock() else {
92 return Vec::new();
93 };
94
95 if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
96 next_update_by_hash.retain(|_, next_update| *next_update > now);
97 if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
98 next_update_by_hash.clear();
99 }
100 }
101
102 let mut due = Vec::new();
103 let mut seen = HashSet::new();
104 for hash in hashes {
105 if !seen.insert(hash) {
106 continue;
107 }
108 if next_update_by_hash
109 .get(&hash)
110 .is_some_and(|next_update| now < *next_update)
111 {
112 continue;
113 }
114 next_update_by_hash.insert(hash, now.saturating_add(ACCESS_UPDATE_INTERVAL_SECS));
115 due.push(hash);
116 }
117 due
118 }
119}
120
/// Local blob storage, backed either by the filesystem or (with the `lmdb`
/// feature) by LMDB.
pub enum LocalStore {
    /// Filesystem-backed blob store.
    Fs(FsBlobStore),
    /// LMDB-backed blob store; only available when the `lmdb` feature is enabled.
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
127
128#[cfg(feature = "lmdb")]
/// Returns `true` when the final component of `path` is exactly two ASCII hex
/// digits (either case), which is the naming pattern this module treats as a
/// filesystem blob shard directory.
///
/// Paths without a final component, or whose name is not valid UTF-8, yield `false`.
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    let Some(name) = path.file_name().and_then(|component| component.to_str()) else {
        return false;
    };
    matches!(
        name.as_bytes(),
        [first, second] if first.is_ascii_hexdigit() && second.is_ascii_hexdigit()
    )
}
135
136fn lmdb_metadata_map_size_for_storage_budget(max_size_bytes: u64) -> u64 {
137 if max_size_bytes == 0 {
138 return LMDB_METADATA_MAX_MAP_SIZE_BYTES;
139 }
140
141 max_size_bytes
142 .saturating_div(LMDB_METADATA_STORAGE_RATIO_DIVISOR)
143 .clamp(
144 LMDB_METADATA_MIN_MAP_SIZE_BYTES,
145 LMDB_METADATA_MAX_MAP_SIZE_BYTES,
146 )
147}
148
149fn lmdb_map_size_for_existing_env(path: &Path, requested_bytes: u64) -> Result<usize> {
150 let existing_bytes = std::fs::metadata(path.join("data.mdb"))
151 .map(|metadata| metadata.len())
152 .unwrap_or(0);
153 let requested = align_lmdb_map_size(requested_bytes.max(existing_bytes));
154 usize::try_from(requested).context("LMDB map size exceeds usize")
155}
156
157fn align_lmdb_map_size(bytes: u64) -> u64 {
158 let page_size = (page_size::get() as u64).max(4096);
159 let remainder = bytes % page_size;
160 if remainder == 0 {
161 bytes
162 } else {
163 bytes.saturating_add(page_size - remainder)
164 }
165}
166
/// Deletes every direct child directory of `path` whose name looks like a
/// filesystem blob shard (see `is_fs_blob_shard_dir`), logging each removal.
///
/// # Errors
/// Returns `StoreError::Io` when the directory cannot be listed, an entry
/// cannot be read, or a shard directory cannot be removed.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let shard = entry.map_err(StoreError::Io)?.path();
        if !(shard.is_dir() && is_fs_blob_shard_dir(&shard)) {
            continue;
        }
        std::fs::remove_dir_all(&shard).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            shard.display()
        );
    }
    Ok(())
}
183
184impl LocalStore {
185 pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
191 Self::new_unbounded(path, backend)
192 }
193
194 pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
199 path: P,
200 backend: &StorageBackend,
201 _map_size_bytes: Option<u64>,
202 ) -> Result<Self, StoreError> {
203 match backend {
204 StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
205 #[cfg(feature = "lmdb")]
206 StorageBackend::Lmdb => match _map_size_bytes {
207 Some(map_size_bytes) => {
208 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
209 remove_stale_fs_blob_shards(path.as_ref())?;
210 Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
211 path,
212 map_size_bytes,
213 )?))
214 }
215 None => {
216 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
217 remove_stale_fs_blob_shards(path.as_ref())?;
218 Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
219 }
220 },
221 #[cfg(not(feature = "lmdb"))]
222 StorageBackend::Lmdb => {
223 tracing::warn!(
224 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
225 );
226 Ok(LocalStore::Fs(FsBlobStore::new(path)?))
227 }
228 }
229 }
230
231 pub fn new_unbounded<P: AsRef<Path>>(
233 path: P,
234 backend: &StorageBackend,
235 ) -> Result<Self, StoreError> {
236 Self::new_with_lmdb_map_size(path, backend, None)
237 }
238
239 pub fn backend(&self) -> StorageBackend {
240 match self {
241 LocalStore::Fs(_) => StorageBackend::Fs,
242 #[cfg(feature = "lmdb")]
243 LocalStore::Lmdb(_) => StorageBackend::Lmdb,
244 }
245 }
246
247 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
249 match self {
250 LocalStore::Fs(store) => store.put_sync(hash, data),
251 #[cfg(feature = "lmdb")]
252 LocalStore::Lmdb(store) => store.put_sync(hash, data),
253 }
254 }
255
256 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
258 match self {
259 LocalStore::Fs(store) => {
260 let mut inserted = 0usize;
261 for (hash, data) in items {
262 if store.put_sync(*hash, data.as_slice())? {
263 inserted += 1;
264 }
265 }
266 Ok(inserted)
267 }
268 #[cfg(feature = "lmdb")]
269 LocalStore::Lmdb(store) => store.put_many_sync(items),
270 }
271 }
272
273 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
275 match self {
276 LocalStore::Fs(store) => store.get_sync(hash),
277 #[cfg(feature = "lmdb")]
278 LocalStore::Lmdb(store) => store.get_sync(hash),
279 }
280 }
281
282 pub fn get_range_sync(
283 &self,
284 hash: &Hash,
285 start: u64,
286 end_inclusive: u64,
287 ) -> Result<Option<Vec<u8>>, StoreError> {
288 match self {
289 LocalStore::Fs(store) => store.get_range_sync(hash, start, end_inclusive),
290 #[cfg(feature = "lmdb")]
291 LocalStore::Lmdb(store) => store.get_range_sync(hash, start, end_inclusive),
292 }
293 }
294
295 pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
296 match self {
297 LocalStore::Fs(store) => store.blob_size_sync(hash),
298 #[cfg(feature = "lmdb")]
299 LocalStore::Lmdb(store) => store.blob_size_sync(hash),
300 }
301 }
302
303 pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
304 match self {
305 LocalStore::Fs(store) => store.touch_accessed_sync(hash, now),
306 #[cfg(feature = "lmdb")]
307 LocalStore::Lmdb(store) => store.touch_accessed_sync(hash, now),
308 }
309 }
310
311 pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
312 match self {
313 LocalStore::Fs(store) => store.touch_many_accessed_sync(hashes, now),
314 #[cfg(feature = "lmdb")]
315 LocalStore::Lmdb(store) => store.touch_many_accessed_sync(hashes, now),
316 }
317 }
318
319 pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
320 match self {
321 LocalStore::Fs(store) => store.last_accessed_at_sync(hash),
322 #[cfg(feature = "lmdb")]
323 LocalStore::Lmdb(store) => store.last_accessed_at_sync(hash),
324 }
325 }
326
327 pub fn many_last_accessed_at_sync(
328 &self,
329 hashes: &[Hash],
330 ) -> Result<Vec<(Hash, u64)>, StoreError> {
331 match self {
332 LocalStore::Fs(store) => store.many_last_accessed_at_sync(hashes),
333 #[cfg(feature = "lmdb")]
334 LocalStore::Lmdb(store) => store.many_last_accessed_at_sync(hashes),
335 }
336 }
337
338 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
340 match self {
341 LocalStore::Fs(store) => Ok(store.exists(hash)),
342 #[cfg(feature = "lmdb")]
343 LocalStore::Lmdb(store) => store.exists(hash),
344 }
345 }
346
347 pub fn existing_hashes_in_sorted_candidates(
349 &self,
350 sorted_hashes: &[Hash],
351 ) -> Result<Vec<bool>, StoreError> {
352 match self {
353 LocalStore::Fs(store) => Ok(sorted_hashes
354 .iter()
355 .map(|hash| store.exists(hash))
356 .collect()),
357 #[cfg(feature = "lmdb")]
358 LocalStore::Lmdb(store) => store.existing_hashes_in_sorted_candidates(sorted_hashes),
359 }
360 }
361
362 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
364 match self {
365 LocalStore::Fs(store) => store.delete_sync(hash),
366 #[cfg(feature = "lmdb")]
367 LocalStore::Lmdb(store) => store.delete_sync(hash),
368 }
369 }
370
371 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
373 match self {
374 LocalStore::Fs(store) => {
375 let stats = store.stats()?;
376 Ok(LocalStoreStats {
377 count: stats.count,
378 total_bytes: stats.total_bytes,
379 })
380 }
381 #[cfg(feature = "lmdb")]
382 LocalStore::Lmdb(store) => {
383 let stats = store.stats()?;
384 Ok(LocalStoreStats {
385 count: stats.count,
386 total_bytes: stats.total_bytes,
387 })
388 }
389 }
390 }
391
392 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
394 match self {
395 LocalStore::Fs(store) => store.list(),
396 #[cfg(feature = "lmdb")]
397 LocalStore::Lmdb(store) => store.list(),
398 }
399 }
400}
401
402#[async_trait]
403impl Store for LocalStore {
404 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
405 self.put_sync(hash, &data)
406 }
407
408 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
409 self.put_many_sync(&items)
410 }
411
412 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
413 self.get_sync(hash)
414 }
415
416 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
417 self.exists(hash)
418 }
419
420 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
421 self.delete_sync(hash)
422 }
423}
424
425#[cfg(feature = "s3")]
426use tokio::sync::mpsc;
427
428use crate::config::S3Config;
429
/// Work item sent to the background S3 sync task.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` to the object derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object derived from `hash`.
    Delete { hash: Hash },
}
436
/// Routes blob operations to the local store and, when the `s3` feature is
/// enabled and configured, to an S3 bucket as well.
pub struct StorageRouter {
    /// Local blob store; consulted first by every read.
    local: Arc<LocalStore>,
    /// S3 client used for synchronous reads; `None` when S3 is not configured.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target bucket name; `None` when S3 is not configured.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Prefix prepended to every object key (empty when none is configured).
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Sender feeding the background sync task; `None` when S3 is not configured.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
455
456impl StorageRouter {
457 #[cfg(feature = "s3")]
458 fn s3_sync_timeout() -> std::time::Duration {
459 let millis = std::env::var(S3_SYNC_TIMEOUT_MS_ENV)
460 .ok()
461 .and_then(|value| value.parse::<u64>().ok())
462 .filter(|value| *value > 0)
463 .unwrap_or(DEFAULT_S3_SYNC_TIMEOUT_MS);
464 std::time::Duration::from_millis(millis)
465 }
466
467 #[cfg(feature = "s3")]
468 fn s3_sync_timeout_error(timeout: std::time::Duration) -> StoreError {
469 StoreError::Other(format!(
470 "S3 sync operation timed out after {}ms",
471 timeout.as_millis()
472 ))
473 }
474
475 #[cfg(feature = "s3")]
476 fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
477 where
478 F: Future<Output = T> + Send + 'static,
479 T: Send + 'static,
480 {
481 let timeout = Self::s3_sync_timeout();
482 if tokio::runtime::Handle::try_current().is_ok() {
483 return std::thread::Builder::new()
484 .name("storage-s3-sync".to_string())
485 .spawn(move || {
486 let runtime = tokio::runtime::Builder::new_current_thread()
487 .enable_all()
488 .build()
489 .map_err(|err| {
490 StoreError::Other(format!("build storage s3 sync runtime: {err}"))
491 })?;
492 runtime.block_on(async move {
493 tokio::time::timeout(timeout, future)
494 .await
495 .map_err(|_| Self::s3_sync_timeout_error(timeout))
496 })
497 })
498 .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
499 .join()
500 .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()))?;
501 }
502
503 let runtime = tokio::runtime::Builder::new_current_thread()
504 .enable_all()
505 .build()
506 .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
507 runtime.block_on(async move {
508 tokio::time::timeout(timeout, future)
509 .await
510 .map_err(|_| Self::s3_sync_timeout_error(timeout))
511 })
512 }
513
514 pub fn new(local: Arc<LocalStore>) -> Self {
516 Self {
517 local,
518 #[cfg(feature = "s3")]
519 s3_client: None,
520 #[cfg(feature = "s3")]
521 s3_bucket: None,
522 #[cfg(feature = "s3")]
523 s3_prefix: String::new(),
524 #[cfg(feature = "s3")]
525 sync_tx: None,
526 }
527 }
528
529 #[cfg(feature = "s3")]
531 pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
532 use aws_sdk_s3::Client as S3Client;
533
534 let mut aws_config_loader = aws_config::from_env();
536 aws_config_loader =
537 aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
538 let aws_config = aws_config_loader.load().await;
539
540 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
542 s3_config_builder = s3_config_builder
543 .endpoint_url(&config.endpoint)
544 .force_path_style(true);
545
546 let s3_client = S3Client::from_conf(s3_config_builder.build());
547 let bucket = config.bucket.clone();
548 let prefix = config.prefix.clone().unwrap_or_default();
549
550 let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
552
553 let sync_client = s3_client.clone();
555 let sync_bucket = bucket.clone();
556 let sync_prefix = prefix.clone();
557
558 tokio::spawn(async move {
559 use aws_sdk_s3::primitives::ByteStream;
560
561 tracing::info!("S3 background sync task started");
562
563 let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(8));
565 let client = std::sync::Arc::new(sync_client);
566 let bucket = std::sync::Arc::new(sync_bucket);
567 let prefix = std::sync::Arc::new(sync_prefix);
568
569 while let Some(msg) = sync_rx.recv().await {
570 let client = client.clone();
571 let bucket = bucket.clone();
572 let prefix = prefix.clone();
573 let semaphore = semaphore.clone();
574
575 tokio::spawn(async move {
577 let _permit = semaphore.acquire().await;
579
580 match msg {
581 S3SyncMessage::Upload { hash, data } => {
582 let key = format!("{}{}.bin", prefix, to_hex(&hash));
583 tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
584
585 let mut attempt = 1u8;
586 loop {
587 match client
588 .put_object()
589 .bucket(bucket.as_str())
590 .key(&key)
591 .body(ByteStream::from(data.clone()))
592 .send()
593 .await
594 {
595 Ok(_) => {
596 tracing::debug!("S3 upload succeeded: {}", &key);
597 break;
598 }
599 Err(e) if attempt < 3 => {
600 tracing::warn!(
601 "S3 upload retrying {}: attempt={} error={}",
602 &key,
603 attempt,
604 e
605 );
606 tokio::time::sleep(std::time::Duration::from_millis(
607 250 * u64::from(attempt),
608 ))
609 .await;
610 attempt += 1;
611 }
612 Err(e) => {
613 tracing::error!(
614 "S3 upload failed {} after {} attempts: {}",
615 &key,
616 attempt,
617 e
618 );
619 break;
620 }
621 }
622 }
623 }
624 S3SyncMessage::Delete { hash } => {
625 let key = format!("{}{}.bin", prefix, to_hex(&hash));
626 tracing::debug!("S3 deleting {}", &key);
627
628 let mut attempt = 1u8;
629 loop {
630 match client
631 .delete_object()
632 .bucket(bucket.as_str())
633 .key(&key)
634 .send()
635 .await
636 {
637 Ok(_) => break,
638 Err(e) if attempt < 3 => {
639 tracing::warn!(
640 "S3 delete retrying {}: attempt={} error={}",
641 &key,
642 attempt,
643 e
644 );
645 tokio::time::sleep(std::time::Duration::from_millis(
646 250 * u64::from(attempt),
647 ))
648 .await;
649 attempt += 1;
650 }
651 Err(e) => {
652 tracing::error!(
653 "S3 delete failed {} after {} attempts: {}",
654 &key,
655 attempt,
656 e
657 );
658 break;
659 }
660 }
661 }
662 }
663 }
664 });
665 }
666 });
667
668 tracing::info!(
669 "S3 storage initialized: bucket={}, prefix={}",
670 bucket,
671 prefix
672 );
673
674 Ok(Self {
675 local,
676 s3_client: Some(s3_client),
677 s3_bucket: Some(bucket),
678 s3_prefix: prefix,
679 sync_tx: Some(sync_tx),
680 })
681 }
682
683 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
685 let is_new = self.local.put_sync(hash, data)?;
687
688 #[cfg(feature = "s3")]
691 if is_new {
692 if let Some(ref tx) = self.sync_tx {
693 tracing::debug!(
694 "Queueing S3 upload for {} ({} bytes)",
695 crate::storage::to_hex(&hash)[..16].to_string(),
696 data.len(),
697 );
698 if let Err(e) = tx.send(S3SyncMessage::Upload {
699 hash,
700 data: data.to_vec(),
701 }) {
702 tracing::error!("Failed to queue S3 upload: {}", e);
703 }
704 }
705 }
706
707 Ok(is_new)
708 }
709
710 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
712 #[cfg(feature = "s3")]
713 let pending_uploads = if self.sync_tx.is_some() {
714 let mut pending = Vec::new();
715 for (hash, data) in items {
716 if !self.local.exists(hash)? {
717 pending.push((*hash, data.clone()));
718 }
719 }
720 pending
721 } else {
722 Vec::new()
723 };
724
725 let inserted = self.local.put_many_sync(items)?;
726
727 #[cfg(feature = "s3")]
728 if let Some(ref tx) = self.sync_tx {
729 for (hash, data) in pending_uploads {
730 if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
731 tracing::error!("Failed to queue S3 upload: {}", e);
732 }
733 }
734 }
735
736 Ok(inserted)
737 }
738
739 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
741 if let Some(data) = self.local.get_sync(hash)? {
743 return Ok(Some(data));
744 }
745
746 #[cfg(feature = "s3")]
748 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
749 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
750 let client = client.clone();
751 let bucket = bucket.clone();
752
753 match Self::run_s3_future_sync(async move {
754 client.get_object().bucket(bucket).key(key).send().await
755 }) {
756 Ok(Ok(output)) => {
757 match Self::run_s3_future_sync(async move { output.body.collect().await }) {
758 Ok(Ok(body)) => {
759 let data = body.into_bytes().to_vec();
760 let _ = self.local.put_sync(*hash, &data);
762 return Ok(Some(data));
763 }
764 Ok(Err(err)) => {
765 tracing::warn!("S3 body collect failed: {}", err);
766 }
767 Err(err) => {
768 tracing::warn!("S3 body collect runtime failed: {}", err);
769 }
770 }
771 }
772 Ok(Err(err)) => {
773 let service_err = err.into_service_error();
774 if !service_err.is_no_such_key() {
775 tracing::warn!("S3 get failed: {}", service_err);
776 }
777 }
778 Err(err) => {
779 tracing::warn!("S3 get runtime failed: {}", err);
780 }
781 }
782 }
783
784 Ok(None)
785 }
786
787 pub fn get_range_sync(
788 &self,
789 hash: &Hash,
790 start: u64,
791 end_inclusive: u64,
792 ) -> Result<Option<Vec<u8>>, StoreError> {
793 self.local.get_range_sync(hash, start, end_inclusive)
794 }
795
796 pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
797 self.local.blob_size_sync(hash)
798 }
799
800 pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
801 self.local.touch_accessed_sync(hash, now)
802 }
803
804 pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
805 self.local.touch_many_accessed_sync(hashes, now)
806 }
807
808 pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
809 self.local.last_accessed_at_sync(hash)
810 }
811
812 pub fn many_last_accessed_at_sync(
813 &self,
814 hashes: &[Hash],
815 ) -> Result<Vec<(Hash, u64)>, StoreError> {
816 self.local.many_last_accessed_at_sync(hashes)
817 }
818
819 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
821 if self.local.exists(hash)? {
823 return Ok(true);
824 }
825
826 #[cfg(feature = "s3")]
828 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
829 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
830 let client = client.clone();
831 let bucket = bucket.clone();
832
833 match Self::run_s3_future_sync(async move {
834 client.head_object().bucket(bucket).key(&key).send().await
835 }) {
836 Ok(Ok(_)) => return Ok(true),
837 Ok(Err(err)) => {
838 let service_err = err.into_service_error();
839 if !service_err.is_not_found() {
840 tracing::warn!("S3 head failed: {}", service_err);
841 }
842 }
843 Err(err) => {
844 tracing::warn!("S3 head runtime failed: {}", err);
845 }
846 }
847 }
848
849 Ok(false)
850 }
851
852 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
854 let deleted = self.local.delete_sync(hash)?;
855
856 #[cfg(feature = "s3")]
858 if let Some(ref tx) = self.sync_tx {
859 let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
860 }
861
862 Ok(deleted)
863 }
864
865 pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
868 self.local.delete_sync(hash)
869 }
870
871 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
873 self.local.stats()
874 }
875
876 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
878 self.local.list()
879 }
880
881 pub fn local_store(&self) -> Arc<LocalStore> {
883 Arc::clone(&self.local)
884 }
885}
886
887#[derive(Clone)]
888struct AccessRecordingStore {
889 inner: Arc<StorageRouter>,
890 accessed: Arc<Mutex<HashSet<Hash>>>,
891}
892
893impl AccessRecordingStore {
894 fn new(inner: Arc<StorageRouter>) -> Self {
895 Self {
896 inner,
897 accessed: Arc::new(Mutex::new(HashSet::new())),
898 }
899 }
900
901 fn take_accessed_hashes(&self) -> Vec<Hash> {
902 let Ok(mut accessed) = self.accessed.lock() else {
903 return Vec::new();
904 };
905 accessed.drain().collect()
906 }
907
908 fn record_access(&self, hash: &Hash) {
909 let Ok(mut accessed) = self.accessed.lock() else {
910 return;
911 };
912 accessed.insert(*hash);
913 }
914}
915
916#[async_trait]
917impl Store for AccessRecordingStore {
918 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
919 self.inner.put(hash, data).await
920 }
921
922 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
923 self.inner.put_many(items).await
924 }
925
926 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
927 let data = self.inner.get(hash).await?;
928 if data.is_some() {
929 self.record_access(hash);
930 }
931 Ok(data)
932 }
933
934 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
935 self.inner.has(hash).await
936 }
937
938 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
939 self.inner.delete(hash).await
940 }
941}
942
943#[async_trait]
946impl Store for StorageRouter {
947 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
948 self.put_sync(hash, &data)
949 }
950
951 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
952 self.put_many_sync(&items)
953 }
954
955 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
956 self.get_sync(hash)
957 }
958
959 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
960 self.exists(hash)
961 }
962
963 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
964 self.delete_sync(hash)
965 }
966}
967
/// Top-level store combining an LMDB metadata environment with a blob
/// [`StorageRouter`].
pub struct HashtreeStore {
    /// Directory that holds the metadata environment and the `blobs` subdirectory.
    base_path: PathBuf,
    /// LMDB environment containing all metadata databases below.
    env: heed::Env,
    /// Handle to the `pins` database.
    pins: Database<Bytes, Unit>,
    /// Handle to the `pinned_refs` database.
    pinned_refs: Database<Str, Unit>,
    /// Handle to the `tracked_authors` database.
    tracked_authors: Database<Str, Unit>,
    /// Handle to the `blob_owners` database.
    blob_owners: Database<Bytes, Unit>,
    /// Handle to the `pubkey_blobs` database.
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Handle to the `tree_meta` database.
    tree_meta: Database<Bytes, Bytes>,
    /// Handle to the `blob_trees` database.
    blob_trees: Database<Bytes, Unit>,
    /// Handle to the `tree_refs` database.
    tree_refs: Database<Str, Bytes>,
    /// Handle to the `cached_roots` database.
    cached_roots: Database<Str, Bytes>,
    /// Blob storage (local store plus optional S3).
    router: Arc<StorageRouter>,
    /// Storage budget in bytes supplied at construction.
    max_size_bytes: u64,
    /// Eviction flag supplied at construction.
    /// NOTE(review): presumably controls whether unowned blobs may be evicted — confirm in the retention module.
    evict_orphans: bool,
    /// Throttle deciding which blob hashes are due for an access-time update.
    blob_access_update_gate: BlobAccessUpdateGate,
    /// `true` while a background access-time update thread is running.
    blob_access_update_inflight: Arc<AtomicBool>,
}
1000
1001impl HashtreeStore {
1002 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
1004 let config = hashtree_config::Config::load_or_default();
1005 let max_size_bytes = config
1006 .storage
1007 .max_size_gb
1008 .saturating_mul(1024 * 1024 * 1024);
1009 Self::with_options_and_backend(
1010 path,
1011 None,
1012 max_size_bytes,
1013 config.storage.evict_orphans,
1014 &config.storage.backend,
1015 )
1016 }
1017
1018 pub fn new_with_backend<P: AsRef<Path>>(
1020 path: P,
1021 backend: hashtree_config::StorageBackend,
1022 max_size_bytes: u64,
1023 ) -> Result<Self> {
1024 Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
1025 }
1026
1027 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
1029 let config = hashtree_config::Config::load_or_default();
1030 let max_size_bytes = config
1031 .storage
1032 .max_size_gb
1033 .saturating_mul(1024 * 1024 * 1024);
1034 Self::with_options_and_backend(
1035 path,
1036 s3_config,
1037 max_size_bytes,
1038 config.storage.evict_orphans,
1039 &config.storage.backend,
1040 )
1041 }
1042
1043 pub fn with_options<P: AsRef<Path>>(
1049 path: P,
1050 s3_config: Option<&S3Config>,
1051 max_size_bytes: u64,
1052 ) -> Result<Self> {
1053 let config = hashtree_config::Config::load_or_default();
1054 Self::with_options_and_backend(
1055 path,
1056 s3_config,
1057 max_size_bytes,
1058 config.storage.evict_orphans,
1059 &config.storage.backend,
1060 )
1061 }
1062
/// Opens (creating if necessary) the store rooted at `path`.
///
/// Steps, in order:
/// 1. create `path` and open the LMDB metadata environment there, sized from
///    `max_size_bytes` and the size of any existing `data.mdb`;
/// 2. create or open the nine named metadata databases in one write transaction;
/// 3. open the blob store under `path/blobs` for the requested `backend`
///    (falling back to the filesystem when LMDB is requested but the `lmdb`
///    feature is disabled);
/// 4. wrap the blob store in a [`StorageRouter`], with S3 when `s3_config` is
///    given and the `s3` feature is enabled.
///
/// # Errors
/// Fails when directories cannot be created, the LMDB environment or a
/// database cannot be opened, the blob store cannot be created, a map size
/// does not fit in `usize`, or S3 initialization fails.
pub fn with_options_and_backend<P: AsRef<Path>>(
    path: P,
    s3_config: Option<&S3Config>,
    max_size_bytes: u64,
    evict_orphans: bool,
    backend: &hashtree_config::StorageBackend,
) -> Result<Self> {
    let path = path.as_ref();
    std::fs::create_dir_all(path)?;
    // Never request a map smaller than the data file that is already on disk.
    let metadata_map_size = lmdb_map_size_for_existing_env(
        path,
        lmdb_metadata_map_size_for_storage_budget(max_size_bytes),
    )?;

    // NOTE(review): heed marks `open` as unsafe; presumably the caller must
    // guarantee the environment is not opened twice in this process and the
    // backing file is not modified externally — confirm against heed's docs.
    let env = unsafe {
        EnvOpenOptions::new()
            .map_size(metadata_map_size)
            .max_dbs(10)
            .max_readers(LMDB_MAX_READERS)
            .open(path)?
    };
    // Best effort: a failure to clear stale reader slots is ignored.
    let _ = env.clear_stale_readers();
    // Grow the map if the opened environment reports a smaller one than requested.
    if env.info().map_size < metadata_map_size {
        // NOTE(review): presumably sound because no transactions exist yet at
        // this point — confirm against heed's `Env::resize` safety contract.
        unsafe { env.resize(metadata_map_size) }?;
    }

    // Nine named databases are created here; `max_dbs(10)` above leaves one spare.
    let mut wtxn = env.write_txn()?;
    let pins = env.create_database(&mut wtxn, Some("pins"))?;
    let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
    let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
    let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
    let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
    let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
    let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
    let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
    let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
    wtxn.commit()?;

    // Blobs always live in the `blobs` subdirectory, whatever the backend.
    let local_store = Arc::new(match backend {
        hashtree_config::StorageBackend::Fs => LocalStore::Fs(
            FsBlobStore::new(path.join("blobs"))
                .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
        ),
        #[cfg(feature = "lmdb")]
        hashtree_config::StorageBackend::Lmdb => {
            std::fs::create_dir_all(path.join("blobs"))?;
            // Shard directories left behind by the filesystem backend are removed.
            remove_stale_fs_blob_shards(&path.join("blobs"))
                .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
            let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
            let map_size = usize::try_from(requested_map_size)
                .context("LMDB blob map size exceeds usize")?;
            LocalStore::Lmdb(
                LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            )
        }
        #[cfg(not(feature = "lmdb"))]
        hashtree_config::StorageBackend::Lmdb => {
            tracing::warn!(
                "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
            );
            LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            )
        }
    });

    #[cfg(feature = "s3")]
    let router = Arc::new(if let Some(s3_cfg) = s3_config {
        tracing::info!(
            "Initializing S3 storage backend: bucket={}, endpoint={}",
            s3_cfg.bucket,
            s3_cfg.endpoint
        );

        // NOTE(review): `StorageRouter::with_s3` calls `tokio::spawn`, so this
        // presumably has to run on a thread that is inside a Tokio runtime
        // context even though the future is driven by the `futures` executor —
        // confirm with callers.
        sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
    } else {
        StorageRouter::new(local_store)
    });

    #[cfg(not(feature = "s3"))]
    let router = Arc::new({
        if s3_config.is_some() {
            tracing::warn!(
                "S3 config provided but S3 feature not enabled. Using local storage only."
            );
        }
        StorageRouter::new(local_store)
    });

    Ok(Self {
        base_path: path.to_path_buf(),
        env,
        pins,
        pinned_refs,
        tracked_authors,
        blob_owners,
        pubkey_blobs,
        tree_meta,
        blob_trees,
        tree_refs,
        cached_roots,
        router,
        max_size_bytes,
        evict_orphans,
        blob_access_update_gate: BlobAccessUpdateGate::default(),
        blob_access_update_inflight: Arc::new(AtomicBool::new(false)),
    })
}
1177
1178 pub fn base_path(&self) -> &Path {
1179 &self.base_path
1180 }
1181
1182 pub fn router(&self) -> &StorageRouter {
1184 &self.router
1185 }
1186
1187 pub fn store_arc(&self) -> Arc<StorageRouter> {
1190 Arc::clone(&self.router)
1191 }
1192
1193 fn access_tracking_tree(&self) -> (HashTree<AccessRecordingStore>, AccessRecordingStore) {
1194 let access_store = AccessRecordingStore::new(self.store_arc());
1195 let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1196 (tree, access_store)
1197 }
1198
1199 pub fn record_blob_accesses<I>(&self, hashes: I)
1200 where
1201 I: IntoIterator<Item = Hash>,
1202 {
1203 let now = unix_timestamp_now();
1204 let mut due_hashes = self.blob_access_update_gate.due_hashes(hashes, now);
1205 if due_hashes.is_empty() {
1206 return;
1207 }
1208
1209 if self
1210 .blob_access_update_inflight
1211 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1212 .is_err()
1213 {
1214 return;
1215 }
1216
1217 if due_hashes.len() > ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT {
1218 due_hashes.truncate(ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT);
1219 }
1220
1221 let router = Arc::clone(&self.router);
1222 let inflight = Arc::clone(&self.blob_access_update_inflight);
1223 let spawn_result = std::thread::Builder::new()
1224 .name("blob-access-update".to_string())
1225 .spawn(move || {
1226 if let Err(err) = router.touch_many_accessed_sync(&due_hashes, now) {
1227 tracing::debug!("Failed to update blob access metadata: {}", err);
1228 }
1229 inflight.store(false, Ordering::Release);
1230 });
1231 if let Err(err) = spawn_result {
1232 self.blob_access_update_inflight
1233 .store(false, Ordering::Release);
1234 tracing::debug!("Failed to spawn blob access metadata updater: {}", err);
1235 }
1236 }
1237
1238 fn record_blob_access_now(&self, hash: &Hash) {
1239 if let Err(err) = self.router.touch_accessed_sync(hash, unix_timestamp_now()) {
1240 tracing::debug!("Failed to update blob access metadata: {}", err);
1241 }
1242 }
1243
1244 pub fn blob_last_accessed_at(&self, hash: &Hash) -> Result<Option<u64>> {
1245 self.router
1246 .last_accessed_at_sync(hash)
1247 .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1248 }
1249
1250 pub fn blob_last_accessed_many(&self, hashes: &[Hash]) -> Result<Vec<(Hash, u64)>> {
1251 self.router
1252 .many_last_accessed_at_sync(hashes)
1253 .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1254 }
1255
1256 pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
1258 let (tree, access_store) = self.access_tracking_tree();
1259
1260 let result = sync_block_on(async {
1261 tree.get_tree_node(hash)
1262 .await
1263 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
1264 })?;
1265 if result.is_some() {
1266 self.record_blob_accesses(access_store.take_accessed_hashes());
1267 }
1268 Ok(result)
1269 }
1270
1271 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
1273 let hash = sha256(data);
1274 let inserted = self
1275 .router
1276 .put_sync(hash, data)
1277 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1278 if !inserted {
1279 self.record_blob_access_now(&hash);
1280 }
1281 Ok(to_hex(&hash))
1282 }
1283
1284 pub fn put_owned_blob(&self, data: &[u8], pubkey: &[u8; 32]) -> Result<String> {
1286 let hash = sha256(data);
1287 if !self
1288 .router
1289 .exists(&hash)
1290 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))?
1291 {
1292 self.make_room_for_durable_blob(data.len() as u64)?;
1293 self.router
1294 .put_sync(hash, data)
1295 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1296 } else {
1297 self.record_blob_access_now(&hash);
1298 }
1299 self.set_blob_owner(&hash, pubkey)?;
1300 Ok(to_hex(&hash))
1301 }
1302
1303 pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
1309 let hash = sha256(data);
1310 if self
1311 .router
1312 .exists(&hash)
1313 .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
1314 {
1315 self.record_blob_access_now(&hash);
1316 return Ok(to_hex(&hash));
1317 }
1318
1319 let incoming_bytes = data.len() as u64;
1320 let _ = self.make_room_for_cached_blob(incoming_bytes);
1321
1322 let mut retried_after_cleanup = false;
1323 loop {
1324 match self.router.put_sync(hash, data) {
1325 Ok(_) => return Ok(to_hex(&hash)),
1326 Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
1327 let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
1328 if freed == 0 {
1329 return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
1330 }
1331 retried_after_cleanup = true;
1332 }
1333 Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
1334 }
1335 }
1336 }
1337
1338 pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1340 let data = self
1341 .router
1342 .get_sync(hash)
1343 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?;
1344 if data.is_some() {
1345 self.record_blob_accesses(std::iter::once(*hash));
1346 }
1347 Ok(data)
1348 }
1349
1350 pub fn get_blob_range(
1351 &self,
1352 hash: &[u8; 32],
1353 start: u64,
1354 end_inclusive: u64,
1355 ) -> Result<Option<Vec<u8>>> {
1356 let data = self
1357 .router
1358 .get_range_sync(hash, start, end_inclusive)
1359 .map_err(|e| anyhow::anyhow!("Failed to get blob range: {}", e))?;
1360 if data.is_some() {
1361 self.record_blob_accesses(std::iter::once(*hash));
1362 }
1363 Ok(data)
1364 }
1365
1366 pub fn blob_size(&self, hash: &[u8; 32]) -> Result<Option<u64>> {
1367 self.router
1368 .blob_size_sync(hash)
1369 .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))
1370 }
1371
1372 pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
1374 self.router
1375 .exists(hash)
1376 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
1377 }
1378
1379 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
1385 let mut key = [0u8; 64];
1386 key[..32].copy_from_slice(sha256);
1387 key[32..].copy_from_slice(pubkey);
1388 key
1389 }
1390
1391 pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
1394 let key = Self::blob_owner_key(sha256, pubkey);
1395 let mut wtxn = self.env.write_txn()?;
1396
1397 self.blob_owners.put(&mut wtxn, &key[..], &())?;
1399
1400 let sha256_hex = to_hex(sha256);
1402
1403 let mut blobs: Vec<BlobMetadata> = self
1405 .pubkey_blobs
1406 .get(&wtxn, pubkey)?
1407 .and_then(|b| serde_json::from_slice(b).ok())
1408 .unwrap_or_default();
1409
1410 if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
1412 let now = SystemTime::now()
1413 .duration_since(UNIX_EPOCH)
1414 .unwrap()
1415 .as_secs();
1416
1417 let size = self
1420 .router
1421 .blob_size_sync(sha256)
1422 .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))?
1423 .unwrap_or(0);
1424
1425 blobs.push(BlobMetadata {
1426 sha256: sha256_hex,
1427 size,
1428 mime_type: "application/octet-stream".to_string(),
1429 uploaded: now,
1430 });
1431
1432 let blobs_json = serde_json::to_vec(&blobs)?;
1433 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1434 }
1435
1436 wtxn.commit()?;
1437 Ok(())
1438 }
1439
1440 pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1442 let key = Self::blob_owner_key(sha256, pubkey);
1443 let rtxn = self.env.read_txn()?;
1444 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1445 }
1446
1447 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1449 let rtxn = self.env.read_txn()?;
1450
1451 let mut owners = Vec::new();
1452 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1453 let (key, _) = item?;
1454 if key.len() == 64 {
1455 let mut pubkey = [0u8; 32];
1457 pubkey.copy_from_slice(&key[32..64]);
1458 owners.push(pubkey);
1459 }
1460 }
1461 Ok(owners)
1462 }
1463
1464 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1466 let rtxn = self.env.read_txn()?;
1467
1468 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1470 if item.is_ok() {
1471 return Ok(true);
1472 }
1473 }
1474 Ok(false)
1475 }
1476
1477 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1479 Ok(self.get_blob_owners(sha256)?.into_iter().next())
1480 }
1481
1482 pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1486 let key = Self::blob_owner_key(sha256, pubkey);
1487 let mut wtxn = self.env.write_txn()?;
1488
1489 self.blob_owners.delete(&mut wtxn, &key[..])?;
1491
1492 let sha256_hex = to_hex(sha256);
1494
1495 if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1497 if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1498 blobs.retain(|b| b.sha256 != sha256_hex);
1499 let blobs_json = serde_json::to_vec(&blobs)?;
1500 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1501 }
1502 }
1503
1504 let mut has_other_owners = false;
1506 for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1507 if item.is_ok() {
1508 has_other_owners = true;
1509 break;
1510 }
1511 }
1512
1513 if has_other_owners {
1514 wtxn.commit()?;
1515 tracing::debug!(
1516 "Removed {} from blob {} owners, other owners remain",
1517 &to_hex(pubkey)[..8],
1518 &sha256_hex[..8]
1519 );
1520 return Ok(false);
1521 }
1522
1523 tracing::info!(
1525 "All owners removed from blob {}, deleting",
1526 &sha256_hex[..8]
1527 );
1528
1529 let _ = self.router.delete_sync(sha256);
1531
1532 wtxn.commit()?;
1533 Ok(true)
1534 }
1535
1536 pub fn list_blobs_by_pubkey(
1538 &self,
1539 pubkey: &[u8; 32],
1540 ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1541 let rtxn = self.env.read_txn()?;
1542
1543 let blobs: Vec<BlobMetadata> = self
1544 .pubkey_blobs
1545 .get(&rtxn, pubkey)?
1546 .and_then(|b| serde_json::from_slice(b).ok())
1547 .unwrap_or_default();
1548
1549 Ok(blobs
1550 .into_iter()
1551 .map(|b| crate::server::blossom::BlobDescriptor {
1552 url: format!("/{}", b.sha256),
1553 sha256: b.sha256,
1554 size: b.size,
1555 mime_type: b.mime_type,
1556 uploaded: b.uploaded,
1557 })
1558 .collect())
1559 }
1560
1561 pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1563 let data = self
1564 .router
1565 .get_sync(hash)
1566 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))?;
1567 if data.is_some() {
1568 self.record_blob_accesses(std::iter::once(*hash));
1569 }
1570 Ok(data)
1571 }
1572
1573 pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1576 let (tree, access_store) = self.access_tracking_tree();
1577
1578 let result = sync_block_on(async {
1579 tree.read_file(hash)
1580 .await
1581 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1582 })?;
1583 if result.is_some() {
1584 self.record_blob_accesses(access_store.take_accessed_hashes());
1585 }
1586 Ok(result)
1587 }
1588
1589 pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1592 let (tree, access_store) = self.access_tracking_tree();
1593
1594 let result = sync_block_on(async {
1595 tree.get(cid, None)
1596 .await
1597 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1598 })?;
1599 if result.is_some() {
1600 self.record_blob_accesses(access_store.take_accessed_hashes());
1601 }
1602 Ok(result)
1603 }
1604
1605 fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1606 let exists = self
1607 .router
1608 .exists(&cid.hash)
1609 .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1610 if !exists {
1611 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1612 }
1613 Ok(())
1614 }
1615
1616 pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1618 self.ensure_cid_exists(cid)?;
1619
1620 let (tree, access_store) = self.access_tracking_tree();
1621 let mut total_bytes = 0u64;
1622 let mut streamed_any_chunk = false;
1623
1624 sync_block_on(async {
1625 let mut stream = tree.get_stream(cid);
1626 while let Some(chunk) = stream.next().await {
1627 streamed_any_chunk = true;
1628 let chunk =
1629 chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1630 writer
1631 .write_all(&chunk)
1632 .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1633 total_bytes += chunk.len() as u64;
1634 }
1635 Ok::<(), anyhow::Error>(())
1636 })?;
1637
1638 if !streamed_any_chunk {
1639 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1640 }
1641 self.record_blob_accesses(access_store.take_accessed_hashes());
1642
1643 writer
1644 .flush()
1645 .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1646 Ok(total_bytes)
1647 }
1648
1649 pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1651 self.ensure_cid_exists(cid)?;
1652
1653 let output_path = output_path.as_ref();
1654 if let Some(parent) = output_path.parent() {
1655 if !parent.as_os_str().is_empty() {
1656 std::fs::create_dir_all(parent).with_context(|| {
1657 format!("Failed to create output directory {}", parent.display())
1658 })?;
1659 }
1660 }
1661
1662 let mut file = std::fs::File::create(output_path)
1663 .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1664 self.write_file_by_cid_to_writer(cid, &mut file)
1665 }
1666
1667 pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1669 self.write_file_by_cid(&Cid::public(*hash), output_path)
1670 }
1671
1672 pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1674 let (tree, access_store) = self.access_tracking_tree();
1675
1676 let result = sync_block_on(async {
1677 tree.resolve_path(cid, path)
1678 .await
1679 .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1680 })?;
1681 if result.is_some() {
1682 self.record_blob_accesses(access_store.take_accessed_hashes());
1683 }
1684 Ok(result)
1685 }
1686
1687 pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
1689 let access_store = AccessRecordingStore::new(self.store_arc());
1690 let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1691
1692 let metadata: Result<Option<FileChunkMetadata>> = sync_block_on(async {
1693 let exists = access_store
1696 .has(hash)
1697 .await
1698 .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;
1699
1700 if !exists {
1701 return Ok(None);
1702 }
1703
1704 let total_size = tree
1706 .get_size(hash)
1707 .await
1708 .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;
1709
1710 let is_tree_node = tree
1712 .is_tree(hash)
1713 .await
1714 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1715
1716 if !is_tree_node {
1717 return Ok(Some(FileChunkMetadata {
1719 total_size,
1720 chunk_hashes: vec![],
1721 chunk_sizes: vec![],
1722 is_chunked: false,
1723 }));
1724 }
1725
1726 let node = match tree
1728 .get_tree_node(hash)
1729 .await
1730 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1731 {
1732 Some(n) => n,
1733 None => return Ok(None),
1734 };
1735
1736 let is_directory = tree
1738 .is_directory(hash)
1739 .await
1740 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1741
1742 if is_directory {
1743 return Ok(None); }
1745
1746 let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
1748 let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();
1749
1750 Ok(Some(FileChunkMetadata {
1751 total_size,
1752 chunk_hashes,
1753 chunk_sizes,
1754 is_chunked: !node.links.is_empty(),
1755 }))
1756 });
1757 let metadata = metadata?;
1758 if metadata.is_some() {
1759 self.record_blob_accesses(access_store.take_accessed_hashes());
1760 }
1761 Ok(metadata)
1762 }
1763
1764 pub fn get_file_range(
1766 &self,
1767 hash: &[u8; 32],
1768 start: u64,
1769 end: Option<u64>,
1770 ) -> Result<Option<(Vec<u8>, u64)>> {
1771 let metadata = match self.get_file_chunk_metadata(hash)? {
1772 Some(m) => m,
1773 None => return Ok(None),
1774 };
1775
1776 if metadata.total_size == 0 {
1777 return Ok(Some((Vec::new(), 0)));
1778 }
1779
1780 if start >= metadata.total_size {
1781 return Ok(None);
1782 }
1783
1784 let end = end
1785 .unwrap_or(metadata.total_size - 1)
1786 .min(metadata.total_size - 1);
1787
1788 if !metadata.is_chunked {
1790 let content = self.get_file(hash)?.unwrap_or_default();
1791 let range_content = if start < content.len() as u64 {
1792 content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1793 } else {
1794 Vec::new()
1795 };
1796 return Ok(Some((range_content, metadata.total_size)));
1797 }
1798
1799 let mut result = Vec::new();
1801 let mut current_offset = 0u64;
1802
1803 for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1804 let chunk_size = metadata.chunk_sizes[i];
1805 let chunk_end = current_offset + chunk_size - 1;
1806
1807 if chunk_end >= start && current_offset <= end {
1809 let chunk_content = match self.get_chunk(chunk_hash)? {
1810 Some(content) => content,
1811 None => {
1812 return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1813 }
1814 };
1815
1816 let chunk_read_start = if current_offset >= start {
1817 0
1818 } else {
1819 (start - current_offset) as usize
1820 };
1821
1822 let chunk_read_end = if chunk_end <= end {
1823 chunk_size as usize - 1
1824 } else {
1825 (end - current_offset) as usize
1826 };
1827
1828 result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1829 }
1830
1831 current_offset += chunk_size;
1832
1833 if current_offset > end {
1834 break;
1835 }
1836 }
1837
1838 Ok(Some((result, metadata.total_size)))
1839 }
1840
1841 pub fn stream_file_range_chunks_owned(
1843 self: Arc<Self>,
1844 hash: &[u8; 32],
1845 start: u64,
1846 end: u64,
1847 ) -> Result<Option<FileRangeChunksOwned>> {
1848 let metadata = match self.get_file_chunk_metadata(hash)? {
1849 Some(m) => m,
1850 None => return Ok(None),
1851 };
1852
1853 if metadata.total_size == 0 || start >= metadata.total_size {
1854 return Ok(None);
1855 }
1856
1857 let end = end.min(metadata.total_size - 1);
1858
1859 Ok(Some(FileRangeChunksOwned {
1860 store: self,
1861 metadata,
1862 start,
1863 end,
1864 current_chunk_idx: 0,
1865 current_offset: 0,
1866 }))
1867 }
1868
1869 pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1871 let (tree, access_store) = self.access_tracking_tree();
1872
1873 let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
1874 let is_dir = tree
1876 .is_directory(hash)
1877 .await
1878 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1879
1880 if !is_dir {
1881 return Ok(None);
1882 }
1883
1884 let cid = hashtree_core::Cid::public(*hash);
1886 let tree_entries = tree
1887 .list_directory(&cid)
1888 .await
1889 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1890
1891 let entries: Vec<DirEntry> = tree_entries
1892 .into_iter()
1893 .map(|e| DirEntry {
1894 name: e.name,
1895 cid: to_hex(&e.hash),
1896 is_directory: e.link_type.is_tree(),
1897 size: e.size,
1898 })
1899 .collect();
1900
1901 Ok(Some(DirectoryListing {
1902 dir_name: String::new(),
1903 entries,
1904 }))
1905 });
1906 let listing = listing?;
1907 if listing.is_some() {
1908 self.record_blob_accesses(access_store.take_accessed_hashes());
1909 }
1910 Ok(listing)
1911 }
1912
1913 pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1915 let (tree, access_store) = self.access_tracking_tree();
1916 let cid = cid.clone();
1917
1918 let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
1919 let is_dir = tree
1920 .is_dir(&cid)
1921 .await
1922 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1923
1924 if !is_dir {
1925 return Ok(None);
1926 }
1927
1928 let tree_entries = tree
1929 .list_directory(&cid)
1930 .await
1931 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1932
1933 let entries: Vec<DirEntry> = tree_entries
1934 .into_iter()
1935 .map(|e| DirEntry {
1936 name: e.name,
1937 cid: Cid {
1938 hash: e.hash,
1939 key: e.key,
1940 }
1941 .to_string(),
1942 is_directory: e.link_type.is_tree(),
1943 size: e.size,
1944 })
1945 .collect();
1946
1947 Ok(Some(DirectoryListing {
1948 dir_name: String::new(),
1949 entries,
1950 }))
1951 });
1952 let listing = listing?;
1953 if listing.is_some() {
1954 self.record_blob_accesses(access_store.take_accessed_hashes());
1955 }
1956 Ok(listing)
1957 }
1958
1959 pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
1963 let mut wtxn = self.env.write_txn()?;
1964 self.pinned_refs.put(&mut wtxn, key, &())?;
1965 wtxn.commit()?;
1966 Ok(())
1967 }
1968
1969 pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
1971 let mut wtxn = self.env.write_txn()?;
1972 let removed = self.pinned_refs.delete(&mut wtxn, key)?;
1973 wtxn.commit()?;
1974 Ok(removed)
1975 }
1976
1977 pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
1979 let rtxn = self.env.read_txn()?;
1980 let mut refs = Vec::new();
1981
1982 for item in self.pinned_refs.iter(&rtxn)? {
1983 let (key, _) = item?;
1984 refs.push(key.to_string());
1985 }
1986
1987 refs.sort();
1988 Ok(refs)
1989 }
1990
1991 pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
1993 let mut wtxn = self.env.write_txn()?;
1994 let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
1995 self.tracked_authors.put(&mut wtxn, npub, &())?;
1996 wtxn.commit()?;
1997 Ok(inserted)
1998 }
1999
2000 pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
2002 let mut wtxn = self.env.write_txn()?;
2003 let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
2004 wtxn.commit()?;
2005 Ok(removed)
2006 }
2007
2008 pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
2010 let rtxn = self.env.read_txn()?;
2011 let mut authors = Vec::new();
2012
2013 for item in self.tracked_authors.iter(&rtxn)? {
2014 let (npub, _) = item?;
2015 authors.push(npub.to_string());
2016 }
2017
2018 authors.sort();
2019 Ok(authors)
2020 }
2021
2022 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
2024 let key = format!("{}/{}", pubkey_hex, tree_name);
2025 let rtxn = self.env.read_txn()?;
2026 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
2027 let root: CachedRoot = rmp_serde::from_slice(bytes)
2028 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2029 Ok(Some(root))
2030 } else {
2031 Ok(None)
2032 }
2033 }
2034
2035 pub fn set_cached_root(
2037 &self,
2038 pubkey_hex: &str,
2039 tree_name: &str,
2040 hash: &str,
2041 key: Option<&str>,
2042 visibility: &str,
2043 updated_at: u64,
2044 ) -> Result<()> {
2045 let db_key = format!("{}/{}", pubkey_hex, tree_name);
2046 let root = CachedRoot {
2047 hash: hash.to_string(),
2048 key: key.map(|k| k.to_string()),
2049 updated_at,
2050 visibility: visibility.to_string(),
2051 };
2052 let bytes = rmp_serde::to_vec(&root)
2053 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
2054 let mut wtxn = self.env.write_txn()?;
2055 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
2056 wtxn.commit()?;
2057 Ok(())
2058 }
2059
2060 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
2062 let prefix = format!("{}/", pubkey_hex);
2063 let rtxn = self.env.read_txn()?;
2064 let mut results = Vec::new();
2065
2066 for item in self.cached_roots.iter(&rtxn)? {
2067 let (key, bytes) = item?;
2068 if key.starts_with(&prefix) {
2069 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
2070 let root: CachedRoot = rmp_serde::from_slice(bytes)
2071 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2072 results.push((tree_name.to_string(), root));
2073 }
2074 }
2075
2076 Ok(results)
2077 }
2078
2079 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
2081 let key = format!("{}/{}", pubkey_hex, tree_name);
2082 let mut wtxn = self.env.write_txn()?;
2083 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
2084 wtxn.commit()?;
2085 Ok(deleted)
2086 }
2087}
2088
2089fn is_map_full_store_error(err: &StoreError) -> bool {
2090 let message = err.to_string();
2091 message.contains("MDB_MAP_FULL") || message.contains("MapFull")
2092}
2093
/// Chunk layout of a stored file, as produced by `get_file_chunk_metadata`.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total size of the file in bytes, as reported by the tree.
    pub total_size: u64,
    /// Hash of each chunk, in link order; empty for an unchunked file.
    pub chunk_hashes: Vec<Hash>,
    /// Size of each chunk, parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// `true` when the file's tree node has at least one link.
    pub is_chunked: bool,
}
2101
/// Iterator state for reading a byte range of a chunked file; created by
/// `HashtreeStore::stream_file_range_chunks_owned`.
pub struct FileRangeChunksOwned {
    /// Store the chunks are fetched from.
    store: Arc<HashtreeStore>,
    /// Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    /// First requested byte offset within the file.
    start: u64,
    /// Last requested byte offset within the file (inclusive).
    end: u64,
    /// Index of the next chunk to examine.
    current_chunk_idx: usize,
    /// File offset at which the next chunk begins.
    current_offset: u64,
}
2111
2112impl Iterator for FileRangeChunksOwned {
2113 type Item = Result<Vec<u8>>;
2114
2115 fn next(&mut self) -> Option<Self::Item> {
2116 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2117 return None;
2118 }
2119
2120 if self.current_offset > self.end {
2121 return None;
2122 }
2123
2124 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2125 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2126 let chunk_end = self.current_offset + chunk_size - 1;
2127
2128 self.current_chunk_idx += 1;
2129
2130 if chunk_end < self.start || self.current_offset > self.end {
2131 self.current_offset += chunk_size;
2132 return self.next();
2133 }
2134
2135 let chunk_content = match self.store.get_chunk(chunk_hash) {
2136 Ok(Some(content)) => content,
2137 Ok(None) => {
2138 return Some(Err(anyhow::anyhow!(
2139 "Chunk {} not found",
2140 to_hex(chunk_hash)
2141 )));
2142 }
2143 Err(e) => {
2144 return Some(Err(e));
2145 }
2146 };
2147
2148 let chunk_read_start = if self.current_offset >= self.start {
2149 0
2150 } else {
2151 (self.start - self.current_offset) as usize
2152 };
2153
2154 let chunk_read_end = if chunk_end <= self.end {
2155 chunk_size as usize - 1
2156 } else {
2157 (self.end - self.current_offset) as usize
2158 };
2159
2160 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2161 self.current_offset += chunk_size;
2162
2163 Some(Ok(result))
2164 }
2165}
2166
/// Counters describing the outcome of a garbage-collection pass.
/// NOTE(review): not constructed in this part of the file; field meanings
/// below are taken from their names — confirm against the producer.
#[derive(Debug)]
pub struct GcStats {
    /// Presumably the number of DAGs that were deleted.
    pub deleted_dags: usize,
    /// Presumably the number of bytes reclaimed.
    pub freed_bytes: u64,
}
2172
/// One entry of a `DirectoryListing`.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name as reported by the tree.
    pub name: String,
    /// Identifier of the entry: the hex hash when produced by
    /// `get_directory_listing`, or the `Cid` string form (hash plus key) when
    /// produced by `get_directory_listing_by_cid`.
    pub cid: String,
    /// `true` when the entry's link type is a tree.
    pub is_directory: bool,
    /// Size recorded on the entry's link.
    pub size: u64,
}
2180
/// Result of listing a directory in the store.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the directory; the listing functions in this file leave it
    /// empty.
    pub dir_name: String,
    /// Entries of the directory, in the order the tree returned them.
    pub entries: Vec<DirEntry>,
}
2186
/// Per-owner record of an uploaded blob, stored as part of a JSON list in the
/// `pubkey_blobs` database.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 hash of the blob.
    pub sha256: String,
    /// Blob size in bytes at the time the record was created.
    pub size: u64,
    /// MIME type; `set_blob_owner` always writes `application/octet-stream`.
    pub mime_type: String,
    /// Unix timestamp (seconds) at which the record was created.
    pub uploaded: u64,
}
2195
2196impl crate::webrtc::ContentStore for HashtreeStore {
2198 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2199 let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2200 self.get_chunk(&hash)
2201 }
2202}
2203
2204#[cfg(test)]
2205mod tests {
2206 use super::*;
2207 #[cfg(feature = "lmdb")]
2208 use tempfile::TempDir;
2209
2210 #[test]
2211 fn blob_access_update_gate_deduplicates_and_throttles() {
2212 let gate = BlobAccessUpdateGate::default();
2213 let first = sha256(b"first");
2214 let second = sha256(b"second");
2215
2216 assert_eq!(
2217 gate.due_hashes([first, first, second], 10),
2218 vec![first, second]
2219 );
2220 assert!(gate.due_hashes([first, second], 11).is_empty());
2221 assert_eq!(
2222 gate.due_hashes([second, first], 10 + ACCESS_UPDATE_INTERVAL_SECS),
2223 vec![second, first]
2224 );
2225 }
2226
2227 #[cfg(feature = "lmdb")]
2228 #[test]
2229 fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
2230 let temp = TempDir::new()?;
2231 let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
2232 let store = HashtreeStore::with_options_and_backend(
2233 temp.path(),
2234 None,
2235 requested,
2236 true,
2237 &StorageBackend::Lmdb,
2238 )?;
2239
2240 let map_size = match store.router.local.as_ref() {
2241 LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
2242 LocalStore::Fs(_) => panic!("expected LMDB local store"),
2243 };
2244
2245 assert!(
2246 map_size >= requested,
2247 "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
2248 );
2249
2250 drop(store);
2251 Ok(())
2252 }
2253
2254 #[cfg(feature = "lmdb")]
2255 #[test]
2256 fn hashtree_store_expands_metadata_lmdb_map_size_to_storage_budget() -> Result<()> {
2257 let temp = TempDir::new()?;
2258 let storage_budget = 256 * 1024 * 1024 * 1024u64;
2259 let expected = lmdb_metadata_map_size_for_storage_budget(storage_budget);
2260 let store = HashtreeStore::with_options_and_backend(
2261 temp.path(),
2262 None,
2263 storage_budget,
2264 true,
2265 &StorageBackend::Lmdb,
2266 )?;
2267
2268 let map_size = store.env.info().map_size as u64;
2269 assert!(
2270 map_size >= expected,
2271 "expected metadata LMDB map to grow to at least {expected} bytes, got {map_size}"
2272 );
2273
2274 drop(store);
2275 Ok(())
2276 }
2277
2278 #[cfg(feature = "lmdb")]
2279 #[test]
2280 fn local_store_can_override_lmdb_map_size() -> Result<()> {
2281 let temp = TempDir::new()?;
2282 let requested = 512 * 1024 * 1024u64;
2283 let store = LocalStore::new_with_lmdb_map_size(
2284 temp.path().join("lmdb-blobs"),
2285 &StorageBackend::Lmdb,
2286 Some(requested),
2287 )?;
2288
2289 let map_size = match store {
2290 LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
2291 LocalStore::Fs(_) => panic!("expected LMDB local store"),
2292 };
2293
2294 assert!(
2295 map_size >= requested,
2296 "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
2297 );
2298
2299 Ok(())
2300 }
2301
2302 #[cfg(feature = "lmdb")]
2303 #[test]
2304 fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
2305 let temp = TempDir::new()?;
2306 let path = temp.path().join("lmdb-blobs");
2307 std::fs::create_dir_all(path.join("aa"))?;
2308 std::fs::create_dir_all(path.join("b2"))?;
2309 std::fs::create_dir_all(path.join("keep-me"))?;
2310 std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
2311 std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
2312 std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;
2313
2314 let _store = LocalStore::new_with_lmdb_map_size(
2315 &path,
2316 &StorageBackend::Lmdb,
2317 Some(128 * 1024 * 1024),
2318 )?;
2319
2320 assert!(!path.join("aa").exists());
2321 assert!(!path.join("b2").exists());
2322 assert!(path.join("keep-me").exists());
2323 assert!(path.join("data.mdb").exists());
2324 assert!(path.join("lock.mdb").exists());
2325
2326 Ok(())
2327 }
2328
2329 #[cfg(feature = "lmdb")]
2330 #[test]
2331 fn duplicate_blossom_writes_refresh_blob_last_accessed() -> Result<()> {
2332 let temp = TempDir::new()?;
2333 let store = HashtreeStore::with_options_and_backend(
2334 temp.path(),
2335 None,
2336 LMDB_BLOB_MIN_MAP_SIZE_BYTES,
2337 true,
2338 &StorageBackend::Lmdb,
2339 )?;
2340
2341 let data = b"cached blossom duplicate";
2342 let hash = sha256(data);
2343 store.put_cached_blob(data)?;
2344 store.router.touch_accessed_sync(&hash, 1)?;
2345 store.put_cached_blob(data)?;
2346 assert!(store.blob_last_accessed_at(&hash)?.unwrap_or(0) > 1);
2347
2348 let owned = b"owned blossom duplicate";
2349 let owned_hash = sha256(owned);
2350 let owner = [7u8; 32];
2351 store.put_owned_blob(owned, &owner)?;
2352 store.router.touch_accessed_sync(&owned_hash, 1)?;
2353 store.put_owned_blob(owned, &owner)?;
2354 assert!(store.blob_last_accessed_at(&owned_hash)?.unwrap_or(0) > 1);
2355
2356 Ok(())
2357 }
2358
2359 #[cfg(feature = "lmdb")]
2360 #[test]
2361 fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
2362 let temp = TempDir::new()?;
2363 let store = HashtreeStore::with_options_and_backend(
2364 temp.path(),
2365 None,
2366 LMDB_BLOB_MIN_MAP_SIZE_BYTES,
2367 true,
2368 &StorageBackend::Lmdb,
2369 )?;
2370
2371 let old_bytes = b"old published root";
2372 let new_bytes = b"new published root";
2373 let old_root = sha256(old_bytes);
2374 let new_root = sha256(new_bytes);
2375
2376 store.put_blob(old_bytes)?;
2377 store.pin(&old_root)?;
2378 store.index_tree(
2379 &old_root,
2380 "owner",
2381 Some("playlist"),
2382 PRIORITY_OWN,
2383 Some("npub1owner/playlist"),
2384 )?;
2385
2386 assert!(store.is_pinned(&old_root)?);
2387 assert!(store.get_tree_meta(&old_root)?.is_some());
2388
2389 store.put_blob(new_bytes)?;
2390 store.pin(&new_root)?;
2391 store.index_tree(
2392 &new_root,
2393 "owner",
2394 Some("playlist"),
2395 PRIORITY_OWN,
2396 Some("npub1owner/playlist"),
2397 )?;
2398
2399 assert!(
2400 !store.is_pinned(&old_root)?,
2401 "superseded root should be unpinned when ref is replaced"
2402 );
2403 assert!(
2404 store.get_tree_meta(&old_root)?.is_none(),
2405 "superseded root metadata should be removed when ref is replaced"
2406 );
2407 assert!(store.is_pinned(&new_root)?);
2408 assert!(store.get_tree_meta(&new_root)?.is_some());
2409
2410 Ok(())
2411 }
2412
2413 #[test]
2414 fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
2415 let temp = TempDir::new()?;
2416 let store = HashtreeStore::with_options(temp.path(), None, 1024 * 1024)?;
2417
2418 store
2419 .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;
2420 store
2421 .add_tracked_author("npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm")?;
2422 store
2423 .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;
2424
2425 assert_eq!(
2426 store.list_tracked_authors()?,
2427 vec![
2428 "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm".to_string(),
2429 "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string(),
2430 ]
2431 );
2432 assert!(store.remove_tracked_author(
2433 "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm"
2434 )?);
2435 assert!(!store.remove_tracked_author(
2436 "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq"
2437 )?);
2438 assert_eq!(
2439 store.list_tracked_authors()?,
2440 vec!["npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string()]
2441 );
2442
2443 Ok(())
2444 }
2445
2446 #[cfg(feature = "s3")]
2447 #[test]
2448 fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
2449 let temp = tempfile::TempDir::new()?;
2450 let local = Arc::new(LocalStore::new(
2451 temp.path().join("blobs"),
2452 &StorageBackend::Fs,
2453 )?);
2454
2455 let outcome = std::panic::catch_unwind(|| {
2456 sync_block_on(async {
2457 let aws_config = aws_config::from_env()
2458 .region(aws_sdk_s3::config::Region::new("auto"))
2459 .load()
2460 .await;
2461 let s3_client = aws_sdk_s3::Client::from_conf(
2462 aws_sdk_s3::config::Builder::from(&aws_config)
2463 .endpoint_url("http://127.0.0.1:9")
2464 .force_path_style(true)
2465 .build(),
2466 );
2467
2468 let router = StorageRouter {
2469 local,
2470 s3_client: Some(s3_client),
2471 s3_bucket: Some("test-bucket".to_string()),
2472 s3_prefix: String::new(),
2473 sync_tx: None,
2474 };
2475 let hash = [0u8; 32];
2476
2477 let _ = Store::has(&router, &hash).await;
2478 let _ = Store::get(&router, &hash).await;
2479 });
2480 });
2481
2482 assert!(
2483 outcome.is_ok(),
2484 "S3-backed async store methods should not panic inside futures::block_on"
2485 );
2486
2487 Ok(())
2488 }
2489}