1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8 from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::collections::{HashMap, HashSet};
17#[cfg(feature = "s3")]
18use std::future::Future;
19use std::io::Write;
20use std::path::{Path, PathBuf};
21use std::sync::{Arc, Mutex};
22use std::time::{SystemTime, UNIX_EPOCH};
23
24mod upload;
25
26mod maintenance;
27mod retention;
28
29pub use maintenance::{
30 compact_lmdb_environments_under, CompactResult, R2ImportOptions, R2ImportResult, VerifyResult,
31};
32pub use retention::{OwnedBlobStats, PinnedItem, StorageByPriority, StorageStats, TreeMeta};
33
34pub const PRIORITY_OTHER: u8 = 64;
36pub const PRIORITY_FOLLOWED: u8 = 128;
37pub const PRIORITY_OWN: u8 = 255;
38const LMDB_MAX_READERS: u32 = 1024;
39const LMDB_METADATA_MIN_MAP_SIZE_BYTES: u64 = 1024 * 1024;
40#[cfg(feature = "lmdb")]
41const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 16 * 1024 * 1024;
42const ACCESS_UPDATE_INTERVAL_SECS: u64 = 300;
43const ACCESS_UPDATE_GATE_MAX_ENTRIES: usize = 4096;
44
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of an error.
fn unix_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
51
/// A cached root record that can be serialized and deserialized with serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash stored as a string (presumably hex-encoded — TODO confirm against writers).
    pub hash: String,
    /// Optional key stored alongside the root (presumably a decryption key — TODO confirm).
    pub key: Option<String>,
    /// When the record was last updated (presumably Unix seconds — TODO confirm).
    pub updated_at: u64,
    /// Visibility label for the root; the set of accepted values is not visible here.
    pub visibility: String,
}
64
/// Summary figures for a local blob store, copied from the backend's own
/// `stats()` result by `LocalStore::stats`.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// The backend-reported `count` (presumably the number of stored blobs).
    pub count: usize,
    /// The backend-reported `total_bytes` (presumably the combined blob size).
    pub total_bytes: u64,
}
71
72#[derive(Default)]
73struct BlobAccessUpdateGate {
74 next_update_by_hash: Mutex<HashMap<Hash, u64>>,
75}
76
77impl BlobAccessUpdateGate {
78 fn due_hashes<I>(&self, hashes: I, now: u64) -> Vec<Hash>
79 where
80 I: IntoIterator<Item = Hash>,
81 {
82 let Ok(mut next_update_by_hash) = self.next_update_by_hash.try_lock() else {
83 return Vec::new();
84 };
85
86 if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
87 next_update_by_hash.retain(|_, next_update| *next_update > now);
88 if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
89 next_update_by_hash.clear();
90 }
91 }
92
93 let mut due = Vec::new();
94 let mut seen = HashSet::new();
95 for hash in hashes {
96 if !seen.insert(hash) {
97 continue;
98 }
99 if next_update_by_hash
100 .get(&hash)
101 .is_some_and(|next_update| now < *next_update)
102 {
103 continue;
104 }
105 next_update_by_hash.insert(hash, now.saturating_add(ACCESS_UPDATE_INTERVAL_SECS));
106 due.push(hash);
107 }
108 due
109 }
110}
111
/// Local blob storage, backed by one of the supported on-disk stores.
pub enum LocalStore {
    /// Filesystem-backed blob store.
    Fs(FsBlobStore),
    /// LMDB-backed blob store; only compiled in with the `lmdb` feature.
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
118
/// Reports whether the final component of `path` looks like a filesystem blob
/// shard directory name: exactly two ASCII hexadecimal digits.
///
/// Only the name is inspected; the path is not checked on disk. Paths without
/// a final component, or whose name is not valid UTF-8, yield `false`.
#[cfg(feature = "lmdb")]
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    let Some(name) = path.file_name().and_then(|raw| raw.to_str()) else {
        return false;
    };
    let bytes = name.as_bytes();
    bytes.len() == 2 && bytes.iter().all(u8::is_ascii_hexdigit)
}
126
127fn lmdb_map_size_for_existing_env(path: &Path, minimum_bytes: u64) -> Result<usize> {
128 let existing_bytes = std::fs::metadata(path.join("data.mdb"))
129 .map(|metadata| metadata.len())
130 .unwrap_or(0);
131 let requested = align_lmdb_map_size(minimum_bytes.max(existing_bytes));
132 usize::try_from(requested).context("LMDB map size exceeds usize")
133}
134
135fn align_lmdb_map_size(bytes: u64) -> u64 {
136 let page_size = (page_size::get() as u64).max(4096);
137 let remainder = bytes % page_size;
138 if remainder == 0 {
139 bytes
140 } else {
141 bytes.saturating_add(page_size - remainder)
142 }
143}
144
/// Deletes every directory directly under `path` whose name looks like a
/// filesystem blob shard (see [`is_fs_blob_shard_dir`]), logging each removal.
///
/// Other entries are left untouched.
///
/// # Errors
///
/// Any I/O failure while listing or deleting is returned as `StoreError::Io`;
/// directories removed before the failure stay removed.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let shard_path = entry.map_err(StoreError::Io)?.path();
        if !(shard_path.is_dir() && is_fs_blob_shard_dir(&shard_path)) {
            continue;
        }
        std::fs::remove_dir_all(&shard_path).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            shard_path.display()
        );
    }
    Ok(())
}
161
162impl LocalStore {
163 pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
169 Self::new_unbounded(path, backend)
170 }
171
172 pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
177 path: P,
178 backend: &StorageBackend,
179 _map_size_bytes: Option<u64>,
180 ) -> Result<Self, StoreError> {
181 match backend {
182 StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
183 #[cfg(feature = "lmdb")]
184 StorageBackend::Lmdb => match _map_size_bytes {
185 Some(map_size_bytes) => {
186 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
187 remove_stale_fs_blob_shards(path.as_ref())?;
188 Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
189 path,
190 map_size_bytes,
191 )?))
192 }
193 None => {
194 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
195 remove_stale_fs_blob_shards(path.as_ref())?;
196 Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
197 }
198 },
199 #[cfg(not(feature = "lmdb"))]
200 StorageBackend::Lmdb => {
201 tracing::warn!(
202 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
203 );
204 Ok(LocalStore::Fs(FsBlobStore::new(path)?))
205 }
206 }
207 }
208
209 pub fn new_unbounded<P: AsRef<Path>>(
211 path: P,
212 backend: &StorageBackend,
213 ) -> Result<Self, StoreError> {
214 Self::new_with_lmdb_map_size(path, backend, None)
215 }
216
217 pub fn backend(&self) -> StorageBackend {
218 match self {
219 LocalStore::Fs(_) => StorageBackend::Fs,
220 #[cfg(feature = "lmdb")]
221 LocalStore::Lmdb(_) => StorageBackend::Lmdb,
222 }
223 }
224
225 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
227 match self {
228 LocalStore::Fs(store) => store.put_sync(hash, data),
229 #[cfg(feature = "lmdb")]
230 LocalStore::Lmdb(store) => store.put_sync(hash, data),
231 }
232 }
233
234 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
236 match self {
237 LocalStore::Fs(store) => {
238 let mut inserted = 0usize;
239 for (hash, data) in items {
240 if store.put_sync(*hash, data.as_slice())? {
241 inserted += 1;
242 }
243 }
244 Ok(inserted)
245 }
246 #[cfg(feature = "lmdb")]
247 LocalStore::Lmdb(store) => store.put_many_sync(items),
248 }
249 }
250
251 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
253 match self {
254 LocalStore::Fs(store) => store.get_sync(hash),
255 #[cfg(feature = "lmdb")]
256 LocalStore::Lmdb(store) => store.get_sync(hash),
257 }
258 }
259
260 pub fn get_range_sync(
261 &self,
262 hash: &Hash,
263 start: u64,
264 end_inclusive: u64,
265 ) -> Result<Option<Vec<u8>>, StoreError> {
266 match self {
267 LocalStore::Fs(store) => store.get_range_sync(hash, start, end_inclusive),
268 #[cfg(feature = "lmdb")]
269 LocalStore::Lmdb(store) => store.get_range_sync(hash, start, end_inclusive),
270 }
271 }
272
273 pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
274 match self {
275 LocalStore::Fs(store) => store.blob_size_sync(hash),
276 #[cfg(feature = "lmdb")]
277 LocalStore::Lmdb(store) => store.blob_size_sync(hash),
278 }
279 }
280
281 pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
282 match self {
283 LocalStore::Fs(store) => store.touch_accessed_sync(hash, now),
284 #[cfg(feature = "lmdb")]
285 LocalStore::Lmdb(store) => store.touch_accessed_sync(hash, now),
286 }
287 }
288
289 pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
290 match self {
291 LocalStore::Fs(store) => store.touch_many_accessed_sync(hashes, now),
292 #[cfg(feature = "lmdb")]
293 LocalStore::Lmdb(store) => store.touch_many_accessed_sync(hashes, now),
294 }
295 }
296
297 pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
298 match self {
299 LocalStore::Fs(store) => store.last_accessed_at_sync(hash),
300 #[cfg(feature = "lmdb")]
301 LocalStore::Lmdb(store) => store.last_accessed_at_sync(hash),
302 }
303 }
304
305 pub fn many_last_accessed_at_sync(
306 &self,
307 hashes: &[Hash],
308 ) -> Result<Vec<(Hash, u64)>, StoreError> {
309 match self {
310 LocalStore::Fs(store) => store.many_last_accessed_at_sync(hashes),
311 #[cfg(feature = "lmdb")]
312 LocalStore::Lmdb(store) => store.many_last_accessed_at_sync(hashes),
313 }
314 }
315
316 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
318 match self {
319 LocalStore::Fs(store) => Ok(store.exists(hash)),
320 #[cfg(feature = "lmdb")]
321 LocalStore::Lmdb(store) => store.exists(hash),
322 }
323 }
324
325 pub fn existing_hashes_in_sorted_candidates(
327 &self,
328 sorted_hashes: &[Hash],
329 ) -> Result<Vec<bool>, StoreError> {
330 match self {
331 LocalStore::Fs(store) => Ok(sorted_hashes
332 .iter()
333 .map(|hash| store.exists(hash))
334 .collect()),
335 #[cfg(feature = "lmdb")]
336 LocalStore::Lmdb(store) => store.existing_hashes_in_sorted_candidates(sorted_hashes),
337 }
338 }
339
340 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
342 match self {
343 LocalStore::Fs(store) => store.delete_sync(hash),
344 #[cfg(feature = "lmdb")]
345 LocalStore::Lmdb(store) => store.delete_sync(hash),
346 }
347 }
348
349 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
351 match self {
352 LocalStore::Fs(store) => {
353 let stats = store.stats()?;
354 Ok(LocalStoreStats {
355 count: stats.count,
356 total_bytes: stats.total_bytes,
357 })
358 }
359 #[cfg(feature = "lmdb")]
360 LocalStore::Lmdb(store) => {
361 let stats = store.stats()?;
362 Ok(LocalStoreStats {
363 count: stats.count,
364 total_bytes: stats.total_bytes,
365 })
366 }
367 }
368 }
369
370 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
372 match self {
373 LocalStore::Fs(store) => store.list(),
374 #[cfg(feature = "lmdb")]
375 LocalStore::Lmdb(store) => store.list(),
376 }
377 }
378}
379
380#[async_trait]
381impl Store for LocalStore {
382 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
383 self.put_sync(hash, &data)
384 }
385
386 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
387 self.put_many_sync(&items)
388 }
389
390 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
391 self.get_sync(hash)
392 }
393
394 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
395 self.exists(hash)
396 }
397
398 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
399 self.delete_sync(hash)
400 }
401}
402
403#[cfg(feature = "s3")]
404use tokio::sync::mpsc;
405
406use crate::config::S3Config;
407
/// Work item sent to the background S3 sync task spawned by `StorageRouter::with_s3`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` to the object derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object derived from `hash`.
    Delete { hash: Hash },
}
414
/// Routes blob operations to the local store and, when the `s3` feature is
/// enabled and configured, to an S3 bucket as well.
pub struct StorageRouter {
    /// Local blob store; every operation consults it first.
    local: Arc<LocalStore>,
    /// S3 client; `None` when the router was built without S3.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target bucket; `None` when the router was built without S3.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Prefix prepended to every object key (empty when none is configured).
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Sender feeding the background sync task; `None` when S3 is not configured.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
433
434impl StorageRouter {
435 #[cfg(feature = "s3")]
436 fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
437 where
438 F: Future<Output = T> + Send + 'static,
439 T: Send + 'static,
440 {
441 if tokio::runtime::Handle::try_current().is_ok() {
442 return std::thread::Builder::new()
443 .name("storage-s3-sync".to_string())
444 .spawn(move || {
445 let runtime = tokio::runtime::Builder::new_current_thread()
446 .enable_all()
447 .build()
448 .map_err(|err| {
449 StoreError::Other(format!("build storage s3 sync runtime: {err}"))
450 })?;
451 Ok(runtime.block_on(future))
452 })
453 .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
454 .join()
455 .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()))?;
456 }
457
458 let runtime = tokio::runtime::Builder::new_current_thread()
459 .enable_all()
460 .build()
461 .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
462 Ok(runtime.block_on(future))
463 }
464
465 pub fn new(local: Arc<LocalStore>) -> Self {
467 Self {
468 local,
469 #[cfg(feature = "s3")]
470 s3_client: None,
471 #[cfg(feature = "s3")]
472 s3_bucket: None,
473 #[cfg(feature = "s3")]
474 s3_prefix: String::new(),
475 #[cfg(feature = "s3")]
476 sync_tx: None,
477 }
478 }
479
480 #[cfg(feature = "s3")]
482 pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
483 use aws_sdk_s3::Client as S3Client;
484
485 let mut aws_config_loader = aws_config::from_env();
487 aws_config_loader =
488 aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
489 let aws_config = aws_config_loader.load().await;
490
491 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
493 s3_config_builder = s3_config_builder
494 .endpoint_url(&config.endpoint)
495 .force_path_style(true);
496
497 let s3_client = S3Client::from_conf(s3_config_builder.build());
498 let bucket = config.bucket.clone();
499 let prefix = config.prefix.clone().unwrap_or_default();
500
501 let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
503
504 let sync_client = s3_client.clone();
506 let sync_bucket = bucket.clone();
507 let sync_prefix = prefix.clone();
508
509 tokio::spawn(async move {
510 use aws_sdk_s3::primitives::ByteStream;
511
512 tracing::info!("S3 background sync task started");
513
514 let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(8));
516 let client = std::sync::Arc::new(sync_client);
517 let bucket = std::sync::Arc::new(sync_bucket);
518 let prefix = std::sync::Arc::new(sync_prefix);
519
520 while let Some(msg) = sync_rx.recv().await {
521 let client = client.clone();
522 let bucket = bucket.clone();
523 let prefix = prefix.clone();
524 let semaphore = semaphore.clone();
525
526 tokio::spawn(async move {
528 let _permit = semaphore.acquire().await;
530
531 match msg {
532 S3SyncMessage::Upload { hash, data } => {
533 let key = format!("{}{}.bin", prefix, to_hex(&hash));
534 tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
535
536 let mut attempt = 1u8;
537 loop {
538 match client
539 .put_object()
540 .bucket(bucket.as_str())
541 .key(&key)
542 .body(ByteStream::from(data.clone()))
543 .send()
544 .await
545 {
546 Ok(_) => {
547 tracing::debug!("S3 upload succeeded: {}", &key);
548 break;
549 }
550 Err(e) if attempt < 3 => {
551 tracing::warn!(
552 "S3 upload retrying {}: attempt={} error={}",
553 &key,
554 attempt,
555 e
556 );
557 tokio::time::sleep(std::time::Duration::from_millis(
558 250 * u64::from(attempt),
559 ))
560 .await;
561 attempt += 1;
562 }
563 Err(e) => {
564 tracing::error!(
565 "S3 upload failed {} after {} attempts: {}",
566 &key,
567 attempt,
568 e
569 );
570 break;
571 }
572 }
573 }
574 }
575 S3SyncMessage::Delete { hash } => {
576 let key = format!("{}{}.bin", prefix, to_hex(&hash));
577 tracing::debug!("S3 deleting {}", &key);
578
579 let mut attempt = 1u8;
580 loop {
581 match client
582 .delete_object()
583 .bucket(bucket.as_str())
584 .key(&key)
585 .send()
586 .await
587 {
588 Ok(_) => break,
589 Err(e) if attempt < 3 => {
590 tracing::warn!(
591 "S3 delete retrying {}: attempt={} error={}",
592 &key,
593 attempt,
594 e
595 );
596 tokio::time::sleep(std::time::Duration::from_millis(
597 250 * u64::from(attempt),
598 ))
599 .await;
600 attempt += 1;
601 }
602 Err(e) => {
603 tracing::error!(
604 "S3 delete failed {} after {} attempts: {}",
605 &key,
606 attempt,
607 e
608 );
609 break;
610 }
611 }
612 }
613 }
614 }
615 });
616 }
617 });
618
619 tracing::info!(
620 "S3 storage initialized: bucket={}, prefix={}",
621 bucket,
622 prefix
623 );
624
625 Ok(Self {
626 local,
627 s3_client: Some(s3_client),
628 s3_bucket: Some(bucket),
629 s3_prefix: prefix,
630 sync_tx: Some(sync_tx),
631 })
632 }
633
634 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
636 let is_new = self.local.put_sync(hash, data)?;
638
639 #[cfg(feature = "s3")]
642 if is_new {
643 if let Some(ref tx) = self.sync_tx {
644 tracing::debug!(
645 "Queueing S3 upload for {} ({} bytes)",
646 crate::storage::to_hex(&hash)[..16].to_string(),
647 data.len(),
648 );
649 if let Err(e) = tx.send(S3SyncMessage::Upload {
650 hash,
651 data: data.to_vec(),
652 }) {
653 tracing::error!("Failed to queue S3 upload: {}", e);
654 }
655 }
656 }
657
658 Ok(is_new)
659 }
660
661 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
663 #[cfg(feature = "s3")]
664 let pending_uploads = if self.sync_tx.is_some() {
665 let mut pending = Vec::new();
666 for (hash, data) in items {
667 if !self.local.exists(hash)? {
668 pending.push((*hash, data.clone()));
669 }
670 }
671 pending
672 } else {
673 Vec::new()
674 };
675
676 let inserted = self.local.put_many_sync(items)?;
677
678 #[cfg(feature = "s3")]
679 if let Some(ref tx) = self.sync_tx {
680 for (hash, data) in pending_uploads {
681 if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
682 tracing::error!("Failed to queue S3 upload: {}", e);
683 }
684 }
685 }
686
687 Ok(inserted)
688 }
689
690 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
692 if let Some(data) = self.local.get_sync(hash)? {
694 return Ok(Some(data));
695 }
696
697 #[cfg(feature = "s3")]
699 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
700 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
701 let client = client.clone();
702 let bucket = bucket.clone();
703
704 match Self::run_s3_future_sync(async move {
705 client.get_object().bucket(bucket).key(key).send().await
706 }) {
707 Ok(Ok(output)) => {
708 match Self::run_s3_future_sync(async move { output.body.collect().await }) {
709 Ok(Ok(body)) => {
710 let data = body.into_bytes().to_vec();
711 let _ = self.local.put_sync(*hash, &data);
713 return Ok(Some(data));
714 }
715 Ok(Err(err)) => {
716 tracing::warn!("S3 body collect failed: {}", err);
717 }
718 Err(err) => {
719 tracing::warn!("S3 body collect runtime failed: {}", err);
720 }
721 }
722 }
723 Ok(Err(err)) => {
724 let service_err = err.into_service_error();
725 if !service_err.is_no_such_key() {
726 tracing::warn!("S3 get failed: {}", service_err);
727 }
728 }
729 Err(err) => {
730 tracing::warn!("S3 get runtime failed: {}", err);
731 }
732 }
733 }
734
735 Ok(None)
736 }
737
738 pub fn get_range_sync(
739 &self,
740 hash: &Hash,
741 start: u64,
742 end_inclusive: u64,
743 ) -> Result<Option<Vec<u8>>, StoreError> {
744 self.local.get_range_sync(hash, start, end_inclusive)
745 }
746
747 pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
748 self.local.blob_size_sync(hash)
749 }
750
751 pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
752 self.local.touch_accessed_sync(hash, now)
753 }
754
755 pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
756 self.local.touch_many_accessed_sync(hashes, now)
757 }
758
759 pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
760 self.local.last_accessed_at_sync(hash)
761 }
762
763 pub fn many_last_accessed_at_sync(
764 &self,
765 hashes: &[Hash],
766 ) -> Result<Vec<(Hash, u64)>, StoreError> {
767 self.local.many_last_accessed_at_sync(hashes)
768 }
769
770 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
772 if self.local.exists(hash)? {
774 return Ok(true);
775 }
776
777 #[cfg(feature = "s3")]
779 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
780 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
781 let client = client.clone();
782 let bucket = bucket.clone();
783
784 match Self::run_s3_future_sync(async move {
785 client.head_object().bucket(bucket).key(&key).send().await
786 }) {
787 Ok(Ok(_)) => return Ok(true),
788 Ok(Err(err)) => {
789 let service_err = err.into_service_error();
790 if !service_err.is_not_found() {
791 tracing::warn!("S3 head failed: {}", service_err);
792 }
793 }
794 Err(err) => {
795 tracing::warn!("S3 head runtime failed: {}", err);
796 }
797 }
798 }
799
800 Ok(false)
801 }
802
803 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
805 let deleted = self.local.delete_sync(hash)?;
806
807 #[cfg(feature = "s3")]
809 if let Some(ref tx) = self.sync_tx {
810 let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
811 }
812
813 Ok(deleted)
814 }
815
816 pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
819 self.local.delete_sync(hash)
820 }
821
822 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
824 self.local.stats()
825 }
826
827 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
829 self.local.list()
830 }
831
832 pub fn local_store(&self) -> Arc<LocalStore> {
834 Arc::clone(&self.local)
835 }
836}
837
838#[derive(Clone)]
839struct AccessRecordingStore {
840 inner: Arc<StorageRouter>,
841 accessed: Arc<Mutex<HashSet<Hash>>>,
842}
843
844impl AccessRecordingStore {
845 fn new(inner: Arc<StorageRouter>) -> Self {
846 Self {
847 inner,
848 accessed: Arc::new(Mutex::new(HashSet::new())),
849 }
850 }
851
852 fn take_accessed_hashes(&self) -> Vec<Hash> {
853 let Ok(mut accessed) = self.accessed.lock() else {
854 return Vec::new();
855 };
856 accessed.drain().collect()
857 }
858
859 fn record_access(&self, hash: &Hash) {
860 let Ok(mut accessed) = self.accessed.lock() else {
861 return;
862 };
863 accessed.insert(*hash);
864 }
865}
866
867#[async_trait]
868impl Store for AccessRecordingStore {
869 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
870 self.inner.put(hash, data).await
871 }
872
873 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
874 self.inner.put_many(items).await
875 }
876
877 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
878 let data = self.inner.get(hash).await?;
879 if data.is_some() {
880 self.record_access(hash);
881 }
882 Ok(data)
883 }
884
885 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
886 self.inner.has(hash).await
887 }
888
889 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
890 self.inner.delete(hash).await
891 }
892}
893
894#[async_trait]
897impl Store for StorageRouter {
898 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
899 self.put_sync(hash, &data)
900 }
901
902 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
903 self.put_many_sync(&items)
904 }
905
906 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
907 self.get_sync(hash)
908 }
909
910 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
911 self.exists(hash)
912 }
913
914 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
915 self.delete_sync(hash)
916 }
917}
918
/// Blob storage plus an LMDB metadata environment.
///
/// The metadata databases are created in `with_options_and_backend`; blobs
/// themselves go through `router`.
pub struct HashtreeStore {
    /// Directory the store was opened at.
    base_path: PathBuf,
    /// LMDB environment holding the metadata databases below.
    env: heed::Env,
    /// "pins" database; byte keys with no value (presumably pinned hashes — TODO confirm).
    pins: Database<Bytes, Unit>,
    /// "pinned_refs" database; string keys with no value.
    pinned_refs: Database<Str, Unit>,
    /// "tracked_authors" database; string keys with no value.
    tracked_authors: Database<Str, Unit>,
    /// "blob_owners" database; byte keys with no value
    /// (presumably hash + pubkey composites — TODO confirm).
    blob_owners: Database<Bytes, Unit>,
    /// "pubkey_blobs" database; byte keys mapped to byte values.
    pubkey_blobs: Database<Bytes, Bytes>,
    /// "tree_meta" database; byte keys mapped to byte values.
    tree_meta: Database<Bytes, Bytes>,
    /// "blob_trees" database; byte keys with no value.
    blob_trees: Database<Bytes, Unit>,
    /// "tree_refs" database; string keys mapped to byte values.
    tree_refs: Database<Str, Bytes>,
    /// "cached_roots" database; string keys mapped to byte values.
    cached_roots: Database<Str, Bytes>,
    /// Router through which all blob reads and writes go.
    router: Arc<StorageRouter>,
    /// Configured storage limit in bytes; also used to size the LMDB blob map.
    max_size_bytes: u64,
    /// Orphan-eviction setting (consumers are outside this view).
    evict_orphans: bool,
    /// Throttles how often blob access times are written back.
    blob_access_update_gate: BlobAccessUpdateGate,
}
949
950impl HashtreeStore {
951 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
953 let config = hashtree_config::Config::load_or_default();
954 let max_size_bytes = config
955 .storage
956 .max_size_gb
957 .saturating_mul(1024 * 1024 * 1024);
958 Self::with_options_and_backend(
959 path,
960 None,
961 max_size_bytes,
962 config.storage.evict_orphans,
963 &config.storage.backend,
964 )
965 }
966
967 pub fn new_with_backend<P: AsRef<Path>>(
969 path: P,
970 backend: hashtree_config::StorageBackend,
971 max_size_bytes: u64,
972 ) -> Result<Self> {
973 Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
974 }
975
976 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
978 let config = hashtree_config::Config::load_or_default();
979 let max_size_bytes = config
980 .storage
981 .max_size_gb
982 .saturating_mul(1024 * 1024 * 1024);
983 Self::with_options_and_backend(
984 path,
985 s3_config,
986 max_size_bytes,
987 config.storage.evict_orphans,
988 &config.storage.backend,
989 )
990 }
991
992 pub fn with_options<P: AsRef<Path>>(
998 path: P,
999 s3_config: Option<&S3Config>,
1000 max_size_bytes: u64,
1001 ) -> Result<Self> {
1002 let config = hashtree_config::Config::load_or_default();
1003 Self::with_options_and_backend(
1004 path,
1005 s3_config,
1006 max_size_bytes,
1007 config.storage.evict_orphans,
1008 &config.storage.backend,
1009 )
1010 }
1011
    /// Opens a store at `path` with every option given explicitly.
    ///
    /// Steps, in order:
    /// 1. Create `path` and open the LMDB metadata environment there, sized
    ///    from the existing `data.mdb` (or a minimum) and page-aligned.
    /// 2. Create (or open) the nine named metadata databases in one write
    ///    transaction.
    /// 3. Open the blob store under `path/blobs` using `backend`. With the
    ///    `lmdb` feature, stale filesystem shard directories are removed
    ///    first and the map size is `max_size_bytes`, but at least
    ///    `LMDB_BLOB_MIN_MAP_SIZE_BYTES`. Without the feature, an LMDB
    ///    request logs a warning and uses the filesystem store.
    /// 4. Wrap the blob store in a `StorageRouter`, with S3 when the `s3`
    ///    feature is enabled and `s3_config` is given. Without the feature a
    ///    supplied `s3_config` only produces a warning.
    ///
    /// # Errors
    ///
    /// Fails on any filesystem, LMDB, blob store or S3 initialisation error.
    pub fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        evict_orphans: bool,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;
        let metadata_map_size =
            lmdb_map_size_for_existing_env(path, LMDB_METADATA_MIN_MAP_SIZE_BYTES)?;

        // NOTE(review): heed marks `open` as unsafe; its documented
        // requirements are not restated in this file — confirm they hold for
        // every caller (e.g. the same path not being opened twice).
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(metadata_map_size)
                // Nine named databases are created below; ten leaves one spare.
                .max_dbs(10)
                .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Best effort: a failure to clear stale reader slots is ignored.
        let _ = env.clear_stale_readers();

        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
        let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Blobs live in a `blobs` subdirectory, separate from the metadata env.
        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                std::fs::create_dir_all(path.join("blobs"))?;
                remove_stale_fs_blob_shards(&path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
                // The blob map is sized from the configured limit, with a floor.
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            // NOTE(review): `with_s3` calls `tokio::spawn`, so this presumably
            // relies on the caller already being inside a Tokio runtime.
            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            base_path: path.to_path_buf(),
            env,
            pins,
            pinned_refs,
            tracked_authors,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
            evict_orphans,
            blob_access_update_gate: BlobAccessUpdateGate::default(),
        })
    }
1120
1121 pub fn base_path(&self) -> &Path {
1122 &self.base_path
1123 }
1124
1125 pub fn router(&self) -> &StorageRouter {
1127 &self.router
1128 }
1129
1130 pub fn store_arc(&self) -> Arc<StorageRouter> {
1133 Arc::clone(&self.router)
1134 }
1135
1136 fn access_tracking_tree(&self) -> (HashTree<AccessRecordingStore>, AccessRecordingStore) {
1137 let access_store = AccessRecordingStore::new(self.store_arc());
1138 let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1139 (tree, access_store)
1140 }
1141
1142 pub fn record_blob_accesses<I>(&self, hashes: I)
1143 where
1144 I: IntoIterator<Item = Hash>,
1145 {
1146 let now = unix_timestamp_now();
1147 let due_hashes = self.blob_access_update_gate.due_hashes(hashes, now);
1148 if due_hashes.is_empty() {
1149 return;
1150 }
1151
1152 if let Err(err) = self.router.touch_many_accessed_sync(&due_hashes, now) {
1153 tracing::debug!("Failed to update blob access metadata: {}", err);
1154 }
1155 }
1156
1157 fn record_blob_access_now(&self, hash: &Hash) {
1158 if let Err(err) = self.router.touch_accessed_sync(hash, unix_timestamp_now()) {
1159 tracing::debug!("Failed to update blob access metadata: {}", err);
1160 }
1161 }
1162
1163 pub fn blob_last_accessed_at(&self, hash: &Hash) -> Result<Option<u64>> {
1164 self.router
1165 .last_accessed_at_sync(hash)
1166 .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1167 }
1168
1169 pub fn blob_last_accessed_many(&self, hashes: &[Hash]) -> Result<Vec<(Hash, u64)>> {
1170 self.router
1171 .many_last_accessed_at_sync(hashes)
1172 .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1173 }
1174
1175 pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
1177 let (tree, access_store) = self.access_tracking_tree();
1178
1179 let result = sync_block_on(async {
1180 tree.get_tree_node(hash)
1181 .await
1182 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
1183 })?;
1184 if result.is_some() {
1185 self.record_blob_accesses(access_store.take_accessed_hashes());
1186 }
1187 Ok(result)
1188 }
1189
1190 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
1192 let hash = sha256(data);
1193 let inserted = self
1194 .router
1195 .put_sync(hash, data)
1196 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1197 if !inserted {
1198 self.record_blob_access_now(&hash);
1199 }
1200 Ok(to_hex(&hash))
1201 }
1202
1203 pub fn put_owned_blob(&self, data: &[u8], pubkey: &[u8; 32]) -> Result<String> {
1205 let hash = sha256(data);
1206 if !self
1207 .router
1208 .exists(&hash)
1209 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))?
1210 {
1211 self.make_room_for_durable_blob(data.len() as u64)?;
1212 self.router
1213 .put_sync(hash, data)
1214 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1215 } else {
1216 self.record_blob_access_now(&hash);
1217 }
1218 self.set_blob_owner(&hash, pubkey)?;
1219 Ok(to_hex(&hash))
1220 }
1221
1222 pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
1228 let hash = sha256(data);
1229 if self
1230 .router
1231 .exists(&hash)
1232 .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
1233 {
1234 self.record_blob_access_now(&hash);
1235 return Ok(to_hex(&hash));
1236 }
1237
1238 let incoming_bytes = data.len() as u64;
1239 let _ = self.make_room_for_cached_blob(incoming_bytes);
1240
1241 let mut retried_after_cleanup = false;
1242 loop {
1243 match self.router.put_sync(hash, data) {
1244 Ok(_) => return Ok(to_hex(&hash)),
1245 Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
1246 let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
1247 if freed == 0 {
1248 return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
1249 }
1250 retried_after_cleanup = true;
1251 }
1252 Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
1253 }
1254 }
1255 }
1256
1257 pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1259 let data = self
1260 .router
1261 .get_sync(hash)
1262 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?;
1263 if data.is_some() {
1264 self.record_blob_accesses(std::iter::once(*hash));
1265 }
1266 Ok(data)
1267 }
1268
1269 pub fn get_blob_range(
1270 &self,
1271 hash: &[u8; 32],
1272 start: u64,
1273 end_inclusive: u64,
1274 ) -> Result<Option<Vec<u8>>> {
1275 let data = self
1276 .router
1277 .get_range_sync(hash, start, end_inclusive)
1278 .map_err(|e| anyhow::anyhow!("Failed to get blob range: {}", e))?;
1279 if data.is_some() {
1280 self.record_blob_accesses(std::iter::once(*hash));
1281 }
1282 Ok(data)
1283 }
1284
1285 pub fn blob_size(&self, hash: &[u8; 32]) -> Result<Option<u64>> {
1286 self.router
1287 .blob_size_sync(hash)
1288 .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))
1289 }
1290
1291 pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
1293 self.router
1294 .exists(hash)
1295 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
1296 }
1297
1298 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
1304 let mut key = [0u8; 64];
1305 key[..32].copy_from_slice(sha256);
1306 key[32..].copy_from_slice(pubkey);
1307 key
1308 }
1309
1310 pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
1313 let key = Self::blob_owner_key(sha256, pubkey);
1314 let mut wtxn = self.env.write_txn()?;
1315
1316 self.blob_owners.put(&mut wtxn, &key[..], &())?;
1318
1319 let sha256_hex = to_hex(sha256);
1321
1322 let mut blobs: Vec<BlobMetadata> = self
1324 .pubkey_blobs
1325 .get(&wtxn, pubkey)?
1326 .and_then(|b| serde_json::from_slice(b).ok())
1327 .unwrap_or_default();
1328
1329 if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
1331 let now = SystemTime::now()
1332 .duration_since(UNIX_EPOCH)
1333 .unwrap()
1334 .as_secs();
1335
1336 let size = self
1339 .router
1340 .blob_size_sync(sha256)
1341 .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))?
1342 .unwrap_or(0);
1343
1344 blobs.push(BlobMetadata {
1345 sha256: sha256_hex,
1346 size,
1347 mime_type: "application/octet-stream".to_string(),
1348 uploaded: now,
1349 });
1350
1351 let blobs_json = serde_json::to_vec(&blobs)?;
1352 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1353 }
1354
1355 wtxn.commit()?;
1356 Ok(())
1357 }
1358
1359 pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1361 let key = Self::blob_owner_key(sha256, pubkey);
1362 let rtxn = self.env.read_txn()?;
1363 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1364 }
1365
1366 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1368 let rtxn = self.env.read_txn()?;
1369
1370 let mut owners = Vec::new();
1371 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1372 let (key, _) = item?;
1373 if key.len() == 64 {
1374 let mut pubkey = [0u8; 32];
1376 pubkey.copy_from_slice(&key[32..64]);
1377 owners.push(pubkey);
1378 }
1379 }
1380 Ok(owners)
1381 }
1382
1383 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1385 let rtxn = self.env.read_txn()?;
1386
1387 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1389 if item.is_ok() {
1390 return Ok(true);
1391 }
1392 }
1393 Ok(false)
1394 }
1395
1396 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1398 Ok(self.get_blob_owners(sha256)?.into_iter().next())
1399 }
1400
1401 pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1405 let key = Self::blob_owner_key(sha256, pubkey);
1406 let mut wtxn = self.env.write_txn()?;
1407
1408 self.blob_owners.delete(&mut wtxn, &key[..])?;
1410
1411 let sha256_hex = to_hex(sha256);
1413
1414 if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1416 if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1417 blobs.retain(|b| b.sha256 != sha256_hex);
1418 let blobs_json = serde_json::to_vec(&blobs)?;
1419 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1420 }
1421 }
1422
1423 let mut has_other_owners = false;
1425 for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1426 if item.is_ok() {
1427 has_other_owners = true;
1428 break;
1429 }
1430 }
1431
1432 if has_other_owners {
1433 wtxn.commit()?;
1434 tracing::debug!(
1435 "Removed {} from blob {} owners, other owners remain",
1436 &to_hex(pubkey)[..8],
1437 &sha256_hex[..8]
1438 );
1439 return Ok(false);
1440 }
1441
1442 tracing::info!(
1444 "All owners removed from blob {}, deleting",
1445 &sha256_hex[..8]
1446 );
1447
1448 let _ = self.router.delete_sync(sha256);
1450
1451 wtxn.commit()?;
1452 Ok(true)
1453 }
1454
1455 pub fn list_blobs_by_pubkey(
1457 &self,
1458 pubkey: &[u8; 32],
1459 ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1460 let rtxn = self.env.read_txn()?;
1461
1462 let blobs: Vec<BlobMetadata> = self
1463 .pubkey_blobs
1464 .get(&rtxn, pubkey)?
1465 .and_then(|b| serde_json::from_slice(b).ok())
1466 .unwrap_or_default();
1467
1468 Ok(blobs
1469 .into_iter()
1470 .map(|b| crate::server::blossom::BlobDescriptor {
1471 url: format!("/{}", b.sha256),
1472 sha256: b.sha256,
1473 size: b.size,
1474 mime_type: b.mime_type,
1475 uploaded: b.uploaded,
1476 })
1477 .collect())
1478 }
1479
1480 pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1482 let data = self
1483 .router
1484 .get_sync(hash)
1485 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))?;
1486 if data.is_some() {
1487 self.record_blob_accesses(std::iter::once(*hash));
1488 }
1489 Ok(data)
1490 }
1491
1492 pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1495 let (tree, access_store) = self.access_tracking_tree();
1496
1497 let result = sync_block_on(async {
1498 tree.read_file(hash)
1499 .await
1500 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1501 })?;
1502 if result.is_some() {
1503 self.record_blob_accesses(access_store.take_accessed_hashes());
1504 }
1505 Ok(result)
1506 }
1507
1508 pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1511 let (tree, access_store) = self.access_tracking_tree();
1512
1513 let result = sync_block_on(async {
1514 tree.get(cid, None)
1515 .await
1516 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1517 })?;
1518 if result.is_some() {
1519 self.record_blob_accesses(access_store.take_accessed_hashes());
1520 }
1521 Ok(result)
1522 }
1523
1524 fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1525 let exists = self
1526 .router
1527 .exists(&cid.hash)
1528 .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1529 if !exists {
1530 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1531 }
1532 Ok(())
1533 }
1534
1535 pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1537 self.ensure_cid_exists(cid)?;
1538
1539 let (tree, access_store) = self.access_tracking_tree();
1540 let mut total_bytes = 0u64;
1541 let mut streamed_any_chunk = false;
1542
1543 sync_block_on(async {
1544 let mut stream = tree.get_stream(cid);
1545 while let Some(chunk) = stream.next().await {
1546 streamed_any_chunk = true;
1547 let chunk =
1548 chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1549 writer
1550 .write_all(&chunk)
1551 .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1552 total_bytes += chunk.len() as u64;
1553 }
1554 Ok::<(), anyhow::Error>(())
1555 })?;
1556
1557 if !streamed_any_chunk {
1558 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1559 }
1560 self.record_blob_accesses(access_store.take_accessed_hashes());
1561
1562 writer
1563 .flush()
1564 .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1565 Ok(total_bytes)
1566 }
1567
1568 pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1570 self.ensure_cid_exists(cid)?;
1571
1572 let output_path = output_path.as_ref();
1573 if let Some(parent) = output_path.parent() {
1574 if !parent.as_os_str().is_empty() {
1575 std::fs::create_dir_all(parent).with_context(|| {
1576 format!("Failed to create output directory {}", parent.display())
1577 })?;
1578 }
1579 }
1580
1581 let mut file = std::fs::File::create(output_path)
1582 .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1583 self.write_file_by_cid_to_writer(cid, &mut file)
1584 }
1585
1586 pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1588 self.write_file_by_cid(&Cid::public(*hash), output_path)
1589 }
1590
1591 pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1593 let (tree, access_store) = self.access_tracking_tree();
1594
1595 let result = sync_block_on(async {
1596 tree.resolve_path(cid, path)
1597 .await
1598 .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1599 })?;
1600 if result.is_some() {
1601 self.record_blob_accesses(access_store.take_accessed_hashes());
1602 }
1603 Ok(result)
1604 }
1605
1606 pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
1608 let access_store = AccessRecordingStore::new(self.store_arc());
1609 let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1610
1611 let metadata: Result<Option<FileChunkMetadata>> = sync_block_on(async {
1612 let exists = access_store
1615 .has(hash)
1616 .await
1617 .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;
1618
1619 if !exists {
1620 return Ok(None);
1621 }
1622
1623 let total_size = tree
1625 .get_size(hash)
1626 .await
1627 .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;
1628
1629 let is_tree_node = tree
1631 .is_tree(hash)
1632 .await
1633 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1634
1635 if !is_tree_node {
1636 return Ok(Some(FileChunkMetadata {
1638 total_size,
1639 chunk_hashes: vec![],
1640 chunk_sizes: vec![],
1641 is_chunked: false,
1642 }));
1643 }
1644
1645 let node = match tree
1647 .get_tree_node(hash)
1648 .await
1649 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1650 {
1651 Some(n) => n,
1652 None => return Ok(None),
1653 };
1654
1655 let is_directory = tree
1657 .is_directory(hash)
1658 .await
1659 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1660
1661 if is_directory {
1662 return Ok(None); }
1664
1665 let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
1667 let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();
1668
1669 Ok(Some(FileChunkMetadata {
1670 total_size,
1671 chunk_hashes,
1672 chunk_sizes,
1673 is_chunked: !node.links.is_empty(),
1674 }))
1675 });
1676 let metadata = metadata?;
1677 if metadata.is_some() {
1678 self.record_blob_accesses(access_store.take_accessed_hashes());
1679 }
1680 Ok(metadata)
1681 }
1682
1683 pub fn get_file_range(
1685 &self,
1686 hash: &[u8; 32],
1687 start: u64,
1688 end: Option<u64>,
1689 ) -> Result<Option<(Vec<u8>, u64)>> {
1690 let metadata = match self.get_file_chunk_metadata(hash)? {
1691 Some(m) => m,
1692 None => return Ok(None),
1693 };
1694
1695 if metadata.total_size == 0 {
1696 return Ok(Some((Vec::new(), 0)));
1697 }
1698
1699 if start >= metadata.total_size {
1700 return Ok(None);
1701 }
1702
1703 let end = end
1704 .unwrap_or(metadata.total_size - 1)
1705 .min(metadata.total_size - 1);
1706
1707 if !metadata.is_chunked {
1709 let content = self.get_file(hash)?.unwrap_or_default();
1710 let range_content = if start < content.len() as u64 {
1711 content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1712 } else {
1713 Vec::new()
1714 };
1715 return Ok(Some((range_content, metadata.total_size)));
1716 }
1717
1718 let mut result = Vec::new();
1720 let mut current_offset = 0u64;
1721
1722 for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1723 let chunk_size = metadata.chunk_sizes[i];
1724 let chunk_end = current_offset + chunk_size - 1;
1725
1726 if chunk_end >= start && current_offset <= end {
1728 let chunk_content = match self.get_chunk(chunk_hash)? {
1729 Some(content) => content,
1730 None => {
1731 return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1732 }
1733 };
1734
1735 let chunk_read_start = if current_offset >= start {
1736 0
1737 } else {
1738 (start - current_offset) as usize
1739 };
1740
1741 let chunk_read_end = if chunk_end <= end {
1742 chunk_size as usize - 1
1743 } else {
1744 (end - current_offset) as usize
1745 };
1746
1747 result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1748 }
1749
1750 current_offset += chunk_size;
1751
1752 if current_offset > end {
1753 break;
1754 }
1755 }
1756
1757 Ok(Some((result, metadata.total_size)))
1758 }
1759
1760 pub fn stream_file_range_chunks_owned(
1762 self: Arc<Self>,
1763 hash: &[u8; 32],
1764 start: u64,
1765 end: u64,
1766 ) -> Result<Option<FileRangeChunksOwned>> {
1767 let metadata = match self.get_file_chunk_metadata(hash)? {
1768 Some(m) => m,
1769 None => return Ok(None),
1770 };
1771
1772 if metadata.total_size == 0 || start >= metadata.total_size {
1773 return Ok(None);
1774 }
1775
1776 let end = end.min(metadata.total_size - 1);
1777
1778 Ok(Some(FileRangeChunksOwned {
1779 store: self,
1780 metadata,
1781 start,
1782 end,
1783 current_chunk_idx: 0,
1784 current_offset: 0,
1785 }))
1786 }
1787
1788 pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1790 let (tree, access_store) = self.access_tracking_tree();
1791
1792 let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
1793 let is_dir = tree
1795 .is_directory(hash)
1796 .await
1797 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1798
1799 if !is_dir {
1800 return Ok(None);
1801 }
1802
1803 let cid = hashtree_core::Cid::public(*hash);
1805 let tree_entries = tree
1806 .list_directory(&cid)
1807 .await
1808 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1809
1810 let entries: Vec<DirEntry> = tree_entries
1811 .into_iter()
1812 .map(|e| DirEntry {
1813 name: e.name,
1814 cid: to_hex(&e.hash),
1815 is_directory: e.link_type.is_tree(),
1816 size: e.size,
1817 })
1818 .collect();
1819
1820 Ok(Some(DirectoryListing {
1821 dir_name: String::new(),
1822 entries,
1823 }))
1824 });
1825 let listing = listing?;
1826 if listing.is_some() {
1827 self.record_blob_accesses(access_store.take_accessed_hashes());
1828 }
1829 Ok(listing)
1830 }
1831
1832 pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1834 let (tree, access_store) = self.access_tracking_tree();
1835 let cid = cid.clone();
1836
1837 let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
1838 let is_dir = tree
1839 .is_dir(&cid)
1840 .await
1841 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1842
1843 if !is_dir {
1844 return Ok(None);
1845 }
1846
1847 let tree_entries = tree
1848 .list_directory(&cid)
1849 .await
1850 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1851
1852 let entries: Vec<DirEntry> = tree_entries
1853 .into_iter()
1854 .map(|e| DirEntry {
1855 name: e.name,
1856 cid: Cid {
1857 hash: e.hash,
1858 key: e.key,
1859 }
1860 .to_string(),
1861 is_directory: e.link_type.is_tree(),
1862 size: e.size,
1863 })
1864 .collect();
1865
1866 Ok(Some(DirectoryListing {
1867 dir_name: String::new(),
1868 entries,
1869 }))
1870 });
1871 let listing = listing?;
1872 if listing.is_some() {
1873 self.record_blob_accesses(access_store.take_accessed_hashes());
1874 }
1875 Ok(listing)
1876 }
1877
1878 pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
1882 let mut wtxn = self.env.write_txn()?;
1883 self.pinned_refs.put(&mut wtxn, key, &())?;
1884 wtxn.commit()?;
1885 Ok(())
1886 }
1887
1888 pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
1890 let mut wtxn = self.env.write_txn()?;
1891 let removed = self.pinned_refs.delete(&mut wtxn, key)?;
1892 wtxn.commit()?;
1893 Ok(removed)
1894 }
1895
1896 pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
1898 let rtxn = self.env.read_txn()?;
1899 let mut refs = Vec::new();
1900
1901 for item in self.pinned_refs.iter(&rtxn)? {
1902 let (key, _) = item?;
1903 refs.push(key.to_string());
1904 }
1905
1906 refs.sort();
1907 Ok(refs)
1908 }
1909
1910 pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
1912 let mut wtxn = self.env.write_txn()?;
1913 let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
1914 self.tracked_authors.put(&mut wtxn, npub, &())?;
1915 wtxn.commit()?;
1916 Ok(inserted)
1917 }
1918
1919 pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
1921 let mut wtxn = self.env.write_txn()?;
1922 let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
1923 wtxn.commit()?;
1924 Ok(removed)
1925 }
1926
1927 pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
1929 let rtxn = self.env.read_txn()?;
1930 let mut authors = Vec::new();
1931
1932 for item in self.tracked_authors.iter(&rtxn)? {
1933 let (npub, _) = item?;
1934 authors.push(npub.to_string());
1935 }
1936
1937 authors.sort();
1938 Ok(authors)
1939 }
1940
1941 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1943 let key = format!("{}/{}", pubkey_hex, tree_name);
1944 let rtxn = self.env.read_txn()?;
1945 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1946 let root: CachedRoot = rmp_serde::from_slice(bytes)
1947 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1948 Ok(Some(root))
1949 } else {
1950 Ok(None)
1951 }
1952 }
1953
1954 pub fn set_cached_root(
1956 &self,
1957 pubkey_hex: &str,
1958 tree_name: &str,
1959 hash: &str,
1960 key: Option<&str>,
1961 visibility: &str,
1962 updated_at: u64,
1963 ) -> Result<()> {
1964 let db_key = format!("{}/{}", pubkey_hex, tree_name);
1965 let root = CachedRoot {
1966 hash: hash.to_string(),
1967 key: key.map(|k| k.to_string()),
1968 updated_at,
1969 visibility: visibility.to_string(),
1970 };
1971 let bytes = rmp_serde::to_vec(&root)
1972 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1973 let mut wtxn = self.env.write_txn()?;
1974 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1975 wtxn.commit()?;
1976 Ok(())
1977 }
1978
1979 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1981 let prefix = format!("{}/", pubkey_hex);
1982 let rtxn = self.env.read_txn()?;
1983 let mut results = Vec::new();
1984
1985 for item in self.cached_roots.iter(&rtxn)? {
1986 let (key, bytes) = item?;
1987 if key.starts_with(&prefix) {
1988 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1989 let root: CachedRoot = rmp_serde::from_slice(bytes)
1990 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1991 results.push((tree_name.to_string(), root));
1992 }
1993 }
1994
1995 Ok(results)
1996 }
1997
1998 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
2000 let key = format!("{}/{}", pubkey_hex, tree_name);
2001 let mut wtxn = self.env.write_txn()?;
2002 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
2003 wtxn.commit()?;
2004 Ok(deleted)
2005 }
2006}
2007
2008fn is_map_full_store_error(err: &StoreError) -> bool {
2009 let message = err.to_string();
2010 message.contains("MDB_MAP_FULL") || message.contains("MapFull")
2011}
2012
/// Chunk layout of a stored file, as produced by `get_file_chunk_metadata`.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size as reported by the hash tree.
    pub total_size: u64,
    /// Hashes of the root node's links, in link order; empty for an unchunked file.
    pub chunk_hashes: Vec<Hash>,
    /// Sizes recorded on the root node's links, parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// `true` when the root is a tree node with at least one link.
    pub is_chunked: bool,
}
2020
/// Iterator state for reading a byte range of a chunked file one chunk at a
/// time. It holds its own `Arc` to the store, so it does not borrow from it.
pub struct FileRangeChunksOwned {
    /// Store the chunks are fetched from.
    store: Arc<HashtreeStore>,
    /// Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    /// First requested byte offset (inclusive).
    start: u64,
    /// Last requested byte offset (inclusive).
    end: u64,
    /// Index of the next chunk to consider.
    current_chunk_idx: usize,
    /// File offset at which the next chunk begins.
    current_offset: u64,
}
2030
2031impl Iterator for FileRangeChunksOwned {
2032 type Item = Result<Vec<u8>>;
2033
2034 fn next(&mut self) -> Option<Self::Item> {
2035 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2036 return None;
2037 }
2038
2039 if self.current_offset > self.end {
2040 return None;
2041 }
2042
2043 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2044 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2045 let chunk_end = self.current_offset + chunk_size - 1;
2046
2047 self.current_chunk_idx += 1;
2048
2049 if chunk_end < self.start || self.current_offset > self.end {
2050 self.current_offset += chunk_size;
2051 return self.next();
2052 }
2053
2054 let chunk_content = match self.store.get_chunk(chunk_hash) {
2055 Ok(Some(content)) => content,
2056 Ok(None) => {
2057 return Some(Err(anyhow::anyhow!(
2058 "Chunk {} not found",
2059 to_hex(chunk_hash)
2060 )));
2061 }
2062 Err(e) => {
2063 return Some(Err(e));
2064 }
2065 };
2066
2067 let chunk_read_start = if self.current_offset >= self.start {
2068 0
2069 } else {
2070 (self.start - self.current_offset) as usize
2071 };
2072
2073 let chunk_read_end = if chunk_end <= self.end {
2074 chunk_size as usize - 1
2075 } else {
2076 (self.end - self.current_offset) as usize
2077 };
2078
2079 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2080 self.current_offset += chunk_size;
2081
2082 Some(Ok(result))
2083 }
2084}
2085
/// Counters describing the outcome of a garbage-collection run.
// NOTE(review): no producer is visible in this part of the file; the field
// meanings below are inferred from their names — confirm against the GC code.
#[derive(Debug)]
pub struct GcStats {
    /// Presumably the number of DAGs that were deleted.
    pub deleted_dags: usize,
    /// Presumably the number of bytes reclaimed.
    pub freed_bytes: u64,
}
2091
/// One entry of a [`DirectoryListing`].
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within its directory.
    pub name: String,
    /// Entry identifier as text: the hex hash for listings by hash, or the
    /// string form of the entry's `Cid` for listings by CID.
    pub cid: String,
    /// `true` when the entry's link type is a tree.
    pub is_directory: bool,
    /// Size recorded on the directory entry.
    pub size: u64,
}
2099
/// Result of listing a directory in the hash tree.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Directory name; the listing methods in this file leave it empty.
    pub dir_name: String,
    /// Entries of the directory, in the order the tree returned them.
    pub entries: Vec<DirEntry>,
}
2105
/// Per-owner record of an uploaded blob, stored as part of a JSON list in
/// the `pubkey_blobs` table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 of the blob.
    pub sha256: String,
    /// Blob size in bytes at registration time (0 when it could not be determined).
    pub size: u64,
    /// MIME type; `set_blob_owner` always records `application/octet-stream`.
    pub mime_type: String,
    /// Unix timestamp (seconds) of when the owner was first registered for the blob.
    pub uploaded: u64,
}
2114
2115impl crate::webrtc::ContentStore for HashtreeStore {
2117 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2118 let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2119 self.get_chunk(&hash)
2120 }
2121}
2122
#[cfg(test)]
mod tests {
    use super::*;
    // Imported unconditionally: `tracked_authors_round_trip_sorted_and_deduplicated`
    // is not feature-gated and needs `TempDir` even when `lmdb` is disabled.
    use tempfile::TempDir;

    /// The gate returns each due hash once per call and suppresses repeats
    /// until the update interval has elapsed.
    #[test]
    fn blob_access_update_gate_deduplicates_and_throttles() {
        let gate = BlobAccessUpdateGate::default();
        let first = sha256(b"first");
        let second = sha256(b"second");

        assert_eq!(
            gate.due_hashes([first, first, second], 10),
            vec![first, second]
        );
        assert!(gate.due_hashes([first, second], 11).is_empty());
        assert_eq!(
            gate.due_hashes([second, first], 10 + ACCESS_UPDATE_INTERVAL_SECS),
            vec![second, first]
        );
    }

    /// The blob LMDB map must be at least as large as the requested storage budget.
    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    /// An explicit map size passed to `LocalStore` is honoured.
    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }

    /// Opening an LMDB store removes two-hex-digit shard directories left by
    /// the filesystem backend, but keeps unrelated directories.
    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("lmdb-blobs");
        std::fs::create_dir_all(path.join("aa"))?;
        std::fs::create_dir_all(path.join("b2"))?;
        std::fs::create_dir_all(path.join("keep-me"))?;
        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;

        let _store = LocalStore::new_with_lmdb_map_size(
            &path,
            &StorageBackend::Lmdb,
            Some(128 * 1024 * 1024),
        )?;

        assert!(!path.join("aa").exists());
        assert!(!path.join("b2").exists());
        assert!(path.join("keep-me").exists());
        assert!(path.join("data.mdb").exists());
        assert!(path.join("lock.mdb").exists());

        Ok(())
    }

    /// Re-uploading an existing blob (cached or owned) refreshes its
    /// last-accessed timestamp.
    #[cfg(feature = "lmdb")]
    #[test]
    fn duplicate_blossom_writes_refresh_blob_last_accessed() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let data = b"cached blossom duplicate";
        let hash = sha256(data);
        store.put_cached_blob(data)?;
        // Force an old timestamp so the refresh is observable.
        store.router.touch_accessed_sync(&hash, 1)?;
        store.put_cached_blob(data)?;
        assert!(store.blob_last_accessed_at(&hash)?.unwrap_or(0) > 1);

        let owned = b"owned blossom duplicate";
        let owned_hash = sha256(owned);
        let owner = [7u8; 32];
        store.put_owned_blob(owned, &owner)?;
        store.router.touch_accessed_sync(&owned_hash, 1)?;
        store.put_owned_blob(owned, &owner)?;
        assert!(store.blob_last_accessed_at(&owned_hash)?.unwrap_or(0) > 1);

        Ok(())
    }

    /// Indexing a new root under the same ref unpins the previous root and
    /// drops its tree metadata.
    #[cfg(feature = "lmdb")]
    #[test]
    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let old_bytes = b"old published root";
        let new_bytes = b"new published root";
        let old_root = sha256(old_bytes);
        let new_root = sha256(new_bytes);

        store.put_blob(old_bytes)?;
        store.pin(&old_root)?;
        store.index_tree(
            &old_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(store.is_pinned(&old_root)?);
        assert!(store.get_tree_meta(&old_root)?.is_some());

        store.put_blob(new_bytes)?;
        store.pin(&new_root)?;
        store.index_tree(
            &new_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(
            !store.is_pinned(&old_root)?,
            "superseded root should be unpinned when ref is replaced"
        );
        assert!(
            store.get_tree_meta(&old_root)?.is_none(),
            "superseded root metadata should be removed when ref is replaced"
        );
        assert!(store.is_pinned(&new_root)?);
        assert!(store.get_tree_meta(&new_root)?.is_some());

        Ok(())
    }

    /// Tracked authors are stored once each, listed sorted, and removal
    /// reports whether the author was present.
    #[test]
    fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options(temp.path(), None, 1024 * 1024)?;

        store
            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;
        store
            .add_tracked_author("npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm")?;
        store
            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;

        assert_eq!(
            store.list_tracked_authors()?,
            vec![
                "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm".to_string(),
                "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string(),
            ]
        );
        assert!(store.remove_tracked_author(
            "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm"
        )?);
        assert!(!store.remove_tracked_author(
            "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq"
        )?);
        assert_eq!(
            store.list_tracked_authors()?,
            vec!["npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string()]
        );

        Ok(())
    }

    /// Calling the async `Store` methods of an S3-backed router from inside
    /// `futures::executor::block_on` must not panic. The endpoint is
    /// unreachable on purpose; only the absence of a panic is checked.
    #[cfg(feature = "s3")]
    #[test]
    fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
        let temp = tempfile::TempDir::new()?;
        let local = Arc::new(LocalStore::new(
            temp.path().join("blobs"),
            &StorageBackend::Fs,
        )?);

        let outcome = std::panic::catch_unwind(|| {
            sync_block_on(async {
                let aws_config = aws_config::from_env()
                    .region(aws_sdk_s3::config::Region::new("auto"))
                    .load()
                    .await;
                let s3_client = aws_sdk_s3::Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&aws_config)
                        .endpoint_url("http://127.0.0.1:9")
                        .force_path_style(true)
                        .build(),
                );

                let router = StorageRouter {
                    local,
                    s3_client: Some(s3_client),
                    s3_bucket: Some("test-bucket".to_string()),
                    s3_prefix: String::new(),
                    sync_tx: None,
                };
                let hash = [0u8; 32];

                let _ = Store::has(&router, &hash).await;
                let _ = Store::get(&router, &hash).await;
            });
        });

        assert!(
            outcome.is_ok(),
            "S3-backed async store methods should not panic inside futures::block_on"
        );

        Ok(())
    }
}