1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8 from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::collections::{HashMap, HashSet};
17#[cfg(feature = "s3")]
18use std::future::Future;
19use std::io::Write;
20use std::path::{Path, PathBuf};
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex};
23use std::time::{Instant, SystemTime, UNIX_EPOCH};
24
25mod upload;
26
27mod maintenance;
28mod retention;
29
/// Default timeout (milliseconds) for each blocking S3 call made through the synchronous
/// `StorageRouter` API.
#[cfg(feature = "s3")]
const DEFAULT_S3_SYNC_TIMEOUT_MS: u64 = 5_000;
/// Environment variable overriding `DEFAULT_S3_SYNC_TIMEOUT_MS`; zero or unparsable values are ignored.
#[cfg(feature = "s3")]
const S3_SYNC_TIMEOUT_MS_ENV: &str = "HTREE_S3_SYNC_TIMEOUT_MS";
34
35pub use maintenance::{
36 compact_lmdb_environments_under, CompactResult, R2ImportOptions, R2ImportResult, VerifyResult,
37};
38pub use retention::{OwnedBlobStats, PinnedItem, StorageByPriority, StorageStats, TreeMeta};
39
// Retention priority tiers. Higher values presumably mean "keep longer"; confirm against the
// `retention` module before relying on that ordering.
/// Priority tier for content that is neither our own nor from a followed author.
pub const PRIORITY_OTHER: u8 = 64;
/// Priority tier for content from followed authors.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Priority tier for our own content.
pub const PRIORITY_OWN: u8 = 255;
/// Maximum concurrent LMDB read transactions for the metadata environment.
const LMDB_MAX_READERS: u32 = 1024;
/// Lower clamp for the metadata LMDB map size derived from the storage budget.
const LMDB_METADATA_MIN_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024;
/// Upper clamp for the metadata LMDB map size; also used when the budget is zero.
const LMDB_METADATA_MAX_MAP_SIZE_BYTES: u64 = 64 * 1024 * 1024 * 1024;
/// The metadata map size is the storage budget divided by this value (before clamping).
const LMDB_METADATA_STORAGE_RATIO_DIVISOR: u64 = 1024;
/// Minimum extra map space granted when reopening an environment whose `data.mdb` is already
/// larger than the requested map size.
const LMDB_METADATA_REOPEN_HEADROOM_BYTES: u64 = 64 * 1024 * 1024;
/// Lower bound for the blob LMDB map size.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 16 * 1024 * 1024;
/// Minimum number of seconds between last-access updates for the same blob.
const ACCESS_UPDATE_INTERVAL_SECS: u64 = 300;
/// Size at which the access-update gate prunes expired entries (and clears itself if still full).
const ACCESS_UPDATE_GATE_MAX_ENTRIES: usize = 4096;
/// Default cap on hashes per background access-update batch; an override of 0 disables updates.
const DEFAULT_ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT: usize = 64;
/// Environment variable overriding `DEFAULT_ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT`.
const ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT_ENV: &str = "HTREE_ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT";
/// Environment flag ("1"/"true") enabling the existing-blob precheck for cached blob batches.
const CACHED_BLOB_BATCH_EXISTING_PRECHECK_ENV: &str = "HTREE_CACHED_BLOB_BATCH_EXISTING_PRECHECK";
/// Environment variable holding the slow-batch threshold (ms) for owned blob batches.
const SLOW_OWNED_BLOB_BATCH_LOG_MS_ENV: &str = "HTREE_SLOW_OWNED_BLOB_BATCH_LOG_MS";
/// Environment variable holding the slow-batch threshold (ms) for cached blob batches.
const SLOW_CACHED_BLOB_BATCH_LOG_MS_ENV: &str = "HTREE_SLOW_CACHED_BLOB_BATCH_LOG_MS";
58
59fn slow_owned_blob_batch_log_ms() -> Option<u128> {
60 std::env::var(SLOW_OWNED_BLOB_BATCH_LOG_MS_ENV)
61 .ok()
62 .and_then(|value| value.parse::<u128>().ok())
63 .filter(|value| *value > 0)
64}
65
66fn slow_cached_blob_batch_log_ms() -> Option<u128> {
67 std::env::var(SLOW_CACHED_BLOB_BATCH_LOG_MS_ENV)
68 .ok()
69 .and_then(|value| value.parse::<u128>().ok())
70 .filter(|value| *value > 0)
71}
72
73fn access_update_background_batch_limit() -> usize {
74 std::env::var(ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT_ENV)
75 .ok()
76 .and_then(|value| value.parse::<usize>().ok())
77 .unwrap_or(DEFAULT_ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT)
78}
79
80fn cached_blob_batch_existing_precheck() -> bool {
81 std::env::var(CACHED_BLOB_BATCH_EXISTING_PRECHECK_ENV)
82 .is_ok_and(|value| value == "1" || value.eq_ignore_ascii_case("true"))
83}
84
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// A clock set before 1970 yields `0` instead of an error.
fn unix_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
91
/// Serializable record describing a cached tree root.
///
/// NOTE(review): presumably the value type stored in the `cached_roots` database — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash as a string (presumably hex-encoded — confirm against writers).
    pub hash: String,
    /// Optional key associated with the root (presumably a decryption key for encrypted trees — TODO confirm).
    pub key: Option<String>,
    /// Last update timestamp (presumably Unix seconds, like `unix_timestamp_now` — confirm).
    pub updated_at: u64,
    /// Visibility label as a free-form string (allowed values not defined here — confirm).
    pub visibility: String,
}
104
/// Aggregate statistics reported by the local blob backend.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of blobs reported by the backend.
    pub count: usize,
    /// Total size in bytes reported by the backend.
    pub total_bytes: u64,
}
111
112#[derive(Default)]
113struct BlobAccessUpdateGate {
114 next_update_by_hash: Mutex<HashMap<Hash, u64>>,
115}
116
117impl BlobAccessUpdateGate {
118 fn due_hashes<I>(&self, hashes: I, now: u64) -> Vec<Hash>
119 where
120 I: IntoIterator<Item = Hash>,
121 {
122 let Ok(mut next_update_by_hash) = self.next_update_by_hash.try_lock() else {
123 return Vec::new();
124 };
125
126 if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
127 next_update_by_hash.retain(|_, next_update| *next_update > now);
128 if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
129 next_update_by_hash.clear();
130 }
131 }
132
133 let mut due = Vec::new();
134 let mut seen = HashSet::new();
135 for hash in hashes {
136 if !seen.insert(hash) {
137 continue;
138 }
139 if next_update_by_hash
140 .get(&hash)
141 .is_some_and(|next_update| now < *next_update)
142 {
143 continue;
144 }
145 next_update_by_hash.insert(hash, now.saturating_add(ACCESS_UPDATE_INTERVAL_SECS));
146 due.push(hash);
147 }
148 due
149 }
150}
151
/// Local blob storage, dispatching to a filesystem-backed store or (with the `lmdb` feature)
/// an LMDB-backed store.
pub enum LocalStore {
    /// Blobs stored as files via `FsBlobStore`.
    Fs(FsBlobStore),
    /// Blobs stored in an LMDB environment via `LmdbBlobStore`.
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
158
/// Returns `true` when `path`'s final component is exactly two ASCII hex digits (either case),
/// which is the naming pattern treated as a filesystem blob shard directory.
#[cfg(feature = "lmdb")]
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    let Some(name) = path.file_name().and_then(|os_name| os_name.to_str()) else {
        return false;
    };
    let bytes = name.as_bytes();
    bytes.len() == 2 && bytes.iter().all(u8::is_ascii_hexdigit)
}
166
167fn lmdb_metadata_map_size_for_storage_budget(max_size_bytes: u64) -> u64 {
168 if max_size_bytes == 0 {
169 return LMDB_METADATA_MAX_MAP_SIZE_BYTES;
170 }
171
172 max_size_bytes
173 .saturating_div(LMDB_METADATA_STORAGE_RATIO_DIVISOR)
174 .clamp(
175 LMDB_METADATA_MIN_MAP_SIZE_BYTES,
176 LMDB_METADATA_MAX_MAP_SIZE_BYTES,
177 )
178}
179
180fn lmdb_map_size_for_existing_env(path: &Path, requested_bytes: u64) -> Result<usize> {
181 let existing_bytes = std::fs::metadata(path.join("data.mdb"))
182 .map(|metadata| metadata.len())
183 .unwrap_or(0);
184 let requested = if existing_bytes > requested_bytes {
185 let existing_headroom = existing_bytes
186 .saturating_div(10)
187 .max(LMDB_METADATA_REOPEN_HEADROOM_BYTES);
188 existing_bytes.saturating_add(existing_headroom)
189 } else {
190 requested_bytes
191 };
192 let requested = align_lmdb_map_size(requested);
193 usize::try_from(requested).context("LMDB map size exceeds usize")
194}
195
196fn align_lmdb_map_size(bytes: u64) -> u64 {
197 let page_size = (page_size::get() as u64).max(4096);
198 let remainder = bytes % page_size;
199 if remainder == 0 {
200 bytes
201 } else {
202 bytes.saturating_add(page_size - remainder)
203 }
204}
205
/// Deletes every immediate subdirectory of `path` that looks like a filesystem blob shard
/// (see `is_fs_blob_shard_dir`), logging each removal.
///
/// # Errors
/// Returns `StoreError::Io` on the first directory read or removal failure.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for dir_entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let shard = dir_entry.map_err(StoreError::Io)?.path();
        if !(shard.is_dir() && is_fs_blob_shard_dir(&shard)) {
            continue;
        }
        std::fs::remove_dir_all(&shard).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            shard.display()
        );
    }
    Ok(())
}
222
223impl LocalStore {
224 pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
230 Self::new_unbounded(path, backend)
231 }
232
233 pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
238 path: P,
239 backend: &StorageBackend,
240 _map_size_bytes: Option<u64>,
241 ) -> Result<Self, StoreError> {
242 match backend {
243 StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
244 #[cfg(feature = "lmdb")]
245 StorageBackend::Lmdb => match _map_size_bytes {
246 Some(map_size_bytes) => {
247 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
248 remove_stale_fs_blob_shards(path.as_ref())?;
249 Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
250 path,
251 map_size_bytes,
252 )?))
253 }
254 None => {
255 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
256 remove_stale_fs_blob_shards(path.as_ref())?;
257 Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
258 }
259 },
260 #[cfg(not(feature = "lmdb"))]
261 StorageBackend::Lmdb => {
262 tracing::warn!(
263 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
264 );
265 Ok(LocalStore::Fs(FsBlobStore::new(path)?))
266 }
267 }
268 }
269
270 pub fn new_unbounded<P: AsRef<Path>>(
272 path: P,
273 backend: &StorageBackend,
274 ) -> Result<Self, StoreError> {
275 Self::new_with_lmdb_map_size(path, backend, None)
276 }
277
278 pub fn backend(&self) -> StorageBackend {
279 match self {
280 LocalStore::Fs(_) => StorageBackend::Fs,
281 #[cfg(feature = "lmdb")]
282 LocalStore::Lmdb(_) => StorageBackend::Lmdb,
283 }
284 }
285
286 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
288 match self {
289 LocalStore::Fs(store) => store.put_sync(hash, data),
290 #[cfg(feature = "lmdb")]
291 LocalStore::Lmdb(store) => store.put_sync(hash, data),
292 }
293 }
294
295 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
297 match self {
298 LocalStore::Fs(store) => {
299 let mut inserted = 0usize;
300 for (hash, data) in items {
301 if store.put_sync(*hash, data.as_slice())? {
302 inserted += 1;
303 }
304 }
305 Ok(inserted)
306 }
307 #[cfg(feature = "lmdb")]
308 LocalStore::Lmdb(store) => store.put_many_sync(items),
309 }
310 }
311
312 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
314 match self {
315 LocalStore::Fs(store) => store.get_sync(hash),
316 #[cfg(feature = "lmdb")]
317 LocalStore::Lmdb(store) => store.get_sync(hash),
318 }
319 }
320
321 pub fn get_range_sync(
322 &self,
323 hash: &Hash,
324 start: u64,
325 end_inclusive: u64,
326 ) -> Result<Option<Vec<u8>>, StoreError> {
327 match self {
328 LocalStore::Fs(store) => store.get_range_sync(hash, start, end_inclusive),
329 #[cfg(feature = "lmdb")]
330 LocalStore::Lmdb(store) => store.get_range_sync(hash, start, end_inclusive),
331 }
332 }
333
334 pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
335 match self {
336 LocalStore::Fs(store) => store.blob_size_sync(hash),
337 #[cfg(feature = "lmdb")]
338 LocalStore::Lmdb(store) => store.blob_size_sync(hash),
339 }
340 }
341
342 pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
343 match self {
344 LocalStore::Fs(store) => store.touch_accessed_sync(hash, now),
345 #[cfg(feature = "lmdb")]
346 LocalStore::Lmdb(store) => store.touch_accessed_sync(hash, now),
347 }
348 }
349
350 pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
351 match self {
352 LocalStore::Fs(store) => store.touch_many_accessed_sync(hashes, now),
353 #[cfg(feature = "lmdb")]
354 LocalStore::Lmdb(store) => store.touch_many_accessed_sync(hashes, now),
355 }
356 }
357
358 pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
359 match self {
360 LocalStore::Fs(store) => store.last_accessed_at_sync(hash),
361 #[cfg(feature = "lmdb")]
362 LocalStore::Lmdb(store) => store.last_accessed_at_sync(hash),
363 }
364 }
365
366 pub fn many_last_accessed_at_sync(
367 &self,
368 hashes: &[Hash],
369 ) -> Result<Vec<(Hash, u64)>, StoreError> {
370 match self {
371 LocalStore::Fs(store) => store.many_last_accessed_at_sync(hashes),
372 #[cfg(feature = "lmdb")]
373 LocalStore::Lmdb(store) => store.many_last_accessed_at_sync(hashes),
374 }
375 }
376
377 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
379 match self {
380 LocalStore::Fs(store) => Ok(store.exists(hash)),
381 #[cfg(feature = "lmdb")]
382 LocalStore::Lmdb(store) => store.exists(hash),
383 }
384 }
385
386 pub fn existing_hashes_in_sorted_candidates(
388 &self,
389 sorted_hashes: &[Hash],
390 ) -> Result<Vec<bool>, StoreError> {
391 match self {
392 LocalStore::Fs(store) => Ok(sorted_hashes
393 .iter()
394 .map(|hash| store.exists(hash))
395 .collect()),
396 #[cfg(feature = "lmdb")]
397 LocalStore::Lmdb(store) => store.existing_hashes_in_sorted_candidates(sorted_hashes),
398 }
399 }
400
401 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
403 match self {
404 LocalStore::Fs(store) => store.delete_sync(hash),
405 #[cfg(feature = "lmdb")]
406 LocalStore::Lmdb(store) => store.delete_sync(hash),
407 }
408 }
409
410 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
412 match self {
413 LocalStore::Fs(store) => {
414 let stats = store.stats()?;
415 Ok(LocalStoreStats {
416 count: stats.count,
417 total_bytes: stats.total_bytes,
418 })
419 }
420 #[cfg(feature = "lmdb")]
421 LocalStore::Lmdb(store) => {
422 let stats = store.stats()?;
423 Ok(LocalStoreStats {
424 count: stats.count,
425 total_bytes: stats.total_bytes,
426 })
427 }
428 }
429 }
430
431 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
433 match self {
434 LocalStore::Fs(store) => store.list(),
435 #[cfg(feature = "lmdb")]
436 LocalStore::Lmdb(store) => store.list(),
437 }
438 }
439}
440
441#[async_trait]
442impl Store for LocalStore {
443 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
444 self.put_sync(hash, &data)
445 }
446
447 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
448 self.put_many_sync(&items)
449 }
450
451 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
452 self.get_sync(hash)
453 }
454
455 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
456 self.exists(hash)
457 }
458
459 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
460 self.delete_sync(hash)
461 }
462}
463
464#[cfg(feature = "s3")]
465use tokio::sync::mpsc;
466
467use crate::config::S3Config;
468
/// Work item for the background S3 sync task spawned by `StorageRouter::with_s3`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` to the object key derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object key derived from `hash`.
    Delete { hash: Hash },
}
475
/// Blob store front-end that serves from local storage and, with the `s3` feature and an S3
/// config, mirrors writes/deletes to S3 and falls back to S3 on local read misses.
pub struct StorageRouter {
    /// Primary local blob store.
    local: Arc<LocalStore>,
    /// S3 client used for synchronous read-through (`get_sync`, `exists`).
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Bucket holding mirrored blobs.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix; object keys are `{prefix}{hex hash}.bin`.
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Queue feeding the background upload/delete task.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
494
495impl StorageRouter {
496 #[cfg(feature = "s3")]
497 fn s3_sync_timeout() -> std::time::Duration {
498 let millis = std::env::var(S3_SYNC_TIMEOUT_MS_ENV)
499 .ok()
500 .and_then(|value| value.parse::<u64>().ok())
501 .filter(|value| *value > 0)
502 .unwrap_or(DEFAULT_S3_SYNC_TIMEOUT_MS);
503 std::time::Duration::from_millis(millis)
504 }
505
506 #[cfg(feature = "s3")]
507 fn s3_sync_timeout_error(timeout: std::time::Duration) -> StoreError {
508 StoreError::Other(format!(
509 "S3 sync operation timed out after {}ms",
510 timeout.as_millis()
511 ))
512 }
513
514 #[cfg(feature = "s3")]
515 fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
516 where
517 F: Future<Output = T> + Send + 'static,
518 T: Send + 'static,
519 {
520 let timeout = Self::s3_sync_timeout();
521 if tokio::runtime::Handle::try_current().is_ok() {
522 return std::thread::Builder::new()
523 .name("storage-s3-sync".to_string())
524 .spawn(move || {
525 let runtime = tokio::runtime::Builder::new_current_thread()
526 .enable_all()
527 .build()
528 .map_err(|err| {
529 StoreError::Other(format!("build storage s3 sync runtime: {err}"))
530 })?;
531 runtime.block_on(async move {
532 tokio::time::timeout(timeout, future)
533 .await
534 .map_err(|_| Self::s3_sync_timeout_error(timeout))
535 })
536 })
537 .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
538 .join()
539 .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()))?;
540 }
541
542 let runtime = tokio::runtime::Builder::new_current_thread()
543 .enable_all()
544 .build()
545 .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
546 runtime.block_on(async move {
547 tokio::time::timeout(timeout, future)
548 .await
549 .map_err(|_| Self::s3_sync_timeout_error(timeout))
550 })
551 }
552
553 pub fn new(local: Arc<LocalStore>) -> Self {
555 Self {
556 local,
557 #[cfg(feature = "s3")]
558 s3_client: None,
559 #[cfg(feature = "s3")]
560 s3_bucket: None,
561 #[cfg(feature = "s3")]
562 s3_prefix: String::new(),
563 #[cfg(feature = "s3")]
564 sync_tx: None,
565 }
566 }
567
568 #[cfg(feature = "s3")]
570 pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
571 use aws_sdk_s3::Client as S3Client;
572
573 let mut aws_config_loader = aws_config::from_env();
575 aws_config_loader =
576 aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
577 let aws_config = aws_config_loader.load().await;
578
579 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
581 s3_config_builder = s3_config_builder
582 .endpoint_url(&config.endpoint)
583 .force_path_style(true);
584
585 let s3_client = S3Client::from_conf(s3_config_builder.build());
586 let bucket = config.bucket.clone();
587 let prefix = config.prefix.clone().unwrap_or_default();
588
589 let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
591
592 let sync_client = s3_client.clone();
594 let sync_bucket = bucket.clone();
595 let sync_prefix = prefix.clone();
596
597 tokio::spawn(async move {
598 use aws_sdk_s3::primitives::ByteStream;
599
600 tracing::info!("S3 background sync task started");
601
602 let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(8));
604 let client = std::sync::Arc::new(sync_client);
605 let bucket = std::sync::Arc::new(sync_bucket);
606 let prefix = std::sync::Arc::new(sync_prefix);
607
608 while let Some(msg) = sync_rx.recv().await {
609 let client = client.clone();
610 let bucket = bucket.clone();
611 let prefix = prefix.clone();
612 let semaphore = semaphore.clone();
613
614 tokio::spawn(async move {
616 let _permit = semaphore.acquire().await;
618
619 match msg {
620 S3SyncMessage::Upload { hash, data } => {
621 let key = format!("{}{}.bin", prefix, to_hex(&hash));
622 tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
623
624 let mut attempt = 1u8;
625 loop {
626 match client
627 .put_object()
628 .bucket(bucket.as_str())
629 .key(&key)
630 .body(ByteStream::from(data.clone()))
631 .send()
632 .await
633 {
634 Ok(_) => {
635 tracing::debug!("S3 upload succeeded: {}", &key);
636 break;
637 }
638 Err(e) if attempt < 3 => {
639 tracing::warn!(
640 "S3 upload retrying {}: attempt={} error={}",
641 &key,
642 attempt,
643 e
644 );
645 tokio::time::sleep(std::time::Duration::from_millis(
646 250 * u64::from(attempt),
647 ))
648 .await;
649 attempt += 1;
650 }
651 Err(e) => {
652 tracing::error!(
653 "S3 upload failed {} after {} attempts: {}",
654 &key,
655 attempt,
656 e
657 );
658 break;
659 }
660 }
661 }
662 }
663 S3SyncMessage::Delete { hash } => {
664 let key = format!("{}{}.bin", prefix, to_hex(&hash));
665 tracing::debug!("S3 deleting {}", &key);
666
667 let mut attempt = 1u8;
668 loop {
669 match client
670 .delete_object()
671 .bucket(bucket.as_str())
672 .key(&key)
673 .send()
674 .await
675 {
676 Ok(_) => break,
677 Err(e) if attempt < 3 => {
678 tracing::warn!(
679 "S3 delete retrying {}: attempt={} error={}",
680 &key,
681 attempt,
682 e
683 );
684 tokio::time::sleep(std::time::Duration::from_millis(
685 250 * u64::from(attempt),
686 ))
687 .await;
688 attempt += 1;
689 }
690 Err(e) => {
691 tracing::error!(
692 "S3 delete failed {} after {} attempts: {}",
693 &key,
694 attempt,
695 e
696 );
697 break;
698 }
699 }
700 }
701 }
702 }
703 });
704 }
705 });
706
707 tracing::info!(
708 "S3 storage initialized: bucket={}, prefix={}",
709 bucket,
710 prefix
711 );
712
713 Ok(Self {
714 local,
715 s3_client: Some(s3_client),
716 s3_bucket: Some(bucket),
717 s3_prefix: prefix,
718 sync_tx: Some(sync_tx),
719 })
720 }
721
722 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
724 let is_new = self.local.put_sync(hash, data)?;
726
727 #[cfg(feature = "s3")]
730 if is_new {
731 if let Some(ref tx) = self.sync_tx {
732 tracing::debug!(
733 "Queueing S3 upload for {} ({} bytes)",
734 crate::storage::to_hex(&hash)[..16].to_string(),
735 data.len(),
736 );
737 if let Err(e) = tx.send(S3SyncMessage::Upload {
738 hash,
739 data: data.to_vec(),
740 }) {
741 tracing::error!("Failed to queue S3 upload: {}", e);
742 }
743 }
744 }
745
746 Ok(is_new)
747 }
748
749 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
751 #[cfg(feature = "s3")]
752 let pending_uploads = if self.sync_tx.is_some() {
753 let mut pending = Vec::new();
754 for (hash, data) in items {
755 if !self.local.exists(hash)? {
756 pending.push((*hash, data.clone()));
757 }
758 }
759 pending
760 } else {
761 Vec::new()
762 };
763
764 let inserted = self.local.put_many_sync(items)?;
765
766 #[cfg(feature = "s3")]
767 if let Some(ref tx) = self.sync_tx {
768 for (hash, data) in pending_uploads {
769 if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
770 tracing::error!("Failed to queue S3 upload: {}", e);
771 }
772 }
773 }
774
775 Ok(inserted)
776 }
777
778 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
780 if let Some(data) = self.local.get_sync(hash)? {
782 return Ok(Some(data));
783 }
784
785 #[cfg(feature = "s3")]
787 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
788 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
789 let client = client.clone();
790 let bucket = bucket.clone();
791
792 match Self::run_s3_future_sync(async move {
793 client.get_object().bucket(bucket).key(key).send().await
794 }) {
795 Ok(Ok(output)) => {
796 match Self::run_s3_future_sync(async move { output.body.collect().await }) {
797 Ok(Ok(body)) => {
798 let data = body.into_bytes().to_vec();
799 let _ = self.local.put_sync(*hash, &data);
801 return Ok(Some(data));
802 }
803 Ok(Err(err)) => {
804 tracing::warn!("S3 body collect failed: {}", err);
805 }
806 Err(err) => {
807 tracing::warn!("S3 body collect runtime failed: {}", err);
808 }
809 }
810 }
811 Ok(Err(err)) => {
812 let service_err = err.into_service_error();
813 if !service_err.is_no_such_key() {
814 tracing::warn!("S3 get failed: {}", service_err);
815 }
816 }
817 Err(err) => {
818 tracing::warn!("S3 get runtime failed: {}", err);
819 }
820 }
821 }
822
823 Ok(None)
824 }
825
826 pub fn get_range_sync(
827 &self,
828 hash: &Hash,
829 start: u64,
830 end_inclusive: u64,
831 ) -> Result<Option<Vec<u8>>, StoreError> {
832 self.local.get_range_sync(hash, start, end_inclusive)
833 }
834
835 pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
836 self.local.blob_size_sync(hash)
837 }
838
839 pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
840 self.local.touch_accessed_sync(hash, now)
841 }
842
843 pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
844 self.local.touch_many_accessed_sync(hashes, now)
845 }
846
847 pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
848 self.local.last_accessed_at_sync(hash)
849 }
850
851 pub fn many_last_accessed_at_sync(
852 &self,
853 hashes: &[Hash],
854 ) -> Result<Vec<(Hash, u64)>, StoreError> {
855 self.local.many_last_accessed_at_sync(hashes)
856 }
857
858 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
860 if self.local.exists(hash)? {
862 return Ok(true);
863 }
864
865 #[cfg(feature = "s3")]
867 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
868 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
869 let client = client.clone();
870 let bucket = bucket.clone();
871
872 match Self::run_s3_future_sync(async move {
873 client.head_object().bucket(bucket).key(&key).send().await
874 }) {
875 Ok(Ok(_)) => return Ok(true),
876 Ok(Err(err)) => {
877 let service_err = err.into_service_error();
878 if !service_err.is_not_found() {
879 tracing::warn!("S3 head failed: {}", service_err);
880 }
881 }
882 Err(err) => {
883 tracing::warn!("S3 head runtime failed: {}", err);
884 }
885 }
886 }
887
888 Ok(false)
889 }
890
891 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
893 let deleted = self.local.delete_sync(hash)?;
894
895 #[cfg(feature = "s3")]
897 if let Some(ref tx) = self.sync_tx {
898 let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
899 }
900
901 Ok(deleted)
902 }
903
904 pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
907 self.local.delete_sync(hash)
908 }
909
910 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
912 self.local.stats()
913 }
914
915 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
917 self.local.list()
918 }
919
920 pub fn existing_local_hashes_in_sorted_candidates(
922 &self,
923 sorted_hashes: &[Hash],
924 ) -> Result<Vec<bool>, StoreError> {
925 self.local
926 .existing_hashes_in_sorted_candidates(sorted_hashes)
927 }
928
929 pub fn local_store(&self) -> Arc<LocalStore> {
931 Arc::clone(&self.local)
932 }
933}
934
935#[derive(Clone)]
936struct AccessRecordingStore {
937 inner: Arc<StorageRouter>,
938 accessed: Arc<Mutex<HashSet<Hash>>>,
939}
940
941impl AccessRecordingStore {
942 fn new(inner: Arc<StorageRouter>) -> Self {
943 Self {
944 inner,
945 accessed: Arc::new(Mutex::new(HashSet::new())),
946 }
947 }
948
949 fn take_accessed_hashes(&self) -> Vec<Hash> {
950 let Ok(mut accessed) = self.accessed.lock() else {
951 return Vec::new();
952 };
953 accessed.drain().collect()
954 }
955
956 fn record_access(&self, hash: &Hash) {
957 let Ok(mut accessed) = self.accessed.lock() else {
958 return;
959 };
960 accessed.insert(*hash);
961 }
962}
963
964#[async_trait]
965impl Store for AccessRecordingStore {
966 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
967 self.inner.put(hash, data).await
968 }
969
970 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
971 self.inner.put_many(items).await
972 }
973
974 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
975 let data = self.inner.get(hash).await?;
976 if data.is_some() {
977 self.record_access(hash);
978 }
979 Ok(data)
980 }
981
982 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
983 self.inner.has(hash).await
984 }
985
986 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
987 self.inner.delete(hash).await
988 }
989}
990
991#[async_trait]
994impl Store for StorageRouter {
995 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
996 self.put_sync(hash, &data)
997 }
998
999 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
1000 self.put_many_sync(&items)
1001 }
1002
1003 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
1004 self.get_sync(hash)
1005 }
1006
1007 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
1008 self.exists(hash)
1009 }
1010
1011 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
1012 self.delete_sync(hash)
1013 }
1014}
1015
/// Top-level store: an LMDB metadata environment plus a blob `StorageRouter`.
///
/// All databases below live in the single LMDB environment opened at `base_path`; blobs are
/// kept under `base_path/blobs`.
pub struct HashtreeStore {
    /// Directory the store was opened at.
    base_path: PathBuf,
    /// LMDB environment holding the named metadata databases.
    env: heed::Env,
    /// `pins` database: byte keys, no values (a set; presumably pinned hashes — confirm).
    pins: Database<Bytes, Unit>,
    /// `pinned_refs` database: string keys, no values (a set of pinned references).
    pinned_refs: Database<Str, Unit>,
    /// `tracked_authors` database: string keys, no values (presumably author pubkeys — confirm).
    tracked_authors: Database<Str, Unit>,
    /// `blob_owners` database: byte keys, no values (key layout not shown here — confirm).
    blob_owners: Database<Bytes, Unit>,
    /// `pubkey_blobs` database: byte keys to byte values (layout not shown here — confirm).
    pubkey_blobs: Database<Bytes, Bytes>,
    /// `pubkey_blob_index` database: byte keys to byte values (layout not shown here — confirm).
    pubkey_blob_index: Database<Bytes, Bytes>,
    /// `tree_meta` database: byte keys to serialized values (presumably `TreeMeta` — confirm).
    tree_meta: Database<Bytes, Bytes>,
    /// `blob_trees` database: byte keys, no values (presumably blob-to-tree membership — confirm).
    blob_trees: Database<Bytes, Unit>,
    /// `tree_refs` database: string keys to byte values.
    tree_refs: Database<Str, Bytes>,
    /// `cached_roots` database: string keys to serialized values (presumably `CachedRoot` — confirm).
    cached_roots: Database<Str, Bytes>,
    /// Blob storage (local, optionally mirrored to S3).
    router: Arc<StorageRouter>,
    /// Storage budget in bytes passed at construction; also sizes the metadata and blob maps.
    max_size_bytes: u64,
    /// Whether orphaned blobs may be evicted (from config or the caller).
    evict_orphans: bool,
    /// Throttles how often each blob's last-access time is refreshed.
    blob_access_update_gate: BlobAccessUpdateGate,
    /// Set via compare-exchange while a background access-update batch is in flight, so at most
    /// one runs at a time (reset logic lives in `record_blob_accesses`).
    blob_access_update_inflight: Arc<AtomicBool>,
}
1050
1051impl HashtreeStore {
1052 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
1054 let config = hashtree_config::Config::load_or_default();
1055 let max_size_bytes = config
1056 .storage
1057 .max_size_gb
1058 .saturating_mul(1024 * 1024 * 1024);
1059 Self::with_options_and_backend(
1060 path,
1061 None,
1062 max_size_bytes,
1063 config.storage.evict_orphans,
1064 &config.storage.backend,
1065 )
1066 }
1067
1068 pub fn new_with_backend<P: AsRef<Path>>(
1070 path: P,
1071 backend: hashtree_config::StorageBackend,
1072 max_size_bytes: u64,
1073 ) -> Result<Self> {
1074 Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
1075 }
1076
1077 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
1079 let config = hashtree_config::Config::load_or_default();
1080 let max_size_bytes = config
1081 .storage
1082 .max_size_gb
1083 .saturating_mul(1024 * 1024 * 1024);
1084 Self::with_options_and_backend(
1085 path,
1086 s3_config,
1087 max_size_bytes,
1088 config.storage.evict_orphans,
1089 &config.storage.backend,
1090 )
1091 }
1092
1093 pub fn with_options<P: AsRef<Path>>(
1099 path: P,
1100 s3_config: Option<&S3Config>,
1101 max_size_bytes: u64,
1102 ) -> Result<Self> {
1103 let config = hashtree_config::Config::load_or_default();
1104 Self::with_options_and_backend(
1105 path,
1106 s3_config,
1107 max_size_bytes,
1108 config.storage.evict_orphans,
1109 &config.storage.backend,
1110 )
1111 }
1112
    /// Opens (or creates) a store at `path` with every option given explicitly.
    ///
    /// Steps, in order:
    /// 1. Create `path` and open the metadata LMDB environment there. Its map size comes from
    ///    the storage budget, grown if an existing `data.mdb` is already larger.
    /// 2. Create or open the ten named metadata databases in one write transaction.
    /// 3. Open the blob store under `path/blobs` for `backend`. LMDB falls back to the
    ///    filesystem store when the `lmdb` feature is disabled.
    /// 4. Wrap the blob store in a `StorageRouter`, with S3 mirroring when `s3_config` is given
    ///    and the `s3` feature is enabled. Otherwise a warning is logged.
    ///
    /// # Errors
    /// Fails on directory creation, LMDB open/resize/transaction errors, blob store
    /// construction errors, or S3 router initialization errors.
    pub fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        evict_orphans: bool,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;
        let metadata_map_size = lmdb_map_size_for_existing_env(
            path,
            lmdb_metadata_map_size_for_storage_budget(max_size_bytes),
        )?;

        // SAFETY(review): heed marks `open` unsafe because an LMDB environment must not be opened
        // more than once in the same process and its files must not be modified externally. This
        // presumably relies on callers creating one `HashtreeStore` per path — confirm.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(metadata_map_size)
                // Ten named databases are created below; 11 leaves one slot spare.
                .max_dbs(11)
                .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Best effort: reclaim reader slots left by dead processes; failures are ignored.
        let _ = env.clear_stale_readers();
        if env.info().map_size < metadata_map_size {
            // SAFETY(review): `resize` must not run while transactions are active on this env.
            // None have been started in this function yet; other handles to the same env are
            // presumably absent (see the open note above).
            unsafe { env.resize(metadata_map_size) }?;
        }

        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
        let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let pubkey_blob_index = env.create_database(&mut wtxn, Some("pubkey_blob_index"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Blobs always live in the `blobs` subdirectory, independent of the metadata env.
        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                std::fs::create_dir_all(path.join("blobs"))?;
                // Remove shard directories left over from a previous filesystem layout.
                remove_stale_fs_blob_shards(&path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
                // The blob map is sized to the full storage budget, with a small floor.
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        // NOTE(review): `StorageRouter::with_s3` calls `tokio::spawn`, which needs a Tokio
        // runtime context, but it is driven here by `futures::executor::block_on`. This
        // presumably works only when called from a thread already inside a Tokio runtime, and
        // may stall a current-thread runtime — confirm the call sites.
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            base_path: path.to_path_buf(),
            env,
            pins,
            pinned_refs,
            tracked_authors,
            blob_owners,
            pubkey_blobs,
            pubkey_blob_index,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
            evict_orphans,
            blob_access_update_gate: BlobAccessUpdateGate::default(),
            blob_access_update_inflight: Arc::new(AtomicBool::new(false)),
        })
    }
1229
1230 pub fn base_path(&self) -> &Path {
1231 &self.base_path
1232 }
1233
1234 pub fn router(&self) -> &StorageRouter {
1236 &self.router
1237 }
1238
1239 pub fn store_arc(&self) -> Arc<StorageRouter> {
1242 Arc::clone(&self.router)
1243 }
1244
1245 fn access_tracking_tree(&self) -> (HashTree<AccessRecordingStore>, AccessRecordingStore) {
1246 let access_store = AccessRecordingStore::new(self.store_arc());
1247 let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1248 (tree, access_store)
1249 }
1250
1251 pub fn record_blob_accesses<I>(&self, hashes: I)
1252 where
1253 I: IntoIterator<Item = Hash>,
1254 {
1255 let access_update_batch_limit = access_update_background_batch_limit();
1256 if access_update_batch_limit == 0 {
1257 return;
1258 }
1259
1260 let now = unix_timestamp_now();
1261 let mut due_hashes = self.blob_access_update_gate.due_hashes(hashes, now);
1262 if due_hashes.is_empty() {
1263 return;
1264 }
1265
1266 if self
1267 .blob_access_update_inflight
1268 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1269 .is_err()
1270 {
1271 return;
1272 }
1273
1274 if due_hashes.len() > access_update_batch_limit {
1275 due_hashes.truncate(access_update_batch_limit);
1276 }
1277
1278 let router = Arc::clone(&self.router);
1279 let inflight = Arc::clone(&self.blob_access_update_inflight);
1280 let spawn_result = std::thread::Builder::new()
1281 .name("blob-access-update".to_string())
1282 .spawn(move || {
1283 if let Err(err) = router.touch_many_accessed_sync(&due_hashes, now) {
1284 tracing::debug!("Failed to update blob access metadata: {}", err);
1285 }
1286 inflight.store(false, Ordering::Release);
1287 });
1288 if let Err(err) = spawn_result {
1289 self.blob_access_update_inflight
1290 .store(false, Ordering::Release);
1291 tracing::debug!("Failed to spawn blob access metadata updater: {}", err);
1292 }
1293 }
1294
1295 fn record_blob_access_now(&self, hash: &Hash) {
1296 if let Err(err) = self.router.touch_accessed_sync(hash, unix_timestamp_now()) {
1297 tracing::debug!("Failed to update blob access metadata: {}", err);
1298 }
1299 }
1300
1301 pub fn blob_last_accessed_at(&self, hash: &Hash) -> Result<Option<u64>> {
1302 self.router
1303 .last_accessed_at_sync(hash)
1304 .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1305 }
1306
1307 pub fn blob_last_accessed_many(&self, hashes: &[Hash]) -> Result<Vec<(Hash, u64)>> {
1308 self.router
1309 .many_last_accessed_at_sync(hashes)
1310 .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1311 }
1312
1313 pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
1315 let (tree, access_store) = self.access_tracking_tree();
1316
1317 let result = sync_block_on(async {
1318 tree.get_tree_node(hash)
1319 .await
1320 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
1321 })?;
1322 if result.is_some() {
1323 self.record_blob_accesses(access_store.take_accessed_hashes());
1324 }
1325 Ok(result)
1326 }
1327
1328 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
1330 let hash = sha256(data);
1331 let inserted = self
1332 .router
1333 .put_sync(hash, data)
1334 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1335 if !inserted {
1336 self.record_blob_access_now(&hash);
1337 }
1338 Ok(to_hex(&hash))
1339 }
1340
1341 pub fn put_owned_blob(&self, data: &[u8], pubkey: &[u8; 32]) -> Result<String> {
1343 let hash = sha256(data);
1344 if !self
1345 .router
1346 .exists(&hash)
1347 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))?
1348 {
1349 self.make_room_for_durable_blob(data.len() as u64)?;
1350 self.router
1351 .put_sync(hash, data)
1352 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1353 } else {
1354 self.record_blob_access_now(&hash);
1355 }
1356 self.set_blob_owner_with_size(&hash, pubkey, data.len() as u64)?;
1357 Ok(to_hex(&hash))
1358 }
1359
1360 pub fn put_owned_blobs(&self, items: &[(Hash, Vec<u8>)], pubkey: &[u8; 32]) -> Result<usize> {
1362 let started_at = Instant::now();
1363 let slow_log_ms = slow_owned_blob_batch_log_ms();
1364 if items.is_empty() {
1365 return Ok(0);
1366 }
1367 let incoming_bytes = items.iter().fold(0u64, |total, (_, data)| {
1368 total.saturating_add(data.len() as u64)
1369 });
1370 let count = items.len();
1371 let room_started = Instant::now();
1372 self.make_room_for_durable_blob(incoming_bytes)?;
1373 let make_room_ms = room_started.elapsed().as_millis();
1374 let raw_started = Instant::now();
1375 let inserted = self
1376 .router
1377 .put_many_sync(items)
1378 .map_err(|e| anyhow::anyhow!("Failed to store blob batch: {}", e))?;
1379 let raw_write_ms = raw_started.elapsed().as_millis();
1380
1381 let now = SystemTime::now()
1382 .duration_since(UNIX_EPOCH)
1383 .unwrap()
1384 .as_secs();
1385 let owner_started = Instant::now();
1386 let mut wtxn = self.env.write_txn()?;
1387 for (hash, data) in items {
1388 let owner_key = Self::blob_owner_key(hash, pubkey);
1389 self.blob_owners.put(&mut wtxn, &owner_key[..], &())?;
1390
1391 let index_key = Self::pubkey_blob_key(pubkey, hash);
1392 if self.pubkey_blob_index.get(&wtxn, &index_key[..])?.is_none() {
1393 let metadata = BlobMetadata {
1394 sha256: to_hex(hash),
1395 size: data.len() as u64,
1396 mime_type: "application/octet-stream".to_string(),
1397 uploaded: now,
1398 };
1399 self.pubkey_blob_index.put(
1400 &mut wtxn,
1401 &index_key[..],
1402 &serde_json::to_vec(&metadata)?,
1403 )?;
1404 }
1405 }
1406 wtxn.commit()?;
1407 let owner_index_ms = owner_started.elapsed().as_millis();
1408 let total_ms = started_at.elapsed().as_millis();
1409 if slow_log_ms.is_some_and(|threshold| total_ms >= threshold) {
1410 tracing::warn!(
1411 blobs = count,
1412 inserted,
1413 incoming_bytes,
1414 total_ms,
1415 make_room_ms,
1416 raw_write_ms,
1417 owner_index_ms,
1418 "slow owned Blossom blob batch write"
1419 );
1420 }
1421 Ok(inserted)
1422 }
1423
1424 pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
1430 let hash = sha256(data);
1431 if self
1432 .router
1433 .exists(&hash)
1434 .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
1435 {
1436 self.record_blob_access_now(&hash);
1437 return Ok(to_hex(&hash));
1438 }
1439
1440 let incoming_bytes = data.len() as u64;
1441 let _ = self.make_room_for_cached_blob(incoming_bytes);
1442
1443 let mut retried_after_cleanup = false;
1444 loop {
1445 match self.router.put_sync(hash, data) {
1446 Ok(_) => return Ok(to_hex(&hash)),
1447 Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
1448 let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
1449 if freed == 0 {
1450 return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
1451 }
1452 retried_after_cleanup = true;
1453 }
1454 Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
1455 }
1456 }
1457 }
1458
1459 pub fn put_cached_blobs(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize> {
1461 let started_at = Instant::now();
1462 let slow_log_ms = slow_cached_blob_batch_log_ms();
1463 if items.is_empty() {
1464 return Ok(0);
1465 }
1466
1467 let missing_items;
1468 let write_items: &[(Hash, Vec<u8>)] = if cached_blob_batch_existing_precheck() {
1469 let mut sorted_hashes: Vec<Hash> = items.iter().map(|(hash, _)| *hash).collect();
1470 sorted_hashes.sort_unstable();
1471 sorted_hashes.dedup();
1472 let existing = self
1473 .router
1474 .existing_local_hashes_in_sorted_candidates(&sorted_hashes)
1475 .map_err(|e| anyhow::anyhow!("Failed to check cached blob batch: {}", e))?;
1476 let existing_hashes: HashSet<Hash> = sorted_hashes
1477 .into_iter()
1478 .zip(existing)
1479 .filter_map(|(hash, exists)| exists.then_some(hash))
1480 .collect();
1481 missing_items = items
1482 .iter()
1483 .filter(|(hash, _)| !existing_hashes.contains(hash))
1484 .cloned()
1485 .collect::<Vec<_>>();
1486 if missing_items.is_empty() {
1487 return Ok(0);
1488 }
1489 missing_items.as_slice()
1490 } else {
1491 items
1492 };
1493
1494 let incoming_bytes = write_items.iter().fold(0u64, |total, (_, data)| {
1495 total.saturating_add(data.len() as u64)
1496 });
1497 let room_started = Instant::now();
1498 let _ = self.make_room_for_cached_blob(incoming_bytes);
1499 let make_room_ms = room_started.elapsed().as_millis();
1500
1501 let mut retried_after_cleanup = false;
1502 loop {
1503 let raw_started = Instant::now();
1504 match self.router.put_many_sync(write_items) {
1505 Ok(inserted) => {
1506 let raw_write_ms = raw_started.elapsed().as_millis();
1507 let total_ms = started_at.elapsed().as_millis();
1508 if slow_log_ms.is_some_and(|threshold| total_ms >= threshold) {
1509 tracing::warn!(
1510 blobs = write_items.len(),
1511 inserted,
1512 incoming_bytes,
1513 total_ms,
1514 make_room_ms,
1515 raw_write_ms,
1516 "slow cached Blossom blob batch write"
1517 );
1518 }
1519 return Ok(inserted);
1520 }
1521 Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
1522 let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
1523 if freed == 0 {
1524 return Err(anyhow::anyhow!(
1525 "Failed to store cached blob batch: {}",
1526 err
1527 ));
1528 }
1529 retried_after_cleanup = true;
1530 }
1531 Err(err) => {
1532 return Err(anyhow::anyhow!(
1533 "Failed to store cached blob batch: {}",
1534 err
1535 ));
1536 }
1537 }
1538 }
1539 }
1540
1541 pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1543 let data = self
1544 .router
1545 .get_sync(hash)
1546 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?;
1547 if data.is_some() {
1548 self.record_blob_accesses(std::iter::once(*hash));
1549 }
1550 Ok(data)
1551 }
1552
1553 pub fn get_blob_range(
1554 &self,
1555 hash: &[u8; 32],
1556 start: u64,
1557 end_inclusive: u64,
1558 ) -> Result<Option<Vec<u8>>> {
1559 let data = self
1560 .router
1561 .get_range_sync(hash, start, end_inclusive)
1562 .map_err(|e| anyhow::anyhow!("Failed to get blob range: {}", e))?;
1563 if data.is_some() {
1564 self.record_blob_accesses(std::iter::once(*hash));
1565 }
1566 Ok(data)
1567 }
1568
1569 pub fn blob_size(&self, hash: &[u8; 32]) -> Result<Option<u64>> {
1570 self.router
1571 .blob_size_sync(hash)
1572 .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))
1573 }
1574
1575 pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
1577 self.router
1578 .exists(hash)
1579 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
1580 }
1581
/// Builds the 64-byte `blob_owners` key: `sha256 || pubkey`.
///
/// Putting the blob hash first lets all owners of a blob be found with a prefix scan.
fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
    let mut key = [0u8; 64];
    let (hash_half, owner_half) = key.split_at_mut(32);
    hash_half.copy_from_slice(sha256);
    owner_half.copy_from_slice(pubkey);
    key
}
1593
/// Builds the 64-byte `pubkey_blob_index` key: `pubkey || sha256`.
///
/// Putting the pubkey first lets all blobs of a pubkey be found with a prefix scan.
fn pubkey_blob_key(pubkey: &[u8; 32], sha256: &[u8; 32]) -> [u8; 64] {
    let mut key = [0u8; 64];
    let (owner_half, hash_half) = key.split_at_mut(32);
    owner_half.copy_from_slice(pubkey);
    hash_half.copy_from_slice(sha256);
    key
}
1600
1601 pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
1604 let size = self
1605 .router
1606 .blob_size_sync(sha256)
1607 .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))?
1608 .unwrap_or(0);
1609 self.set_blob_owner_with_size(sha256, pubkey, size)
1610 }
1611
1612 fn set_blob_owner_with_size(
1613 &self,
1614 sha256: &[u8; 32],
1615 pubkey: &[u8; 32],
1616 size: u64,
1617 ) -> Result<()> {
1618 let key = Self::blob_owner_key(sha256, pubkey);
1619 let index_key = Self::pubkey_blob_key(pubkey, sha256);
1620 let mut wtxn = self.env.write_txn()?;
1621
1622 self.blob_owners.put(&mut wtxn, &key[..], &())?;
1624
1625 if self.pubkey_blob_index.get(&wtxn, &index_key[..])?.is_none() {
1626 let now = SystemTime::now()
1627 .duration_since(UNIX_EPOCH)
1628 .unwrap()
1629 .as_secs();
1630 let metadata = BlobMetadata {
1631 sha256: to_hex(sha256),
1632 size,
1633 mime_type: "application/octet-stream".to_string(),
1634 uploaded: now,
1635 };
1636 self.pubkey_blob_index.put(
1637 &mut wtxn,
1638 &index_key[..],
1639 &serde_json::to_vec(&metadata)?,
1640 )?;
1641 }
1642
1643 wtxn.commit()?;
1644 Ok(())
1645 }
1646
1647 pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1649 let key = Self::blob_owner_key(sha256, pubkey);
1650 let rtxn = self.env.read_txn()?;
1651 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1652 }
1653
1654 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1656 let rtxn = self.env.read_txn()?;
1657
1658 let mut owners = Vec::new();
1659 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1660 let (key, _) = item?;
1661 if key.len() == 64 {
1662 let mut pubkey = [0u8; 32];
1664 pubkey.copy_from_slice(&key[32..64]);
1665 owners.push(pubkey);
1666 }
1667 }
1668 Ok(owners)
1669 }
1670
1671 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1673 let rtxn = self.env.read_txn()?;
1674
1675 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1677 if item.is_ok() {
1678 return Ok(true);
1679 }
1680 }
1681 Ok(false)
1682 }
1683
1684 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1686 Ok(self.get_blob_owners(sha256)?.into_iter().next())
1687 }
1688
1689 pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1693 let key = Self::blob_owner_key(sha256, pubkey);
1694 let mut wtxn = self.env.write_txn()?;
1695
1696 self.blob_owners.delete(&mut wtxn, &key[..])?;
1698 self.pubkey_blob_index
1699 .delete(&mut wtxn, &Self::pubkey_blob_key(pubkey, sha256)[..])?;
1700
1701 let sha256_hex = to_hex(sha256);
1703
1704 if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1706 if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1707 blobs.retain(|b| b.sha256 != sha256_hex);
1708 let blobs_json = serde_json::to_vec(&blobs)?;
1709 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1710 }
1711 }
1712
1713 let mut has_other_owners = false;
1715 for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1716 if item.is_ok() {
1717 has_other_owners = true;
1718 break;
1719 }
1720 }
1721
1722 if has_other_owners {
1723 wtxn.commit()?;
1724 tracing::debug!(
1725 "Removed {} from blob {} owners, other owners remain",
1726 &to_hex(pubkey)[..8],
1727 &sha256_hex[..8]
1728 );
1729 return Ok(false);
1730 }
1731
1732 tracing::info!(
1734 "All owners removed from blob {}, deleting",
1735 &sha256_hex[..8]
1736 );
1737
1738 let _ = self.router.delete_sync(sha256);
1740
1741 wtxn.commit()?;
1742 Ok(true)
1743 }
1744
1745 pub fn list_blobs_by_pubkey(
1747 &self,
1748 pubkey: &[u8; 32],
1749 ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1750 let rtxn = self.env.read_txn()?;
1751
1752 let mut blobs: Vec<BlobMetadata> = self
1753 .pubkey_blobs
1754 .get(&rtxn, pubkey)?
1755 .and_then(|b| serde_json::from_slice(b).ok())
1756 .unwrap_or_default();
1757 let mut seen: HashSet<String> = blobs.iter().map(|blob| blob.sha256.clone()).collect();
1758
1759 for item in self.pubkey_blob_index.prefix_iter(&rtxn, pubkey)? {
1760 let (_, metadata_bytes) = item?;
1761 let metadata: BlobMetadata = match serde_json::from_slice(metadata_bytes) {
1762 Ok(metadata) => metadata,
1763 Err(_) => continue,
1764 };
1765 if seen.insert(metadata.sha256.clone()) {
1766 blobs.push(metadata);
1767 }
1768 }
1769
1770 Ok(blobs
1771 .into_iter()
1772 .map(|b| crate::server::blossom::BlobDescriptor {
1773 url: format!("/{}", b.sha256),
1774 sha256: b.sha256,
1775 size: b.size,
1776 mime_type: b.mime_type,
1777 uploaded: b.uploaded,
1778 })
1779 .collect())
1780 }
1781
1782 pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1784 let data = self
1785 .router
1786 .get_sync(hash)
1787 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))?;
1788 if data.is_some() {
1789 self.record_blob_accesses(std::iter::once(*hash));
1790 }
1791 Ok(data)
1792 }
1793
1794 pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1797 let (tree, access_store) = self.access_tracking_tree();
1798
1799 let result = sync_block_on(async {
1800 tree.read_file(hash)
1801 .await
1802 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1803 })?;
1804 if result.is_some() {
1805 self.record_blob_accesses(access_store.take_accessed_hashes());
1806 }
1807 Ok(result)
1808 }
1809
1810 pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1813 let (tree, access_store) = self.access_tracking_tree();
1814
1815 let result = sync_block_on(async {
1816 tree.get(cid, None)
1817 .await
1818 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1819 })?;
1820 if result.is_some() {
1821 self.record_blob_accesses(access_store.take_accessed_hashes());
1822 }
1823 Ok(result)
1824 }
1825
1826 fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1827 let exists = self
1828 .router
1829 .exists(&cid.hash)
1830 .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1831 if !exists {
1832 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1833 }
1834 Ok(())
1835 }
1836
1837 pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1839 self.ensure_cid_exists(cid)?;
1840
1841 let (tree, access_store) = self.access_tracking_tree();
1842 let mut total_bytes = 0u64;
1843 let mut streamed_any_chunk = false;
1844
1845 sync_block_on(async {
1846 let mut stream = tree.get_stream(cid);
1847 while let Some(chunk) = stream.next().await {
1848 streamed_any_chunk = true;
1849 let chunk =
1850 chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1851 writer
1852 .write_all(&chunk)
1853 .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1854 total_bytes += chunk.len() as u64;
1855 }
1856 Ok::<(), anyhow::Error>(())
1857 })?;
1858
1859 if !streamed_any_chunk {
1860 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1861 }
1862 self.record_blob_accesses(access_store.take_accessed_hashes());
1863
1864 writer
1865 .flush()
1866 .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1867 Ok(total_bytes)
1868 }
1869
1870 pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1872 self.ensure_cid_exists(cid)?;
1873
1874 let output_path = output_path.as_ref();
1875 if let Some(parent) = output_path.parent() {
1876 if !parent.as_os_str().is_empty() {
1877 std::fs::create_dir_all(parent).with_context(|| {
1878 format!("Failed to create output directory {}", parent.display())
1879 })?;
1880 }
1881 }
1882
1883 let mut file = std::fs::File::create(output_path)
1884 .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1885 self.write_file_by_cid_to_writer(cid, &mut file)
1886 }
1887
1888 pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1890 self.write_file_by_cid(&Cid::public(*hash), output_path)
1891 }
1892
1893 pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1895 let (tree, access_store) = self.access_tracking_tree();
1896
1897 let result = sync_block_on(async {
1898 tree.resolve_path(cid, path)
1899 .await
1900 .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1901 })?;
1902 if result.is_some() {
1903 self.record_blob_accesses(access_store.take_accessed_hashes());
1904 }
1905 Ok(result)
1906 }
1907
1908 pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
1910 let access_store = AccessRecordingStore::new(self.store_arc());
1911 let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1912
1913 let metadata: Result<Option<FileChunkMetadata>> = sync_block_on(async {
1914 let exists = access_store
1917 .has(hash)
1918 .await
1919 .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;
1920
1921 if !exists {
1922 return Ok(None);
1923 }
1924
1925 let total_size = tree
1927 .get_size(hash)
1928 .await
1929 .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;
1930
1931 let is_tree_node = tree
1933 .is_tree(hash)
1934 .await
1935 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1936
1937 if !is_tree_node {
1938 return Ok(Some(FileChunkMetadata {
1940 total_size,
1941 chunk_hashes: vec![],
1942 chunk_sizes: vec![],
1943 is_chunked: false,
1944 }));
1945 }
1946
1947 let node = match tree
1949 .get_tree_node(hash)
1950 .await
1951 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1952 {
1953 Some(n) => n,
1954 None => return Ok(None),
1955 };
1956
1957 let is_directory = tree
1959 .is_directory(hash)
1960 .await
1961 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1962
1963 if is_directory {
1964 return Ok(None); }
1966
1967 let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
1969 let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();
1970
1971 Ok(Some(FileChunkMetadata {
1972 total_size,
1973 chunk_hashes,
1974 chunk_sizes,
1975 is_chunked: !node.links.is_empty(),
1976 }))
1977 });
1978 let metadata = metadata?;
1979 if metadata.is_some() {
1980 self.record_blob_accesses(access_store.take_accessed_hashes());
1981 }
1982 Ok(metadata)
1983 }
1984
1985 pub fn get_file_range(
1987 &self,
1988 hash: &[u8; 32],
1989 start: u64,
1990 end: Option<u64>,
1991 ) -> Result<Option<(Vec<u8>, u64)>> {
1992 let metadata = match self.get_file_chunk_metadata(hash)? {
1993 Some(m) => m,
1994 None => return Ok(None),
1995 };
1996
1997 if metadata.total_size == 0 {
1998 return Ok(Some((Vec::new(), 0)));
1999 }
2000
2001 if start >= metadata.total_size {
2002 return Ok(None);
2003 }
2004
2005 let end = end
2006 .unwrap_or(metadata.total_size - 1)
2007 .min(metadata.total_size - 1);
2008
2009 if !metadata.is_chunked {
2011 let content = self.get_file(hash)?.unwrap_or_default();
2012 let range_content = if start < content.len() as u64 {
2013 content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
2014 } else {
2015 Vec::new()
2016 };
2017 return Ok(Some((range_content, metadata.total_size)));
2018 }
2019
2020 let mut result = Vec::new();
2022 let mut current_offset = 0u64;
2023
2024 for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
2025 let chunk_size = metadata.chunk_sizes[i];
2026 let chunk_end = current_offset + chunk_size - 1;
2027
2028 if chunk_end >= start && current_offset <= end {
2030 let chunk_content = match self.get_chunk(chunk_hash)? {
2031 Some(content) => content,
2032 None => {
2033 return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
2034 }
2035 };
2036
2037 let chunk_read_start = if current_offset >= start {
2038 0
2039 } else {
2040 (start - current_offset) as usize
2041 };
2042
2043 let chunk_read_end = if chunk_end <= end {
2044 chunk_size as usize - 1
2045 } else {
2046 (end - current_offset) as usize
2047 };
2048
2049 result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
2050 }
2051
2052 current_offset += chunk_size;
2053
2054 if current_offset > end {
2055 break;
2056 }
2057 }
2058
2059 Ok(Some((result, metadata.total_size)))
2060 }
2061
2062 pub fn stream_file_range_chunks_owned(
2064 self: Arc<Self>,
2065 hash: &[u8; 32],
2066 start: u64,
2067 end: u64,
2068 ) -> Result<Option<FileRangeChunksOwned>> {
2069 let metadata = match self.get_file_chunk_metadata(hash)? {
2070 Some(m) => m,
2071 None => return Ok(None),
2072 };
2073
2074 if metadata.total_size == 0 || start >= metadata.total_size {
2075 return Ok(None);
2076 }
2077
2078 let end = end.min(metadata.total_size - 1);
2079
2080 Ok(Some(FileRangeChunksOwned {
2081 store: self,
2082 metadata,
2083 start,
2084 end,
2085 current_chunk_idx: 0,
2086 current_offset: 0,
2087 }))
2088 }
2089
2090 pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
2092 let (tree, access_store) = self.access_tracking_tree();
2093
2094 let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
2095 let is_dir = tree
2097 .is_directory(hash)
2098 .await
2099 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
2100
2101 if !is_dir {
2102 return Ok(None);
2103 }
2104
2105 let cid = hashtree_core::Cid::public(*hash);
2107 let tree_entries = tree
2108 .list_directory(&cid)
2109 .await
2110 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
2111
2112 let entries: Vec<DirEntry> = tree_entries
2113 .into_iter()
2114 .map(|e| DirEntry {
2115 name: e.name,
2116 cid: to_hex(&e.hash),
2117 is_directory: e.link_type.is_tree(),
2118 size: e.size,
2119 })
2120 .collect();
2121
2122 Ok(Some(DirectoryListing {
2123 dir_name: String::new(),
2124 entries,
2125 }))
2126 });
2127 let listing = listing?;
2128 if listing.is_some() {
2129 self.record_blob_accesses(access_store.take_accessed_hashes());
2130 }
2131 Ok(listing)
2132 }
2133
2134 pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
2136 let (tree, access_store) = self.access_tracking_tree();
2137 let cid = cid.clone();
2138
2139 let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
2140 let is_dir = tree
2141 .is_dir(&cid)
2142 .await
2143 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
2144
2145 if !is_dir {
2146 return Ok(None);
2147 }
2148
2149 let tree_entries = tree
2150 .list_directory(&cid)
2151 .await
2152 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
2153
2154 let entries: Vec<DirEntry> = tree_entries
2155 .into_iter()
2156 .map(|e| DirEntry {
2157 name: e.name,
2158 cid: Cid {
2159 hash: e.hash,
2160 key: e.key,
2161 }
2162 .to_string(),
2163 is_directory: e.link_type.is_tree(),
2164 size: e.size,
2165 })
2166 .collect();
2167
2168 Ok(Some(DirectoryListing {
2169 dir_name: String::new(),
2170 entries,
2171 }))
2172 });
2173 let listing = listing?;
2174 if listing.is_some() {
2175 self.record_blob_accesses(access_store.take_accessed_hashes());
2176 }
2177 Ok(listing)
2178 }
2179
2180 pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
2184 let mut wtxn = self.env.write_txn()?;
2185 self.pinned_refs.put(&mut wtxn, key, &())?;
2186 wtxn.commit()?;
2187 Ok(())
2188 }
2189
2190 pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
2192 let mut wtxn = self.env.write_txn()?;
2193 let removed = self.pinned_refs.delete(&mut wtxn, key)?;
2194 wtxn.commit()?;
2195 Ok(removed)
2196 }
2197
2198 pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
2200 let rtxn = self.env.read_txn()?;
2201 let mut refs = Vec::new();
2202
2203 for item in self.pinned_refs.iter(&rtxn)? {
2204 let (key, _) = item?;
2205 refs.push(key.to_string());
2206 }
2207
2208 refs.sort();
2209 Ok(refs)
2210 }
2211
2212 pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
2214 let mut wtxn = self.env.write_txn()?;
2215 let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
2216 self.tracked_authors.put(&mut wtxn, npub, &())?;
2217 wtxn.commit()?;
2218 Ok(inserted)
2219 }
2220
2221 pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
2223 let mut wtxn = self.env.write_txn()?;
2224 let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
2225 wtxn.commit()?;
2226 Ok(removed)
2227 }
2228
2229 pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
2231 let rtxn = self.env.read_txn()?;
2232 let mut authors = Vec::new();
2233
2234 for item in self.tracked_authors.iter(&rtxn)? {
2235 let (npub, _) = item?;
2236 authors.push(npub.to_string());
2237 }
2238
2239 authors.sort();
2240 Ok(authors)
2241 }
2242
2243 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
2245 let key = format!("{}/{}", pubkey_hex, tree_name);
2246 let rtxn = self.env.read_txn()?;
2247 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
2248 let root: CachedRoot = rmp_serde::from_slice(bytes)
2249 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2250 Ok(Some(root))
2251 } else {
2252 Ok(None)
2253 }
2254 }
2255
2256 pub fn set_cached_root(
2258 &self,
2259 pubkey_hex: &str,
2260 tree_name: &str,
2261 hash: &str,
2262 key: Option<&str>,
2263 visibility: &str,
2264 updated_at: u64,
2265 ) -> Result<()> {
2266 let db_key = format!("{}/{}", pubkey_hex, tree_name);
2267 let root = CachedRoot {
2268 hash: hash.to_string(),
2269 key: key.map(|k| k.to_string()),
2270 updated_at,
2271 visibility: visibility.to_string(),
2272 };
2273 let bytes = rmp_serde::to_vec(&root)
2274 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
2275 let mut wtxn = self.env.write_txn()?;
2276 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
2277 wtxn.commit()?;
2278 Ok(())
2279 }
2280
2281 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
2283 let prefix = format!("{}/", pubkey_hex);
2284 let rtxn = self.env.read_txn()?;
2285 let mut results = Vec::new();
2286
2287 for item in self.cached_roots.iter(&rtxn)? {
2288 let (key, bytes) = item?;
2289 if key.starts_with(&prefix) {
2290 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
2291 let root: CachedRoot = rmp_serde::from_slice(bytes)
2292 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2293 results.push((tree_name.to_string(), root));
2294 }
2295 }
2296
2297 Ok(results)
2298 }
2299
2300 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
2302 let key = format!("{}/{}", pubkey_hex, tree_name);
2303 let mut wtxn = self.env.write_txn()?;
2304 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
2305 wtxn.commit()?;
2306 Ok(deleted)
2307 }
2308}
2309
2310fn is_map_full_store_error(err: &StoreError) -> bool {
2311 let message = err.to_string();
2312 message.contains("MDB_MAP_FULL") || message.contains("MapFull")
2313}
2314
2315#[derive(Debug, Clone)]
2316pub struct FileChunkMetadata {
2317 pub total_size: u64,
2318 pub chunk_hashes: Vec<Hash>,
2319 pub chunk_sizes: Vec<u64>,
2320 pub is_chunked: bool,
2321}
2322
2323pub struct FileRangeChunksOwned {
2325 store: Arc<HashtreeStore>,
2326 metadata: FileChunkMetadata,
2327 start: u64,
2328 end: u64,
2329 current_chunk_idx: usize,
2330 current_offset: u64,
2331}
2332
2333impl Iterator for FileRangeChunksOwned {
2334 type Item = Result<Vec<u8>>;
2335
2336 fn next(&mut self) -> Option<Self::Item> {
2337 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2338 return None;
2339 }
2340
2341 if self.current_offset > self.end {
2342 return None;
2343 }
2344
2345 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2346 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2347 let chunk_end = self.current_offset + chunk_size - 1;
2348
2349 self.current_chunk_idx += 1;
2350
2351 if chunk_end < self.start || self.current_offset > self.end {
2352 self.current_offset += chunk_size;
2353 return self.next();
2354 }
2355
2356 let chunk_content = match self.store.get_chunk(chunk_hash) {
2357 Ok(Some(content)) => content,
2358 Ok(None) => {
2359 return Some(Err(anyhow::anyhow!(
2360 "Chunk {} not found",
2361 to_hex(chunk_hash)
2362 )));
2363 }
2364 Err(e) => {
2365 return Some(Err(e));
2366 }
2367 };
2368
2369 let chunk_read_start = if self.current_offset >= self.start {
2370 0
2371 } else {
2372 (self.start - self.current_offset) as usize
2373 };
2374
2375 let chunk_read_end = if chunk_end <= self.end {
2376 chunk_size as usize - 1
2377 } else {
2378 (self.end - self.current_offset) as usize
2379 };
2380
2381 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2382 self.current_offset += chunk_size;
2383
2384 Some(Ok(result))
2385 }
2386}
2387
/// Counters describing the outcome of a garbage-collection pass.
///
/// NOTE(review): the producing GC routine is outside this view; the field
/// meanings below are inferred from their names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GcStats {
    /// Number of DAGs removed (presumably whole trees rather than single blobs).
    pub deleted_dags: usize,
    /// Number of bytes reclaimed by the pass.
    pub freed_bytes: u64,
}
2393
/// One entry of a directory listing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DirEntry {
    /// Entry name within its parent directory.
    pub name: String,
    /// Content identifier of the entry, as a string.
    /// NOTE(review): exact encoding (hex hash vs. encoded CID) is set by the producer; confirm.
    pub cid: String,
    /// Whether the entry is itself a directory.
    pub is_directory: bool,
    /// Size of the entry (presumably in bytes).
    pub size: u64,
}
2401
2402#[derive(Debug, Clone)]
2403pub struct DirectoryListing {
2404 pub dir_name: String,
2405 pub entries: Vec<DirEntry>,
2406}
2407
2408#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
2410pub struct BlobMetadata {
2411 pub sha256: String,
2412 pub size: u64,
2413 pub mime_type: String,
2414 pub uploaded: u64,
2415}
2416
2417impl crate::webrtc::ContentStore for HashtreeStore {
2419 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2420 let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2421 self.get_chunk(&hash)
2422 }
2423}
2424
#[cfg(test)]
mod tests {
    use super::*;
    // Not feature-gated: `tracked_authors_round_trip_sorted_and_deduplicated`
    // and the s3 test use `TempDir` without the `lmdb` feature.
    use tempfile::TempDir;

    /// The gate returns each due hash once per call (first-seen order) and
    /// suppresses repeats until `ACCESS_UPDATE_INTERVAL_SECS` have elapsed.
    #[test]
    fn blob_access_update_gate_deduplicates_and_throttles() {
        let gate = BlobAccessUpdateGate::default();
        let first = sha256(b"first");
        let second = sha256(b"second");

        assert_eq!(
            gate.due_hashes([first, first, second], 10),
            vec![first, second]
        );
        assert!(gate.due_hashes([first, second], 11).is_empty());
        assert_eq!(
            gate.due_hashes([second, first], 10 + ACCESS_UPDATE_INTERVAL_SECS),
            vec![second, first]
        );
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_metadata_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let storage_budget = 256 * 1024 * 1024 * 1024u64;
        let expected = lmdb_metadata_map_size_for_storage_budget(storage_budget);
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            storage_budget,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = store.env.info().map_size as u64;
        assert!(
            map_size >= expected,
            "expected metadata LMDB map to grow to at least {expected} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_map_size_for_existing_env_keeps_matching_requested_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_METADATA_MIN_MAP_SIZE_BYTES;
        std::fs::File::create(temp.path().join("data.mdb"))?.set_len(requested)?;

        let map_size = lmdb_map_size_for_existing_env(temp.path(), requested)? as u64;

        assert_eq!(map_size, align_lmdb_map_size(requested));
        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_map_size_for_existing_env_adds_headroom_when_existing_is_larger() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_METADATA_MIN_MAP_SIZE_BYTES;
        let existing = requested + 4096;
        std::fs::File::create(temp.path().join("data.mdb"))?.set_len(existing)?;

        let map_size = lmdb_map_size_for_existing_env(temp.path(), requested)? as u64;
        let expected = align_lmdb_map_size(existing + LMDB_METADATA_REOPEN_HEADROOM_BYTES);

        assert_eq!(map_size, expected);
        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }

    /// Opening an LMDB blob store removes two-character shard directories left
    /// by the filesystem backend but keeps unrelated directories.
    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("lmdb-blobs");
        std::fs::create_dir_all(path.join("aa"))?;
        std::fs::create_dir_all(path.join("b2"))?;
        std::fs::create_dir_all(path.join("keep-me"))?;
        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;

        let _store = LocalStore::new_with_lmdb_map_size(
            &path,
            &StorageBackend::Lmdb,
            Some(128 * 1024 * 1024),
        )?;

        assert!(!path.join("aa").exists());
        assert!(!path.join("b2").exists());
        assert!(path.join("keep-me").exists());
        assert!(path.join("data.mdb").exists());
        assert!(path.join("lock.mdb").exists());

        Ok(())
    }

    /// Re-putting an existing blob must bump its last-access time: it is first
    /// forced to `1`, then expected to be greater after the duplicate write.
    #[cfg(feature = "lmdb")]
    #[test]
    fn duplicate_blossom_writes_refresh_blob_last_accessed() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let data = b"cached blossom duplicate";
        let hash = sha256(data);
        store.put_cached_blob(data)?;
        store.router.touch_accessed_sync(&hash, 1)?;
        store.put_cached_blob(data)?;
        assert!(store.blob_last_accessed_at(&hash)?.unwrap_or(0) > 1);

        let cached_batch = [
            (
                sha256(b"cached blossom batch 1"),
                b"cached blossom batch 1".to_vec(),
            ),
            (
                sha256(b"cached blossom batch 2"),
                b"cached blossom batch 2".to_vec(),
            ),
        ];
        assert_eq!(store.put_cached_blobs(&cached_batch)?, 2);
        assert_eq!(store.put_cached_blobs(&cached_batch)?, 0);
        assert_eq!(
            store.get_blob(&cached_batch[0].0)?.as_deref(),
            Some(cached_batch[0].1.as_slice())
        );

        let owned = b"owned blossom duplicate";
        let owned_hash = sha256(owned);
        let owner = [7u8; 32];
        store.put_owned_blob(owned, &owner)?;
        store.router.touch_accessed_sync(&owned_hash, 1)?;
        store.put_owned_blob(owned, &owner)?;
        assert!(store.blob_last_accessed_at(&owned_hash)?.unwrap_or(0) > 1);
        let owned_blobs = store.list_blobs_by_pubkey(&owner)?;
        assert_eq!(owned_blobs.len(), 1);
        assert_eq!(owned_blobs[0].sha256, to_hex(&owned_hash));

        let batch = [
            (
                sha256(b"owned blossom batch 1"),
                b"owned blossom batch 1".to_vec(),
            ),
            (
                sha256(b"owned blossom batch 2"),
                b"owned blossom batch 2".to_vec(),
            ),
        ];
        store.put_owned_blobs(&batch, &owner)?;
        let owned_blobs = store.list_blobs_by_pubkey(&owner)?;
        assert_eq!(owned_blobs.len(), 3);

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let old_bytes = b"old published root";
        let new_bytes = b"new published root";
        let old_root = sha256(old_bytes);
        let new_root = sha256(new_bytes);

        store.put_blob(old_bytes)?;
        store.pin(&old_root)?;
        store.index_tree(
            &old_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(store.is_pinned(&old_root)?);
        assert!(store.get_tree_meta(&old_root)?.is_some());

        // Indexing a new root under the same ref should supersede the old one.
        store.put_blob(new_bytes)?;
        store.pin(&new_root)?;
        store.index_tree(
            &new_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(
            !store.is_pinned(&old_root)?,
            "superseded root should be unpinned when ref is replaced"
        );
        assert!(
            store.get_tree_meta(&old_root)?.is_none(),
            "superseded root metadata should be removed when ref is replaced"
        );
        assert!(store.is_pinned(&new_root)?);
        assert!(store.get_tree_meta(&new_root)?.is_some());

        Ok(())
    }

    #[test]
    fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
        const AUTHOR_Z: &str = "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk";
        const AUTHOR_A: &str = "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm";
        const AUTHOR_B: &str = "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq";

        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options(temp.path(), None, 1024 * 1024)?;

        // Insert out of order and with a duplicate; listing must be sorted and unique.
        store.add_tracked_author(AUTHOR_Z)?;
        store.add_tracked_author(AUTHOR_A)?;
        store.add_tracked_author(AUTHOR_Z)?;

        assert_eq!(
            store.list_tracked_authors()?,
            vec![AUTHOR_A.to_string(), AUTHOR_Z.to_string()]
        );
        assert!(store.remove_tracked_author(AUTHOR_A)?);
        assert!(!store.remove_tracked_author(AUTHOR_B)?);
        assert_eq!(store.list_tracked_authors()?, vec![AUTHOR_Z.to_string()]);

        Ok(())
    }

    /// S3-backed `Store::has`/`Store::get` must not panic when awaited inside
    /// `futures::executor::block_on`. Nothing is expected to listen on
    /// 127.0.0.1:9, so the calls' results are ignored; only the absence of a
    /// panic is checked.
    #[cfg(feature = "s3")]
    #[test]
    fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
        let temp = TempDir::new()?;
        let local = Arc::new(LocalStore::new(
            temp.path().join("blobs"),
            &StorageBackend::Fs,
        )?);

        let outcome = std::panic::catch_unwind(|| {
            sync_block_on(async {
                let aws_config = aws_config::from_env()
                    .region(aws_sdk_s3::config::Region::new("auto"))
                    .load()
                    .await;
                let s3_client = aws_sdk_s3::Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&aws_config)
                        .endpoint_url("http://127.0.0.1:9")
                        .force_path_style(true)
                        .build(),
                );

                let router = StorageRouter {
                    local,
                    s3_client: Some(s3_client),
                    s3_bucket: Some("test-bucket".to_string()),
                    s3_prefix: String::new(),
                    sync_tx: None,
                };
                let hash = [0u8; 32];

                let _ = Store::has(&router, &hash).await;
                let _ = Store::get(&router, &hash).await;
            });
        });

        assert!(
            outcome.is_ok(),
            "S3-backed async store methods should not panic inside futures::block_on"
        );

        Ok(())
    }
}