1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8 from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::collections::{HashMap, HashSet};
17#[cfg(feature = "s3")]
18use std::future::Future;
19use std::io::Write;
20use std::path::{Path, PathBuf};
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::{Arc, Mutex};
23use std::time::{SystemTime, UNIX_EPOCH};
24
25mod upload;
26
27mod maintenance;
28mod retention;
29
/// Fallback timeout, in milliseconds, for one blocking S3 call made from synchronous code.
#[cfg(feature = "s3")]
const DEFAULT_S3_SYNC_TIMEOUT_MS: u64 = 5 * 1_000;
/// Environment variable that overrides `DEFAULT_S3_SYNC_TIMEOUT_MS` (value in milliseconds).
#[cfg(feature = "s3")]
const S3_SYNC_TIMEOUT_MS_ENV: &str = "HTREE_S3_SYNC_TIMEOUT_MS";
34
35pub use maintenance::{
36 compact_lmdb_environments_under, CompactResult, R2ImportOptions, R2ImportResult, VerifyResult,
37};
38pub use retention::{OwnedBlobStats, PinnedItem, StorageByPriority, StorageStats, TreeMeta};
39
/// Priority value for content in the "other" category (lowest of the three levels here).
pub const PRIORITY_OTHER: u8 = 64;
/// Priority value for content in the "followed" category.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Priority value for the node's own content (the maximum `u8`).
pub const PRIORITY_OWN: u8 = u8::MAX;
/// Maximum number of concurrent LMDB reader slots for the metadata environment.
const LMDB_MAX_READERS: u32 = 1 << 10;
/// Lower bound for the metadata LMDB map size (64 MiB).
const LMDB_METADATA_MIN_MAP_SIZE_BYTES: u64 = 64 << 20;
/// Upper bound for the metadata LMDB map size (64 GiB); also used when no budget is set.
const LMDB_METADATA_MAX_MAP_SIZE_BYTES: u64 = 64 << 30;
/// The metadata map is sized to 1/1024 of the configured storage budget (before clamping).
const LMDB_METADATA_STORAGE_RATIO_DIVISOR: u64 = 1 << 10;
/// Minimum extra room added on top of an existing metadata `data.mdb` when reopening (64 MiB).
const LMDB_METADATA_REOPEN_HEADROOM_BYTES: u64 = 64 << 20;
/// Smallest map size requested for the LMDB blob store (16 MiB).
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 16 << 20;
/// Minimum number of seconds between two last-access writes for the same blob.
const ACCESS_UPDATE_INTERVAL_SECS: u64 = 5 * 60;
/// Size at which the access-update gate prunes (and, if still full, clears) its map.
const ACCESS_UPDATE_GATE_MAX_ENTRIES: usize = 4_096;
/// Maximum number of hashes written per background access-metadata batch.
const ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT: usize = 1_024;
54
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` rather than an error.
fn unix_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
61
/// A cached tree root record (serializable via serde).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash as a string (presumably hex-encoded — confirm against writers).
    pub hash: String,
    /// Optional key associated with the root (presumably a decryption key — TODO confirm).
    pub key: Option<String>,
    /// Time of the last update (presumably unix seconds — confirm against writers).
    pub updated_at: u64,
    /// Visibility label for the root, stored as free-form text.
    pub visibility: String,
}
73}
74
/// Backend-independent blob statistics returned by `LocalStore::stats`.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of blobs reported by the backend.
    pub count: usize,
    /// Total blob size in bytes reported by the backend.
    pub total_bytes: u64,
}
81
82#[derive(Default)]
83struct BlobAccessUpdateGate {
84 next_update_by_hash: Mutex<HashMap<Hash, u64>>,
85}
86
87impl BlobAccessUpdateGate {
88 fn due_hashes<I>(&self, hashes: I, now: u64) -> Vec<Hash>
89 where
90 I: IntoIterator<Item = Hash>,
91 {
92 let Ok(mut next_update_by_hash) = self.next_update_by_hash.try_lock() else {
93 return Vec::new();
94 };
95
96 if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
97 next_update_by_hash.retain(|_, next_update| *next_update > now);
98 if next_update_by_hash.len() >= ACCESS_UPDATE_GATE_MAX_ENTRIES {
99 next_update_by_hash.clear();
100 }
101 }
102
103 let mut due = Vec::new();
104 let mut seen = HashSet::new();
105 for hash in hashes {
106 if !seen.insert(hash) {
107 continue;
108 }
109 if next_update_by_hash
110 .get(&hash)
111 .is_some_and(|next_update| now < *next_update)
112 {
113 continue;
114 }
115 next_update_by_hash.insert(hash, now.saturating_add(ACCESS_UPDATE_INTERVAL_SECS));
116 due.push(hash);
117 }
118 due
119 }
120}
121
/// Hash-keyed local blob storage, dispatching to the configured backend.
pub enum LocalStore {
    /// Filesystem-backed blob store.
    Fs(FsBlobStore),
    /// LMDB-backed blob store (only available with the `lmdb` feature).
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
128
/// Returns `true` when the final component of `path` is exactly two ASCII hex digits.
///
/// Used to recognise leftover filesystem-backend shard directories (see
/// `remove_stale_fs_blob_shards`). Non-UTF-8 names never match.
#[cfg(feature = "lmdb")]
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
        return false;
    };
    name.len() == 2 && name.bytes().all(|byte| byte.is_ascii_hexdigit())
}
136
137fn lmdb_metadata_map_size_for_storage_budget(max_size_bytes: u64) -> u64 {
138 if max_size_bytes == 0 {
139 return LMDB_METADATA_MAX_MAP_SIZE_BYTES;
140 }
141
142 max_size_bytes
143 .saturating_div(LMDB_METADATA_STORAGE_RATIO_DIVISOR)
144 .clamp(
145 LMDB_METADATA_MIN_MAP_SIZE_BYTES,
146 LMDB_METADATA_MAX_MAP_SIZE_BYTES,
147 )
148}
149
150fn lmdb_map_size_for_existing_env(path: &Path, requested_bytes: u64) -> Result<usize> {
151 let existing_bytes = std::fs::metadata(path.join("data.mdb"))
152 .map(|metadata| metadata.len())
153 .unwrap_or(0);
154 let existing_headroom = if existing_bytes == 0 {
155 0
156 } else {
157 existing_bytes
158 .saturating_div(10)
159 .max(LMDB_METADATA_REOPEN_HEADROOM_BYTES)
160 };
161 let requested =
162 align_lmdb_map_size(requested_bytes.max(existing_bytes.saturating_add(existing_headroom)));
163 usize::try_from(requested).context("LMDB map size exceeds usize")
164}
165
166fn align_lmdb_map_size(bytes: u64) -> u64 {
167 let page_size = (page_size::get() as u64).max(4096);
168 let remainder = bytes % page_size;
169 if remainder == 0 {
170 bytes
171 } else {
172 bytes.saturating_add(page_size - remainder)
173 }
174}
175
/// Deletes every immediate subdirectory of `path` whose name is two hex digits.
///
/// These are leftover filesystem-backend shard directories in a directory now used by the
/// LMDB blob store. Each removal is logged at info level.
///
/// # Errors
/// Returns `StoreError::Io` if the directory cannot be listed or a shard cannot be removed.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for dir_entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let candidate = dir_entry.map_err(StoreError::Io)?.path();
        if !candidate.is_dir() || !is_fs_blob_shard_dir(&candidate) {
            continue;
        }
        std::fs::remove_dir_all(&candidate).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            candidate.display()
        );
    }
    Ok(())
}
192
193impl LocalStore {
194 pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
200 Self::new_unbounded(path, backend)
201 }
202
203 pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
208 path: P,
209 backend: &StorageBackend,
210 _map_size_bytes: Option<u64>,
211 ) -> Result<Self, StoreError> {
212 match backend {
213 StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
214 #[cfg(feature = "lmdb")]
215 StorageBackend::Lmdb => match _map_size_bytes {
216 Some(map_size_bytes) => {
217 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
218 remove_stale_fs_blob_shards(path.as_ref())?;
219 Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
220 path,
221 map_size_bytes,
222 )?))
223 }
224 None => {
225 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
226 remove_stale_fs_blob_shards(path.as_ref())?;
227 Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
228 }
229 },
230 #[cfg(not(feature = "lmdb"))]
231 StorageBackend::Lmdb => {
232 tracing::warn!(
233 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
234 );
235 Ok(LocalStore::Fs(FsBlobStore::new(path)?))
236 }
237 }
238 }
239
240 pub fn new_unbounded<P: AsRef<Path>>(
242 path: P,
243 backend: &StorageBackend,
244 ) -> Result<Self, StoreError> {
245 Self::new_with_lmdb_map_size(path, backend, None)
246 }
247
248 pub fn backend(&self) -> StorageBackend {
249 match self {
250 LocalStore::Fs(_) => StorageBackend::Fs,
251 #[cfg(feature = "lmdb")]
252 LocalStore::Lmdb(_) => StorageBackend::Lmdb,
253 }
254 }
255
256 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
258 match self {
259 LocalStore::Fs(store) => store.put_sync(hash, data),
260 #[cfg(feature = "lmdb")]
261 LocalStore::Lmdb(store) => store.put_sync(hash, data),
262 }
263 }
264
265 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
267 match self {
268 LocalStore::Fs(store) => {
269 let mut inserted = 0usize;
270 for (hash, data) in items {
271 if store.put_sync(*hash, data.as_slice())? {
272 inserted += 1;
273 }
274 }
275 Ok(inserted)
276 }
277 #[cfg(feature = "lmdb")]
278 LocalStore::Lmdb(store) => store.put_many_sync(items),
279 }
280 }
281
282 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
284 match self {
285 LocalStore::Fs(store) => store.get_sync(hash),
286 #[cfg(feature = "lmdb")]
287 LocalStore::Lmdb(store) => store.get_sync(hash),
288 }
289 }
290
291 pub fn get_range_sync(
292 &self,
293 hash: &Hash,
294 start: u64,
295 end_inclusive: u64,
296 ) -> Result<Option<Vec<u8>>, StoreError> {
297 match self {
298 LocalStore::Fs(store) => store.get_range_sync(hash, start, end_inclusive),
299 #[cfg(feature = "lmdb")]
300 LocalStore::Lmdb(store) => store.get_range_sync(hash, start, end_inclusive),
301 }
302 }
303
304 pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
305 match self {
306 LocalStore::Fs(store) => store.blob_size_sync(hash),
307 #[cfg(feature = "lmdb")]
308 LocalStore::Lmdb(store) => store.blob_size_sync(hash),
309 }
310 }
311
312 pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
313 match self {
314 LocalStore::Fs(store) => store.touch_accessed_sync(hash, now),
315 #[cfg(feature = "lmdb")]
316 LocalStore::Lmdb(store) => store.touch_accessed_sync(hash, now),
317 }
318 }
319
320 pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
321 match self {
322 LocalStore::Fs(store) => store.touch_many_accessed_sync(hashes, now),
323 #[cfg(feature = "lmdb")]
324 LocalStore::Lmdb(store) => store.touch_many_accessed_sync(hashes, now),
325 }
326 }
327
328 pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
329 match self {
330 LocalStore::Fs(store) => store.last_accessed_at_sync(hash),
331 #[cfg(feature = "lmdb")]
332 LocalStore::Lmdb(store) => store.last_accessed_at_sync(hash),
333 }
334 }
335
336 pub fn many_last_accessed_at_sync(
337 &self,
338 hashes: &[Hash],
339 ) -> Result<Vec<(Hash, u64)>, StoreError> {
340 match self {
341 LocalStore::Fs(store) => store.many_last_accessed_at_sync(hashes),
342 #[cfg(feature = "lmdb")]
343 LocalStore::Lmdb(store) => store.many_last_accessed_at_sync(hashes),
344 }
345 }
346
347 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
349 match self {
350 LocalStore::Fs(store) => Ok(store.exists(hash)),
351 #[cfg(feature = "lmdb")]
352 LocalStore::Lmdb(store) => store.exists(hash),
353 }
354 }
355
356 pub fn existing_hashes_in_sorted_candidates(
358 &self,
359 sorted_hashes: &[Hash],
360 ) -> Result<Vec<bool>, StoreError> {
361 match self {
362 LocalStore::Fs(store) => Ok(sorted_hashes
363 .iter()
364 .map(|hash| store.exists(hash))
365 .collect()),
366 #[cfg(feature = "lmdb")]
367 LocalStore::Lmdb(store) => store.existing_hashes_in_sorted_candidates(sorted_hashes),
368 }
369 }
370
371 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
373 match self {
374 LocalStore::Fs(store) => store.delete_sync(hash),
375 #[cfg(feature = "lmdb")]
376 LocalStore::Lmdb(store) => store.delete_sync(hash),
377 }
378 }
379
380 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
382 match self {
383 LocalStore::Fs(store) => {
384 let stats = store.stats()?;
385 Ok(LocalStoreStats {
386 count: stats.count,
387 total_bytes: stats.total_bytes,
388 })
389 }
390 #[cfg(feature = "lmdb")]
391 LocalStore::Lmdb(store) => {
392 let stats = store.stats()?;
393 Ok(LocalStoreStats {
394 count: stats.count,
395 total_bytes: stats.total_bytes,
396 })
397 }
398 }
399 }
400
401 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
403 match self {
404 LocalStore::Fs(store) => store.list(),
405 #[cfg(feature = "lmdb")]
406 LocalStore::Lmdb(store) => store.list(),
407 }
408 }
409}
410
411#[async_trait]
412impl Store for LocalStore {
413 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
414 self.put_sync(hash, &data)
415 }
416
417 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
418 self.put_many_sync(&items)
419 }
420
421 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
422 self.get_sync(hash)
423 }
424
425 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
426 self.exists(hash)
427 }
428
429 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
430 self.delete_sync(hash)
431 }
432}
433
434#[cfg(feature = "s3")]
435use tokio::sync::mpsc;
436
437use crate::config::S3Config;
438
/// Work item for the background S3 sync task started by `StorageRouter::with_s3`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` as the object for `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object for `hash`.
    Delete { hash: Hash },
}
445
/// Blob store that serves from local storage and, with the `s3` feature, optionally mirrors
/// writes and deletes to S3 and falls back to S3 on local misses.
pub struct StorageRouter {
    /// Local blob store; always consulted first.
    local: Arc<LocalStore>,
    /// S3 client used for synchronous fallback reads and existence checks.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Bucket holding mirrored blobs.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to `{hex hash}.bin` object names (empty when unset).
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Queue feeding the background upload/delete task spawned by `with_s3`.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
464
465impl StorageRouter {
466 #[cfg(feature = "s3")]
467 fn s3_sync_timeout() -> std::time::Duration {
468 let millis = std::env::var(S3_SYNC_TIMEOUT_MS_ENV)
469 .ok()
470 .and_then(|value| value.parse::<u64>().ok())
471 .filter(|value| *value > 0)
472 .unwrap_or(DEFAULT_S3_SYNC_TIMEOUT_MS);
473 std::time::Duration::from_millis(millis)
474 }
475
476 #[cfg(feature = "s3")]
477 fn s3_sync_timeout_error(timeout: std::time::Duration) -> StoreError {
478 StoreError::Other(format!(
479 "S3 sync operation timed out after {}ms",
480 timeout.as_millis()
481 ))
482 }
483
484 #[cfg(feature = "s3")]
485 fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
486 where
487 F: Future<Output = T> + Send + 'static,
488 T: Send + 'static,
489 {
490 let timeout = Self::s3_sync_timeout();
491 if tokio::runtime::Handle::try_current().is_ok() {
492 return std::thread::Builder::new()
493 .name("storage-s3-sync".to_string())
494 .spawn(move || {
495 let runtime = tokio::runtime::Builder::new_current_thread()
496 .enable_all()
497 .build()
498 .map_err(|err| {
499 StoreError::Other(format!("build storage s3 sync runtime: {err}"))
500 })?;
501 runtime.block_on(async move {
502 tokio::time::timeout(timeout, future)
503 .await
504 .map_err(|_| Self::s3_sync_timeout_error(timeout))
505 })
506 })
507 .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
508 .join()
509 .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()))?;
510 }
511
512 let runtime = tokio::runtime::Builder::new_current_thread()
513 .enable_all()
514 .build()
515 .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
516 runtime.block_on(async move {
517 tokio::time::timeout(timeout, future)
518 .await
519 .map_err(|_| Self::s3_sync_timeout_error(timeout))
520 })
521 }
522
523 pub fn new(local: Arc<LocalStore>) -> Self {
525 Self {
526 local,
527 #[cfg(feature = "s3")]
528 s3_client: None,
529 #[cfg(feature = "s3")]
530 s3_bucket: None,
531 #[cfg(feature = "s3")]
532 s3_prefix: String::new(),
533 #[cfg(feature = "s3")]
534 sync_tx: None,
535 }
536 }
537
538 #[cfg(feature = "s3")]
540 pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
541 use aws_sdk_s3::Client as S3Client;
542
543 let mut aws_config_loader = aws_config::from_env();
545 aws_config_loader =
546 aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
547 let aws_config = aws_config_loader.load().await;
548
549 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
551 s3_config_builder = s3_config_builder
552 .endpoint_url(&config.endpoint)
553 .force_path_style(true);
554
555 let s3_client = S3Client::from_conf(s3_config_builder.build());
556 let bucket = config.bucket.clone();
557 let prefix = config.prefix.clone().unwrap_or_default();
558
559 let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
561
562 let sync_client = s3_client.clone();
564 let sync_bucket = bucket.clone();
565 let sync_prefix = prefix.clone();
566
567 tokio::spawn(async move {
568 use aws_sdk_s3::primitives::ByteStream;
569
570 tracing::info!("S3 background sync task started");
571
572 let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(8));
574 let client = std::sync::Arc::new(sync_client);
575 let bucket = std::sync::Arc::new(sync_bucket);
576 let prefix = std::sync::Arc::new(sync_prefix);
577
578 while let Some(msg) = sync_rx.recv().await {
579 let client = client.clone();
580 let bucket = bucket.clone();
581 let prefix = prefix.clone();
582 let semaphore = semaphore.clone();
583
584 tokio::spawn(async move {
586 let _permit = semaphore.acquire().await;
588
589 match msg {
590 S3SyncMessage::Upload { hash, data } => {
591 let key = format!("{}{}.bin", prefix, to_hex(&hash));
592 tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
593
594 let mut attempt = 1u8;
595 loop {
596 match client
597 .put_object()
598 .bucket(bucket.as_str())
599 .key(&key)
600 .body(ByteStream::from(data.clone()))
601 .send()
602 .await
603 {
604 Ok(_) => {
605 tracing::debug!("S3 upload succeeded: {}", &key);
606 break;
607 }
608 Err(e) if attempt < 3 => {
609 tracing::warn!(
610 "S3 upload retrying {}: attempt={} error={}",
611 &key,
612 attempt,
613 e
614 );
615 tokio::time::sleep(std::time::Duration::from_millis(
616 250 * u64::from(attempt),
617 ))
618 .await;
619 attempt += 1;
620 }
621 Err(e) => {
622 tracing::error!(
623 "S3 upload failed {} after {} attempts: {}",
624 &key,
625 attempt,
626 e
627 );
628 break;
629 }
630 }
631 }
632 }
633 S3SyncMessage::Delete { hash } => {
634 let key = format!("{}{}.bin", prefix, to_hex(&hash));
635 tracing::debug!("S3 deleting {}", &key);
636
637 let mut attempt = 1u8;
638 loop {
639 match client
640 .delete_object()
641 .bucket(bucket.as_str())
642 .key(&key)
643 .send()
644 .await
645 {
646 Ok(_) => break,
647 Err(e) if attempt < 3 => {
648 tracing::warn!(
649 "S3 delete retrying {}: attempt={} error={}",
650 &key,
651 attempt,
652 e
653 );
654 tokio::time::sleep(std::time::Duration::from_millis(
655 250 * u64::from(attempt),
656 ))
657 .await;
658 attempt += 1;
659 }
660 Err(e) => {
661 tracing::error!(
662 "S3 delete failed {} after {} attempts: {}",
663 &key,
664 attempt,
665 e
666 );
667 break;
668 }
669 }
670 }
671 }
672 }
673 });
674 }
675 });
676
677 tracing::info!(
678 "S3 storage initialized: bucket={}, prefix={}",
679 bucket,
680 prefix
681 );
682
683 Ok(Self {
684 local,
685 s3_client: Some(s3_client),
686 s3_bucket: Some(bucket),
687 s3_prefix: prefix,
688 sync_tx: Some(sync_tx),
689 })
690 }
691
692 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
694 let is_new = self.local.put_sync(hash, data)?;
696
697 #[cfg(feature = "s3")]
700 if is_new {
701 if let Some(ref tx) = self.sync_tx {
702 tracing::debug!(
703 "Queueing S3 upload for {} ({} bytes)",
704 crate::storage::to_hex(&hash)[..16].to_string(),
705 data.len(),
706 );
707 if let Err(e) = tx.send(S3SyncMessage::Upload {
708 hash,
709 data: data.to_vec(),
710 }) {
711 tracing::error!("Failed to queue S3 upload: {}", e);
712 }
713 }
714 }
715
716 Ok(is_new)
717 }
718
719 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
721 #[cfg(feature = "s3")]
722 let pending_uploads = if self.sync_tx.is_some() {
723 let mut pending = Vec::new();
724 for (hash, data) in items {
725 if !self.local.exists(hash)? {
726 pending.push((*hash, data.clone()));
727 }
728 }
729 pending
730 } else {
731 Vec::new()
732 };
733
734 let inserted = self.local.put_many_sync(items)?;
735
736 #[cfg(feature = "s3")]
737 if let Some(ref tx) = self.sync_tx {
738 for (hash, data) in pending_uploads {
739 if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
740 tracing::error!("Failed to queue S3 upload: {}", e);
741 }
742 }
743 }
744
745 Ok(inserted)
746 }
747
748 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
750 if let Some(data) = self.local.get_sync(hash)? {
752 return Ok(Some(data));
753 }
754
755 #[cfg(feature = "s3")]
757 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
758 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
759 let client = client.clone();
760 let bucket = bucket.clone();
761
762 match Self::run_s3_future_sync(async move {
763 client.get_object().bucket(bucket).key(key).send().await
764 }) {
765 Ok(Ok(output)) => {
766 match Self::run_s3_future_sync(async move { output.body.collect().await }) {
767 Ok(Ok(body)) => {
768 let data = body.into_bytes().to_vec();
769 let _ = self.local.put_sync(*hash, &data);
771 return Ok(Some(data));
772 }
773 Ok(Err(err)) => {
774 tracing::warn!("S3 body collect failed: {}", err);
775 }
776 Err(err) => {
777 tracing::warn!("S3 body collect runtime failed: {}", err);
778 }
779 }
780 }
781 Ok(Err(err)) => {
782 let service_err = err.into_service_error();
783 if !service_err.is_no_such_key() {
784 tracing::warn!("S3 get failed: {}", service_err);
785 }
786 }
787 Err(err) => {
788 tracing::warn!("S3 get runtime failed: {}", err);
789 }
790 }
791 }
792
793 Ok(None)
794 }
795
796 pub fn get_range_sync(
797 &self,
798 hash: &Hash,
799 start: u64,
800 end_inclusive: u64,
801 ) -> Result<Option<Vec<u8>>, StoreError> {
802 self.local.get_range_sync(hash, start, end_inclusive)
803 }
804
805 pub fn blob_size_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
806 self.local.blob_size_sync(hash)
807 }
808
809 pub fn touch_accessed_sync(&self, hash: &Hash, now: u64) -> Result<bool, StoreError> {
810 self.local.touch_accessed_sync(hash, now)
811 }
812
813 pub fn touch_many_accessed_sync(&self, hashes: &[Hash], now: u64) -> Result<usize, StoreError> {
814 self.local.touch_many_accessed_sync(hashes, now)
815 }
816
817 pub fn last_accessed_at_sync(&self, hash: &Hash) -> Result<Option<u64>, StoreError> {
818 self.local.last_accessed_at_sync(hash)
819 }
820
821 pub fn many_last_accessed_at_sync(
822 &self,
823 hashes: &[Hash],
824 ) -> Result<Vec<(Hash, u64)>, StoreError> {
825 self.local.many_last_accessed_at_sync(hashes)
826 }
827
828 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
830 if self.local.exists(hash)? {
832 return Ok(true);
833 }
834
835 #[cfg(feature = "s3")]
837 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
838 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
839 let client = client.clone();
840 let bucket = bucket.clone();
841
842 match Self::run_s3_future_sync(async move {
843 client.head_object().bucket(bucket).key(&key).send().await
844 }) {
845 Ok(Ok(_)) => return Ok(true),
846 Ok(Err(err)) => {
847 let service_err = err.into_service_error();
848 if !service_err.is_not_found() {
849 tracing::warn!("S3 head failed: {}", service_err);
850 }
851 }
852 Err(err) => {
853 tracing::warn!("S3 head runtime failed: {}", err);
854 }
855 }
856 }
857
858 Ok(false)
859 }
860
861 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
863 let deleted = self.local.delete_sync(hash)?;
864
865 #[cfg(feature = "s3")]
867 if let Some(ref tx) = self.sync_tx {
868 let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
869 }
870
871 Ok(deleted)
872 }
873
874 pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
877 self.local.delete_sync(hash)
878 }
879
880 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
882 self.local.stats()
883 }
884
885 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
887 self.local.list()
888 }
889
890 pub fn local_store(&self) -> Arc<LocalStore> {
892 Arc::clone(&self.local)
893 }
894}
895
896#[derive(Clone)]
897struct AccessRecordingStore {
898 inner: Arc<StorageRouter>,
899 accessed: Arc<Mutex<HashSet<Hash>>>,
900}
901
902impl AccessRecordingStore {
903 fn new(inner: Arc<StorageRouter>) -> Self {
904 Self {
905 inner,
906 accessed: Arc::new(Mutex::new(HashSet::new())),
907 }
908 }
909
910 fn take_accessed_hashes(&self) -> Vec<Hash> {
911 let Ok(mut accessed) = self.accessed.lock() else {
912 return Vec::new();
913 };
914 accessed.drain().collect()
915 }
916
917 fn record_access(&self, hash: &Hash) {
918 let Ok(mut accessed) = self.accessed.lock() else {
919 return;
920 };
921 accessed.insert(*hash);
922 }
923}
924
925#[async_trait]
926impl Store for AccessRecordingStore {
927 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
928 self.inner.put(hash, data).await
929 }
930
931 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
932 self.inner.put_many(items).await
933 }
934
935 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
936 let data = self.inner.get(hash).await?;
937 if data.is_some() {
938 self.record_access(hash);
939 }
940 Ok(data)
941 }
942
943 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
944 self.inner.has(hash).await
945 }
946
947 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
948 self.inner.delete(hash).await
949 }
950}
951
952#[async_trait]
955impl Store for StorageRouter {
956 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
957 self.put_sync(hash, &data)
958 }
959
960 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
961 self.put_many_sync(&items)
962 }
963
964 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
965 self.get_sync(hash)
966 }
967
968 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
969 self.exists(hash)
970 }
971
972 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
973 self.delete_sync(hash)
974 }
975}
976
/// Hashtree storage: an LMDB metadata environment at `base_path` plus a blob `StorageRouter`.
pub struct HashtreeStore {
    /// Directory the store was opened at.
    base_path: PathBuf,
    /// Metadata LMDB environment (opened at `base_path`).
    env: heed::Env,
    /// Named DB "pins": byte keys with no value (a set) — presumably pinned hashes.
    pins: Database<Bytes, Unit>,
    /// Named DB "pinned_refs": string keys with no value (a set).
    pinned_refs: Database<Str, Unit>,
    /// Named DB "tracked_authors": string keys with no value (a set).
    tracked_authors: Database<Str, Unit>,
    /// Named DB "blob_owners": byte keys with no value (a set) — key layout defined elsewhere.
    blob_owners: Database<Bytes, Unit>,
    /// Named DB "pubkey_blobs": byte keys to byte values.
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Named DB "pubkey_blob_index": byte keys to byte values.
    pubkey_blob_index: Database<Bytes, Bytes>,
    /// Named DB "tree_meta": byte keys to byte values (presumably serialized `TreeMeta`).
    tree_meta: Database<Bytes, Bytes>,
    /// Named DB "blob_trees": byte keys with no value (a set).
    blob_trees: Database<Bytes, Unit>,
    /// Named DB "tree_refs": string keys to byte values.
    tree_refs: Database<Str, Bytes>,
    /// Named DB "cached_roots": string keys to byte values (presumably serialized `CachedRoot`).
    cached_roots: Database<Str, Bytes>,
    /// Blob storage (local, optionally mirrored to S3).
    router: Arc<StorageRouter>,
    /// Storage budget in bytes passed in at construction.
    max_size_bytes: u64,
    /// Orphan-eviction setting passed in at construction.
    evict_orphans: bool,
    /// Rate limiter for last-access metadata writes.
    blob_access_update_gate: BlobAccessUpdateGate,
    /// Set while a background access-metadata update thread is running.
    blob_access_update_inflight: Arc<AtomicBool>,
}
1011
1012impl HashtreeStore {
1013 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
1015 let config = hashtree_config::Config::load_or_default();
1016 let max_size_bytes = config
1017 .storage
1018 .max_size_gb
1019 .saturating_mul(1024 * 1024 * 1024);
1020 Self::with_options_and_backend(
1021 path,
1022 None,
1023 max_size_bytes,
1024 config.storage.evict_orphans,
1025 &config.storage.backend,
1026 )
1027 }
1028
1029 pub fn new_with_backend<P: AsRef<Path>>(
1031 path: P,
1032 backend: hashtree_config::StorageBackend,
1033 max_size_bytes: u64,
1034 ) -> Result<Self> {
1035 Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
1036 }
1037
1038 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
1040 let config = hashtree_config::Config::load_or_default();
1041 let max_size_bytes = config
1042 .storage
1043 .max_size_gb
1044 .saturating_mul(1024 * 1024 * 1024);
1045 Self::with_options_and_backend(
1046 path,
1047 s3_config,
1048 max_size_bytes,
1049 config.storage.evict_orphans,
1050 &config.storage.backend,
1051 )
1052 }
1053
1054 pub fn with_options<P: AsRef<Path>>(
1060 path: P,
1061 s3_config: Option<&S3Config>,
1062 max_size_bytes: u64,
1063 ) -> Result<Self> {
1064 let config = hashtree_config::Config::load_or_default();
1065 Self::with_options_and_backend(
1066 path,
1067 s3_config,
1068 max_size_bytes,
1069 config.storage.evict_orphans,
1070 &config.storage.backend,
1071 )
1072 }
1073
    /// Opens (creating if needed) a `HashtreeStore` rooted at `path`.
    ///
    /// Steps, in order:
    /// 1. Creates `path` and opens the metadata LMDB environment there. The map size comes from
    ///    `lmdb_metadata_map_size_for_storage_budget(max_size_bytes)`. It is raised if needed to
    ///    fit an existing `data.mdb` plus headroom (`lmdb_map_size_for_existing_env`).
    /// 2. Creates the ten named metadata databases in one write transaction.
    /// 3. Opens the local blob store under `path/blobs` for `backend`. Without the `lmdb`
    ///    feature, an LMDB request falls back to the filesystem store with a warning.
    /// 4. Wraps the blob store in a `StorageRouter`. The router is S3-backed when `s3_config` is
    ///    given and the `s3` feature is on; otherwise a warning is logged and storage stays local.
    ///
    /// `max_size_bytes` and `evict_orphans` are stored on the returned value.
    ///
    /// # Errors
    /// Fails if directory creation, LMDB environment/database setup, blob store creation or
    /// S3 router initialisation fails.
    ///
    /// NOTE(review): `lmdb_metadata_map_size_for_storage_budget` treats `max_size_bytes == 0`
    /// as "no budget". The LMDB blob store below, however, then gets only
    /// `LMDB_BLOB_MIN_MAP_SIZE_BYTES` (16 MiB) — confirm that is intended.
    ///
    /// NOTE(review): `StorageRouter::with_s3` calls `tokio::spawn`, which panics outside a Tokio
    /// runtime. Here it is driven by `futures::executor::block_on`, so the S3 path presumably
    /// must be called from a thread inside a Tokio runtime context — confirm callers.
    pub fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        evict_orphans: bool,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;
        let metadata_map_size = lmdb_map_size_for_existing_env(
            path,
            lmdb_metadata_map_size_for_storage_budget(max_size_bytes),
        )?;

        // SAFETY(review): heed marks opening an environment `unsafe` because the caller must
        // uphold memory-map invariants (e.g. not opening the same env twice in one process) —
        // confirm callers guarantee this.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(metadata_map_size)
                // Ten named databases are created below; this leaves one spare slot.
                .max_dbs(11)
                .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Best-effort: reclaim reader slots left behind by dead processes; errors are ignored.
        let _ = env.clear_stale_readers();
        // An existing env may have been created with a smaller map; grow it before the first
        // transaction in this function is opened.
        if env.info().map_size < metadata_map_size {
            // SAFETY(review): heed requires that no transactions be active during `resize`;
            // none have been opened on `env` yet in this function.
            unsafe { env.resize(metadata_map_size) }?;
        }

        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
        let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let pubkey_blob_index = env.create_database(&mut wtxn, Some("pubkey_blob_index"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                std::fs::create_dir_all(path.join("blobs"))?;
                // Drop two-hex-digit shard directories left over from filesystem storage.
                remove_stale_fs_blob_shards(&path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
                // The blob map is sized to the storage budget, with a 16 MiB floor.
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            base_path: path.to_path_buf(),
            env,
            pins,
            pinned_refs,
            tracked_authors,
            blob_owners,
            pubkey_blobs,
            pubkey_blob_index,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
            evict_orphans,
            blob_access_update_gate: BlobAccessUpdateGate::default(),
            blob_access_update_inflight: Arc::new(AtomicBool::new(false)),
        })
    }
1190
1191 pub fn base_path(&self) -> &Path {
1192 &self.base_path
1193 }
1194
1195 pub fn router(&self) -> &StorageRouter {
1197 &self.router
1198 }
1199
1200 pub fn store_arc(&self) -> Arc<StorageRouter> {
1203 Arc::clone(&self.router)
1204 }
1205
1206 fn access_tracking_tree(&self) -> (HashTree<AccessRecordingStore>, AccessRecordingStore) {
1207 let access_store = AccessRecordingStore::new(self.store_arc());
1208 let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1209 (tree, access_store)
1210 }
1211
1212 pub fn record_blob_accesses<I>(&self, hashes: I)
1213 where
1214 I: IntoIterator<Item = Hash>,
1215 {
1216 let now = unix_timestamp_now();
1217 let mut due_hashes = self.blob_access_update_gate.due_hashes(hashes, now);
1218 if due_hashes.is_empty() {
1219 return;
1220 }
1221
1222 if self
1223 .blob_access_update_inflight
1224 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1225 .is_err()
1226 {
1227 return;
1228 }
1229
1230 if due_hashes.len() > ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT {
1231 due_hashes.truncate(ACCESS_UPDATE_BACKGROUND_BATCH_LIMIT);
1232 }
1233
1234 let router = Arc::clone(&self.router);
1235 let inflight = Arc::clone(&self.blob_access_update_inflight);
1236 let spawn_result = std::thread::Builder::new()
1237 .name("blob-access-update".to_string())
1238 .spawn(move || {
1239 if let Err(err) = router.touch_many_accessed_sync(&due_hashes, now) {
1240 tracing::debug!("Failed to update blob access metadata: {}", err);
1241 }
1242 inflight.store(false, Ordering::Release);
1243 });
1244 if let Err(err) = spawn_result {
1245 self.blob_access_update_inflight
1246 .store(false, Ordering::Release);
1247 tracing::debug!("Failed to spawn blob access metadata updater: {}", err);
1248 }
1249 }
1250
1251 fn record_blob_access_now(&self, hash: &Hash) {
1252 if let Err(err) = self.router.touch_accessed_sync(hash, unix_timestamp_now()) {
1253 tracing::debug!("Failed to update blob access metadata: {}", err);
1254 }
1255 }
1256
1257 pub fn blob_last_accessed_at(&self, hash: &Hash) -> Result<Option<u64>> {
1258 self.router
1259 .last_accessed_at_sync(hash)
1260 .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1261 }
1262
1263 pub fn blob_last_accessed_many(&self, hashes: &[Hash]) -> Result<Vec<(Hash, u64)>> {
1264 self.router
1265 .many_last_accessed_at_sync(hashes)
1266 .map_err(|e| anyhow::anyhow!("Failed to read blob access metadata: {}", e))
1267 }
1268
1269 pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
1271 let (tree, access_store) = self.access_tracking_tree();
1272
1273 let result = sync_block_on(async {
1274 tree.get_tree_node(hash)
1275 .await
1276 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
1277 })?;
1278 if result.is_some() {
1279 self.record_blob_accesses(access_store.take_accessed_hashes());
1280 }
1281 Ok(result)
1282 }
1283
1284 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
1286 let hash = sha256(data);
1287 let inserted = self
1288 .router
1289 .put_sync(hash, data)
1290 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1291 if !inserted {
1292 self.record_blob_access_now(&hash);
1293 }
1294 Ok(to_hex(&hash))
1295 }
1296
1297 pub fn put_owned_blob(&self, data: &[u8], pubkey: &[u8; 32]) -> Result<String> {
1299 let hash = sha256(data);
1300 if !self
1301 .router
1302 .exists(&hash)
1303 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))?
1304 {
1305 self.make_room_for_durable_blob(data.len() as u64)?;
1306 self.router
1307 .put_sync(hash, data)
1308 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1309 } else {
1310 self.record_blob_access_now(&hash);
1311 }
1312 self.set_blob_owner_with_size(&hash, pubkey, data.len() as u64)?;
1313 Ok(to_hex(&hash))
1314 }
1315
1316 pub fn put_owned_blobs(&self, items: &[(Hash, Vec<u8>)], pubkey: &[u8; 32]) -> Result<usize> {
1318 if items.is_empty() {
1319 return Ok(0);
1320 }
1321 let incoming_bytes = items
1322 .iter()
1323 .fold(0u64, |total, (_, data)| total.saturating_add(data.len() as u64));
1324 self.make_room_for_durable_blob(incoming_bytes)?;
1325 let inserted = self
1326 .router
1327 .put_many_sync(items)
1328 .map_err(|e| anyhow::anyhow!("Failed to store blob batch: {}", e))?;
1329
1330 let now = SystemTime::now()
1331 .duration_since(UNIX_EPOCH)
1332 .unwrap()
1333 .as_secs();
1334 let mut wtxn = self.env.write_txn()?;
1335 for (hash, data) in items {
1336 let owner_key = Self::blob_owner_key(hash, pubkey);
1337 self.blob_owners.put(&mut wtxn, &owner_key[..], &())?;
1338
1339 let index_key = Self::pubkey_blob_key(pubkey, hash);
1340 if self.pubkey_blob_index.get(&wtxn, &index_key[..])?.is_none() {
1341 let metadata = BlobMetadata {
1342 sha256: to_hex(hash),
1343 size: data.len() as u64,
1344 mime_type: "application/octet-stream".to_string(),
1345 uploaded: now,
1346 };
1347 self.pubkey_blob_index
1348 .put(&mut wtxn, &index_key[..], &serde_json::to_vec(&metadata)?)?;
1349 }
1350 }
1351 wtxn.commit()?;
1352 Ok(inserted)
1353 }
1354
1355 pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
1361 let hash = sha256(data);
1362 if self
1363 .router
1364 .exists(&hash)
1365 .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
1366 {
1367 self.record_blob_access_now(&hash);
1368 return Ok(to_hex(&hash));
1369 }
1370
1371 let incoming_bytes = data.len() as u64;
1372 let _ = self.make_room_for_cached_blob(incoming_bytes);
1373
1374 let mut retried_after_cleanup = false;
1375 loop {
1376 match self.router.put_sync(hash, data) {
1377 Ok(_) => return Ok(to_hex(&hash)),
1378 Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
1379 let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
1380 if freed == 0 {
1381 return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
1382 }
1383 retried_after_cleanup = true;
1384 }
1385 Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
1386 }
1387 }
1388 }
1389
1390 pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1392 let data = self
1393 .router
1394 .get_sync(hash)
1395 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?;
1396 if data.is_some() {
1397 self.record_blob_accesses(std::iter::once(*hash));
1398 }
1399 Ok(data)
1400 }
1401
1402 pub fn get_blob_range(
1403 &self,
1404 hash: &[u8; 32],
1405 start: u64,
1406 end_inclusive: u64,
1407 ) -> Result<Option<Vec<u8>>> {
1408 let data = self
1409 .router
1410 .get_range_sync(hash, start, end_inclusive)
1411 .map_err(|e| anyhow::anyhow!("Failed to get blob range: {}", e))?;
1412 if data.is_some() {
1413 self.record_blob_accesses(std::iter::once(*hash));
1414 }
1415 Ok(data)
1416 }
1417
1418 pub fn blob_size(&self, hash: &[u8; 32]) -> Result<Option<u64>> {
1419 self.router
1420 .blob_size_sync(hash)
1421 .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))
1422 }
1423
1424 pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
1426 self.router
1427 .exists(hash)
1428 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
1429 }
1430
1431 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
1437 let mut key = [0u8; 64];
1438 key[..32].copy_from_slice(sha256);
1439 key[32..].copy_from_slice(pubkey);
1440 key
1441 }
1442
1443 fn pubkey_blob_key(pubkey: &[u8; 32], sha256: &[u8; 32]) -> [u8; 64] {
1444 let mut key = [0u8; 64];
1445 key[..32].copy_from_slice(pubkey);
1446 key[32..].copy_from_slice(sha256);
1447 key
1448 }
1449
1450 pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
1453 let size = self
1454 .router
1455 .blob_size_sync(sha256)
1456 .map_err(|e| anyhow::anyhow!("Failed to get blob size: {}", e))?
1457 .unwrap_or(0);
1458 self.set_blob_owner_with_size(sha256, pubkey, size)
1459 }
1460
1461 fn set_blob_owner_with_size(
1462 &self,
1463 sha256: &[u8; 32],
1464 pubkey: &[u8; 32],
1465 size: u64,
1466 ) -> Result<()> {
1467 let key = Self::blob_owner_key(sha256, pubkey);
1468 let index_key = Self::pubkey_blob_key(pubkey, sha256);
1469 let mut wtxn = self.env.write_txn()?;
1470
1471 self.blob_owners.put(&mut wtxn, &key[..], &())?;
1473
1474 if self.pubkey_blob_index.get(&wtxn, &index_key[..])?.is_none() {
1475 let now = SystemTime::now()
1476 .duration_since(UNIX_EPOCH)
1477 .unwrap()
1478 .as_secs();
1479 let metadata = BlobMetadata {
1480 sha256: to_hex(sha256),
1481 size,
1482 mime_type: "application/octet-stream".to_string(),
1483 uploaded: now,
1484 };
1485 self.pubkey_blob_index
1486 .put(&mut wtxn, &index_key[..], &serde_json::to_vec(&metadata)?)?;
1487 }
1488
1489 wtxn.commit()?;
1490 Ok(())
1491 }
1492
1493 pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1495 let key = Self::blob_owner_key(sha256, pubkey);
1496 let rtxn = self.env.read_txn()?;
1497 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1498 }
1499
1500 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1502 let rtxn = self.env.read_txn()?;
1503
1504 let mut owners = Vec::new();
1505 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1506 let (key, _) = item?;
1507 if key.len() == 64 {
1508 let mut pubkey = [0u8; 32];
1510 pubkey.copy_from_slice(&key[32..64]);
1511 owners.push(pubkey);
1512 }
1513 }
1514 Ok(owners)
1515 }
1516
1517 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1519 let rtxn = self.env.read_txn()?;
1520
1521 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1523 if item.is_ok() {
1524 return Ok(true);
1525 }
1526 }
1527 Ok(false)
1528 }
1529
1530 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1532 Ok(self.get_blob_owners(sha256)?.into_iter().next())
1533 }
1534
1535 pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1539 let key = Self::blob_owner_key(sha256, pubkey);
1540 let mut wtxn = self.env.write_txn()?;
1541
1542 self.blob_owners.delete(&mut wtxn, &key[..])?;
1544 self.pubkey_blob_index
1545 .delete(&mut wtxn, &Self::pubkey_blob_key(pubkey, sha256)[..])?;
1546
1547 let sha256_hex = to_hex(sha256);
1549
1550 if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1552 if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1553 blobs.retain(|b| b.sha256 != sha256_hex);
1554 let blobs_json = serde_json::to_vec(&blobs)?;
1555 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1556 }
1557 }
1558
1559 let mut has_other_owners = false;
1561 for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1562 if item.is_ok() {
1563 has_other_owners = true;
1564 break;
1565 }
1566 }
1567
1568 if has_other_owners {
1569 wtxn.commit()?;
1570 tracing::debug!(
1571 "Removed {} from blob {} owners, other owners remain",
1572 &to_hex(pubkey)[..8],
1573 &sha256_hex[..8]
1574 );
1575 return Ok(false);
1576 }
1577
1578 tracing::info!(
1580 "All owners removed from blob {}, deleting",
1581 &sha256_hex[..8]
1582 );
1583
1584 let _ = self.router.delete_sync(sha256);
1586
1587 wtxn.commit()?;
1588 Ok(true)
1589 }
1590
1591 pub fn list_blobs_by_pubkey(
1593 &self,
1594 pubkey: &[u8; 32],
1595 ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1596 let rtxn = self.env.read_txn()?;
1597
1598 let mut blobs: Vec<BlobMetadata> = self
1599 .pubkey_blobs
1600 .get(&rtxn, pubkey)?
1601 .and_then(|b| serde_json::from_slice(b).ok())
1602 .unwrap_or_default();
1603 let mut seen: HashSet<String> = blobs.iter().map(|blob| blob.sha256.clone()).collect();
1604
1605 for item in self.pubkey_blob_index.prefix_iter(&rtxn, pubkey)? {
1606 let (_, metadata_bytes) = item?;
1607 let metadata: BlobMetadata = match serde_json::from_slice(metadata_bytes) {
1608 Ok(metadata) => metadata,
1609 Err(_) => continue,
1610 };
1611 if seen.insert(metadata.sha256.clone()) {
1612 blobs.push(metadata);
1613 }
1614 }
1615
1616 Ok(blobs
1617 .into_iter()
1618 .map(|b| crate::server::blossom::BlobDescriptor {
1619 url: format!("/{}", b.sha256),
1620 sha256: b.sha256,
1621 size: b.size,
1622 mime_type: b.mime_type,
1623 uploaded: b.uploaded,
1624 })
1625 .collect())
1626 }
1627
1628 pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1630 let data = self
1631 .router
1632 .get_sync(hash)
1633 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))?;
1634 if data.is_some() {
1635 self.record_blob_accesses(std::iter::once(*hash));
1636 }
1637 Ok(data)
1638 }
1639
1640 pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1643 let (tree, access_store) = self.access_tracking_tree();
1644
1645 let result = sync_block_on(async {
1646 tree.read_file(hash)
1647 .await
1648 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1649 })?;
1650 if result.is_some() {
1651 self.record_blob_accesses(access_store.take_accessed_hashes());
1652 }
1653 Ok(result)
1654 }
1655
1656 pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1659 let (tree, access_store) = self.access_tracking_tree();
1660
1661 let result = sync_block_on(async {
1662 tree.get(cid, None)
1663 .await
1664 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1665 })?;
1666 if result.is_some() {
1667 self.record_blob_accesses(access_store.take_accessed_hashes());
1668 }
1669 Ok(result)
1670 }
1671
1672 fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1673 let exists = self
1674 .router
1675 .exists(&cid.hash)
1676 .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1677 if !exists {
1678 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1679 }
1680 Ok(())
1681 }
1682
1683 pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1685 self.ensure_cid_exists(cid)?;
1686
1687 let (tree, access_store) = self.access_tracking_tree();
1688 let mut total_bytes = 0u64;
1689 let mut streamed_any_chunk = false;
1690
1691 sync_block_on(async {
1692 let mut stream = tree.get_stream(cid);
1693 while let Some(chunk) = stream.next().await {
1694 streamed_any_chunk = true;
1695 let chunk =
1696 chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1697 writer
1698 .write_all(&chunk)
1699 .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1700 total_bytes += chunk.len() as u64;
1701 }
1702 Ok::<(), anyhow::Error>(())
1703 })?;
1704
1705 if !streamed_any_chunk {
1706 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1707 }
1708 self.record_blob_accesses(access_store.take_accessed_hashes());
1709
1710 writer
1711 .flush()
1712 .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1713 Ok(total_bytes)
1714 }
1715
1716 pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1718 self.ensure_cid_exists(cid)?;
1719
1720 let output_path = output_path.as_ref();
1721 if let Some(parent) = output_path.parent() {
1722 if !parent.as_os_str().is_empty() {
1723 std::fs::create_dir_all(parent).with_context(|| {
1724 format!("Failed to create output directory {}", parent.display())
1725 })?;
1726 }
1727 }
1728
1729 let mut file = std::fs::File::create(output_path)
1730 .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1731 self.write_file_by_cid_to_writer(cid, &mut file)
1732 }
1733
1734 pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1736 self.write_file_by_cid(&Cid::public(*hash), output_path)
1737 }
1738
1739 pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1741 let (tree, access_store) = self.access_tracking_tree();
1742
1743 let result = sync_block_on(async {
1744 tree.resolve_path(cid, path)
1745 .await
1746 .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1747 })?;
1748 if result.is_some() {
1749 self.record_blob_accesses(access_store.take_accessed_hashes());
1750 }
1751 Ok(result)
1752 }
1753
1754 pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
1756 let access_store = AccessRecordingStore::new(self.store_arc());
1757 let tree = HashTree::new(HashTreeConfig::new(Arc::new(access_store.clone())).public());
1758
1759 let metadata: Result<Option<FileChunkMetadata>> = sync_block_on(async {
1760 let exists = access_store
1763 .has(hash)
1764 .await
1765 .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;
1766
1767 if !exists {
1768 return Ok(None);
1769 }
1770
1771 let total_size = tree
1773 .get_size(hash)
1774 .await
1775 .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;
1776
1777 let is_tree_node = tree
1779 .is_tree(hash)
1780 .await
1781 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1782
1783 if !is_tree_node {
1784 return Ok(Some(FileChunkMetadata {
1786 total_size,
1787 chunk_hashes: vec![],
1788 chunk_sizes: vec![],
1789 is_chunked: false,
1790 }));
1791 }
1792
1793 let node = match tree
1795 .get_tree_node(hash)
1796 .await
1797 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1798 {
1799 Some(n) => n,
1800 None => return Ok(None),
1801 };
1802
1803 let is_directory = tree
1805 .is_directory(hash)
1806 .await
1807 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1808
1809 if is_directory {
1810 return Ok(None); }
1812
1813 let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
1815 let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();
1816
1817 Ok(Some(FileChunkMetadata {
1818 total_size,
1819 chunk_hashes,
1820 chunk_sizes,
1821 is_chunked: !node.links.is_empty(),
1822 }))
1823 });
1824 let metadata = metadata?;
1825 if metadata.is_some() {
1826 self.record_blob_accesses(access_store.take_accessed_hashes());
1827 }
1828 Ok(metadata)
1829 }
1830
1831 pub fn get_file_range(
1833 &self,
1834 hash: &[u8; 32],
1835 start: u64,
1836 end: Option<u64>,
1837 ) -> Result<Option<(Vec<u8>, u64)>> {
1838 let metadata = match self.get_file_chunk_metadata(hash)? {
1839 Some(m) => m,
1840 None => return Ok(None),
1841 };
1842
1843 if metadata.total_size == 0 {
1844 return Ok(Some((Vec::new(), 0)));
1845 }
1846
1847 if start >= metadata.total_size {
1848 return Ok(None);
1849 }
1850
1851 let end = end
1852 .unwrap_or(metadata.total_size - 1)
1853 .min(metadata.total_size - 1);
1854
1855 if !metadata.is_chunked {
1857 let content = self.get_file(hash)?.unwrap_or_default();
1858 let range_content = if start < content.len() as u64 {
1859 content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1860 } else {
1861 Vec::new()
1862 };
1863 return Ok(Some((range_content, metadata.total_size)));
1864 }
1865
1866 let mut result = Vec::new();
1868 let mut current_offset = 0u64;
1869
1870 for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1871 let chunk_size = metadata.chunk_sizes[i];
1872 let chunk_end = current_offset + chunk_size - 1;
1873
1874 if chunk_end >= start && current_offset <= end {
1876 let chunk_content = match self.get_chunk(chunk_hash)? {
1877 Some(content) => content,
1878 None => {
1879 return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1880 }
1881 };
1882
1883 let chunk_read_start = if current_offset >= start {
1884 0
1885 } else {
1886 (start - current_offset) as usize
1887 };
1888
1889 let chunk_read_end = if chunk_end <= end {
1890 chunk_size as usize - 1
1891 } else {
1892 (end - current_offset) as usize
1893 };
1894
1895 result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1896 }
1897
1898 current_offset += chunk_size;
1899
1900 if current_offset > end {
1901 break;
1902 }
1903 }
1904
1905 Ok(Some((result, metadata.total_size)))
1906 }
1907
1908 pub fn stream_file_range_chunks_owned(
1910 self: Arc<Self>,
1911 hash: &[u8; 32],
1912 start: u64,
1913 end: u64,
1914 ) -> Result<Option<FileRangeChunksOwned>> {
1915 let metadata = match self.get_file_chunk_metadata(hash)? {
1916 Some(m) => m,
1917 None => return Ok(None),
1918 };
1919
1920 if metadata.total_size == 0 || start >= metadata.total_size {
1921 return Ok(None);
1922 }
1923
1924 let end = end.min(metadata.total_size - 1);
1925
1926 Ok(Some(FileRangeChunksOwned {
1927 store: self,
1928 metadata,
1929 start,
1930 end,
1931 current_chunk_idx: 0,
1932 current_offset: 0,
1933 }))
1934 }
1935
1936 pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1938 let (tree, access_store) = self.access_tracking_tree();
1939
1940 let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
1941 let is_dir = tree
1943 .is_directory(hash)
1944 .await
1945 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1946
1947 if !is_dir {
1948 return Ok(None);
1949 }
1950
1951 let cid = hashtree_core::Cid::public(*hash);
1953 let tree_entries = tree
1954 .list_directory(&cid)
1955 .await
1956 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1957
1958 let entries: Vec<DirEntry> = tree_entries
1959 .into_iter()
1960 .map(|e| DirEntry {
1961 name: e.name,
1962 cid: to_hex(&e.hash),
1963 is_directory: e.link_type.is_tree(),
1964 size: e.size,
1965 })
1966 .collect();
1967
1968 Ok(Some(DirectoryListing {
1969 dir_name: String::new(),
1970 entries,
1971 }))
1972 });
1973 let listing = listing?;
1974 if listing.is_some() {
1975 self.record_blob_accesses(access_store.take_accessed_hashes());
1976 }
1977 Ok(listing)
1978 }
1979
1980 pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1982 let (tree, access_store) = self.access_tracking_tree();
1983 let cid = cid.clone();
1984
1985 let listing: Result<Option<DirectoryListing>> = sync_block_on(async {
1986 let is_dir = tree
1987 .is_dir(&cid)
1988 .await
1989 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1990
1991 if !is_dir {
1992 return Ok(None);
1993 }
1994
1995 let tree_entries = tree
1996 .list_directory(&cid)
1997 .await
1998 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1999
2000 let entries: Vec<DirEntry> = tree_entries
2001 .into_iter()
2002 .map(|e| DirEntry {
2003 name: e.name,
2004 cid: Cid {
2005 hash: e.hash,
2006 key: e.key,
2007 }
2008 .to_string(),
2009 is_directory: e.link_type.is_tree(),
2010 size: e.size,
2011 })
2012 .collect();
2013
2014 Ok(Some(DirectoryListing {
2015 dir_name: String::new(),
2016 entries,
2017 }))
2018 });
2019 let listing = listing?;
2020 if listing.is_some() {
2021 self.record_blob_accesses(access_store.take_accessed_hashes());
2022 }
2023 Ok(listing)
2024 }
2025
2026 pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
2030 let mut wtxn = self.env.write_txn()?;
2031 self.pinned_refs.put(&mut wtxn, key, &())?;
2032 wtxn.commit()?;
2033 Ok(())
2034 }
2035
2036 pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
2038 let mut wtxn = self.env.write_txn()?;
2039 let removed = self.pinned_refs.delete(&mut wtxn, key)?;
2040 wtxn.commit()?;
2041 Ok(removed)
2042 }
2043
2044 pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
2046 let rtxn = self.env.read_txn()?;
2047 let mut refs = Vec::new();
2048
2049 for item in self.pinned_refs.iter(&rtxn)? {
2050 let (key, _) = item?;
2051 refs.push(key.to_string());
2052 }
2053
2054 refs.sort();
2055 Ok(refs)
2056 }
2057
2058 pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
2060 let mut wtxn = self.env.write_txn()?;
2061 let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
2062 self.tracked_authors.put(&mut wtxn, npub, &())?;
2063 wtxn.commit()?;
2064 Ok(inserted)
2065 }
2066
2067 pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
2069 let mut wtxn = self.env.write_txn()?;
2070 let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
2071 wtxn.commit()?;
2072 Ok(removed)
2073 }
2074
2075 pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
2077 let rtxn = self.env.read_txn()?;
2078 let mut authors = Vec::new();
2079
2080 for item in self.tracked_authors.iter(&rtxn)? {
2081 let (npub, _) = item?;
2082 authors.push(npub.to_string());
2083 }
2084
2085 authors.sort();
2086 Ok(authors)
2087 }
2088
2089 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
2091 let key = format!("{}/{}", pubkey_hex, tree_name);
2092 let rtxn = self.env.read_txn()?;
2093 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
2094 let root: CachedRoot = rmp_serde::from_slice(bytes)
2095 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2096 Ok(Some(root))
2097 } else {
2098 Ok(None)
2099 }
2100 }
2101
2102 pub fn set_cached_root(
2104 &self,
2105 pubkey_hex: &str,
2106 tree_name: &str,
2107 hash: &str,
2108 key: Option<&str>,
2109 visibility: &str,
2110 updated_at: u64,
2111 ) -> Result<()> {
2112 let db_key = format!("{}/{}", pubkey_hex, tree_name);
2113 let root = CachedRoot {
2114 hash: hash.to_string(),
2115 key: key.map(|k| k.to_string()),
2116 updated_at,
2117 visibility: visibility.to_string(),
2118 };
2119 let bytes = rmp_serde::to_vec(&root)
2120 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
2121 let mut wtxn = self.env.write_txn()?;
2122 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
2123 wtxn.commit()?;
2124 Ok(())
2125 }
2126
2127 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
2129 let prefix = format!("{}/", pubkey_hex);
2130 let rtxn = self.env.read_txn()?;
2131 let mut results = Vec::new();
2132
2133 for item in self.cached_roots.iter(&rtxn)? {
2134 let (key, bytes) = item?;
2135 if key.starts_with(&prefix) {
2136 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
2137 let root: CachedRoot = rmp_serde::from_slice(bytes)
2138 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2139 results.push((tree_name.to_string(), root));
2140 }
2141 }
2142
2143 Ok(results)
2144 }
2145
2146 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
2148 let key = format!("{}/{}", pubkey_hex, tree_name);
2149 let mut wtxn = self.env.write_txn()?;
2150 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
2151 wtxn.commit()?;
2152 Ok(deleted)
2153 }
2154}
2155
2156fn is_map_full_store_error(err: &StoreError) -> bool {
2157 let message = err.to_string();
2158 message.contains("MDB_MAP_FULL") || message.contains("MapFull")
2159}
2160
/// Layout of a file's content, as produced by `HashtreeStore::get_file_chunk_metadata`.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes, as reported by the tree's `get_size`.
    pub total_size: u64,
    /// Hashes of the root node's direct links, in link order (empty when unchunked).
    pub chunk_hashes: Vec<Hash>,
    /// Declared byte size of each link, parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// `true` when the root is a tree node with at least one link.
    pub is_chunked: bool,
}
2168
/// Iterator state for streaming a byte range of a chunked file chunk by chunk.
///
/// Holds an `Arc` to the store so it can outlive the borrow that created it.
pub struct FileRangeChunksOwned {
    store: Arc<HashtreeStore>,
    metadata: FileChunkMetadata,
    /// First requested byte (inclusive, file-relative).
    start: u64,
    /// Last requested byte (inclusive, file-relative).
    end: u64,
    /// Index of the next chunk to consider in `metadata`.
    current_chunk_idx: usize,
    /// File offset of the first byte of the chunk at `current_chunk_idx`.
    current_offset: u64,
}
2178
2179impl Iterator for FileRangeChunksOwned {
2180 type Item = Result<Vec<u8>>;
2181
2182 fn next(&mut self) -> Option<Self::Item> {
2183 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2184 return None;
2185 }
2186
2187 if self.current_offset > self.end {
2188 return None;
2189 }
2190
2191 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2192 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2193 let chunk_end = self.current_offset + chunk_size - 1;
2194
2195 self.current_chunk_idx += 1;
2196
2197 if chunk_end < self.start || self.current_offset > self.end {
2198 self.current_offset += chunk_size;
2199 return self.next();
2200 }
2201
2202 let chunk_content = match self.store.get_chunk(chunk_hash) {
2203 Ok(Some(content)) => content,
2204 Ok(None) => {
2205 return Some(Err(anyhow::anyhow!(
2206 "Chunk {} not found",
2207 to_hex(chunk_hash)
2208 )));
2209 }
2210 Err(e) => {
2211 return Some(Err(e));
2212 }
2213 };
2214
2215 let chunk_read_start = if self.current_offset >= self.start {
2216 0
2217 } else {
2218 (self.start - self.current_offset) as usize
2219 };
2220
2221 let chunk_read_end = if chunk_end <= self.end {
2222 chunk_size as usize - 1
2223 } else {
2224 (self.end - self.current_offset) as usize
2225 };
2226
2227 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2228 self.current_offset += chunk_size;
2229
2230 Some(Ok(result))
2231 }
2232}
2233
/// Summary of a garbage-collection pass.
///
/// NOTE(review): populated outside this section. The field meanings below are inferred from
/// the names — confirm against the GC implementation.
#[derive(Debug)]
pub struct GcStats {
    /// Presumably the number of DAGs removed.
    pub deleted_dags: usize,
    /// Presumably the number of bytes reclaimed.
    pub freed_bytes: u64,
}
2239
/// One entry of a `DirectoryListing`.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within the directory.
    pub name: String,
    /// Content identifier. `get_directory_listing` fills this with the bare hex hash, while
    /// `get_directory_listing_by_cid` fills it with the `Cid` string (hash plus key).
    pub cid: String,
    /// `true` when the entry's link type is a tree.
    pub is_directory: bool,
    /// Size reported by the directory entry.
    pub size: u64,
}
2247
/// The entries of a directory tree node.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Directory name; the listing functions in this file leave it empty.
    pub dir_name: String,
    /// Entries in the order returned by `list_directory`.
    pub entries: Vec<DirEntry>,
}
2253
/// Per-blob record stored as JSON in the per-pubkey blob indexes.
///
/// Field order determines the serialized JSON layout; keep it stable.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 of the blob.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type; the writers in this file record `application/octet-stream`.
    pub mime_type: String,
    /// Upload time in Unix seconds.
    pub uploaded: u64,
}
2262
2263impl crate::webrtc::ContentStore for HashtreeStore {
2265 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2266 let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2267 self.get_chunk(&hash)
2268 }
2269}
2270
2271#[cfg(test)]
2272mod tests {
2273 use super::*;
2274 #[cfg(feature = "lmdb")]
2275 use tempfile::TempDir;
2276
2277 #[test]
2278 fn blob_access_update_gate_deduplicates_and_throttles() {
2279 let gate = BlobAccessUpdateGate::default();
2280 let first = sha256(b"first");
2281 let second = sha256(b"second");
2282
2283 assert_eq!(
2284 gate.due_hashes([first, first, second], 10),
2285 vec![first, second]
2286 );
2287 assert!(gate.due_hashes([first, second], 11).is_empty());
2288 assert_eq!(
2289 gate.due_hashes([second, first], 10 + ACCESS_UPDATE_INTERVAL_SECS),
2290 vec![second, first]
2291 );
2292 }
2293
2294 #[cfg(feature = "lmdb")]
2295 #[test]
2296 fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
2297 let temp = TempDir::new()?;
2298 let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
2299 let store = HashtreeStore::with_options_and_backend(
2300 temp.path(),
2301 None,
2302 requested,
2303 true,
2304 &StorageBackend::Lmdb,
2305 )?;
2306
2307 let map_size = match store.router.local.as_ref() {
2308 LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
2309 LocalStore::Fs(_) => panic!("expected LMDB local store"),
2310 };
2311
2312 assert!(
2313 map_size >= requested,
2314 "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
2315 );
2316
2317 drop(store);
2318 Ok(())
2319 }
2320
2321 #[cfg(feature = "lmdb")]
2322 #[test]
2323 fn hashtree_store_expands_metadata_lmdb_map_size_to_storage_budget() -> Result<()> {
2324 let temp = TempDir::new()?;
2325 let storage_budget = 256 * 1024 * 1024 * 1024u64;
2326 let expected = lmdb_metadata_map_size_for_storage_budget(storage_budget);
2327 let store = HashtreeStore::with_options_and_backend(
2328 temp.path(),
2329 None,
2330 storage_budget,
2331 true,
2332 &StorageBackend::Lmdb,
2333 )?;
2334
2335 let map_size = store.env.info().map_size as u64;
2336 assert!(
2337 map_size >= expected,
2338 "expected metadata LMDB map to grow to at least {expected} bytes, got {map_size}"
2339 );
2340
2341 drop(store);
2342 Ok(())
2343 }
2344
2345 #[cfg(feature = "lmdb")]
2346 #[test]
2347 fn local_store_can_override_lmdb_map_size() -> Result<()> {
2348 let temp = TempDir::new()?;
2349 let requested = 512 * 1024 * 1024u64;
2350 let store = LocalStore::new_with_lmdb_map_size(
2351 temp.path().join("lmdb-blobs"),
2352 &StorageBackend::Lmdb,
2353 Some(requested),
2354 )?;
2355
2356 let map_size = match store {
2357 LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
2358 LocalStore::Fs(_) => panic!("expected LMDB local store"),
2359 };
2360
2361 assert!(
2362 map_size >= requested,
2363 "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
2364 );
2365
2366 Ok(())
2367 }
2368
2369 #[cfg(feature = "lmdb")]
2370 #[test]
2371 fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
2372 let temp = TempDir::new()?;
2373 let path = temp.path().join("lmdb-blobs");
2374 std::fs::create_dir_all(path.join("aa"))?;
2375 std::fs::create_dir_all(path.join("b2"))?;
2376 std::fs::create_dir_all(path.join("keep-me"))?;
2377 std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
2378 std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
2379 std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;
2380
2381 let _store = LocalStore::new_with_lmdb_map_size(
2382 &path,
2383 &StorageBackend::Lmdb,
2384 Some(128 * 1024 * 1024),
2385 )?;
2386
2387 assert!(!path.join("aa").exists());
2388 assert!(!path.join("b2").exists());
2389 assert!(path.join("keep-me").exists());
2390 assert!(path.join("data.mdb").exists());
2391 assert!(path.join("lock.mdb").exists());
2392
2393 Ok(())
2394 }
2395
#[cfg(feature = "lmdb")]
#[test]
fn duplicate_blossom_writes_refresh_blob_last_accessed() -> Result<()> {
    let dir = TempDir::new()?;
    let store = HashtreeStore::with_options_and_backend(
        dir.path(),
        None,
        LMDB_BLOB_MIN_MAP_SIZE_BYTES,
        true,
        &StorageBackend::Lmdb,
    )?;

    // Cached blob: rewind its access time to 1, write the identical bytes again,
    // and require the recorded access time to have moved past 1.
    let cached_bytes = b"cached blossom duplicate";
    let cached_hash = sha256(cached_bytes);
    store.put_cached_blob(cached_bytes)?;
    store.router.touch_accessed_sync(&cached_hash, 1)?;
    store.put_cached_blob(cached_bytes)?;
    let cached_seen = store.blob_last_accessed_at(&cached_hash)?;
    assert!(cached_seen.unwrap_or(0) > 1);

    // Owned blob: same rewind-and-rewrite check, attributed to a fixed owner key.
    let owner = [7u8; 32];
    let owned_bytes = b"owned blossom duplicate";
    let owned_hash = sha256(owned_bytes);
    store.put_owned_blob(owned_bytes, &owner)?;
    store.router.touch_accessed_sync(&owned_hash, 1)?;
    store.put_owned_blob(owned_bytes, &owner)?;
    let owned_seen = store.blob_last_accessed_at(&owned_hash)?;
    assert!(owned_seen.unwrap_or(0) > 1);

    // The repeated owned write must still list exactly one blob for the owner.
    let listed = store.list_blobs_by_pubkey(&owner)?;
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].sha256, to_hex(&owned_hash));

    // A batch of two new blobs for the same owner raises the listing to three.
    let batch = [b"owned blossom batch 1", b"owned blossom batch 2"]
        .map(|bytes| (sha256(bytes), bytes.to_vec()));
    store.put_owned_blobs(&batch, &owner)?;
    let listed = store.list_blobs_by_pubkey(&owner)?;
    assert_eq!(listed.len(), 3);

    Ok(())
}
2436
#[cfg(feature = "lmdb")]
#[test]
fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
    let dir = TempDir::new()?;
    let store = HashtreeStore::with_options_and_backend(
        dir.path(),
        None,
        LMDB_BLOB_MIN_MAP_SIZE_BYTES,
        true,
        &StorageBackend::Lmdb,
    )?;

    // Both roots are indexed under the same owner / tree name / ref, so indexing
    // the second one is what replaces the first.
    let (owner, tree_name, tree_ref) = ("owner", "playlist", "npub1owner/playlist");

    // Phase 1: publish the original root and confirm it is pinned and indexed.
    let superseded_bytes = b"old published root";
    let superseded = sha256(superseded_bytes);
    store.put_blob(superseded_bytes)?;
    store.pin(&superseded)?;
    store.index_tree(&superseded, owner, Some(tree_name), PRIORITY_OWN, Some(tree_ref))?;

    assert!(store.is_pinned(&superseded)?);
    assert!(store.get_tree_meta(&superseded)?.is_some());

    // Phase 2: publish a replacement under the same ref.
    let replacement_bytes = b"new published root";
    let replacement = sha256(replacement_bytes);
    store.put_blob(replacement_bytes)?;
    store.pin(&replacement)?;
    store.index_tree(&replacement, owner, Some(tree_name), PRIORITY_OWN, Some(tree_ref))?;

    // The old root must lose both its pin and its metadata; the new one keeps both.
    assert!(
        !store.is_pinned(&superseded)?,
        "superseded root should be unpinned when ref is replaced"
    );
    assert!(
        store.get_tree_meta(&superseded)?.is_none(),
        "superseded root metadata should be removed when ref is replaced"
    );
    assert!(store.is_pinned(&replacement)?);
    assert!(store.get_tree_meta(&replacement)?.is_some());

    Ok(())
}
2490
#[test]
fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
    const ZED: &str = "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk";
    const ALPHA: &str = "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm";
    const NEVER_ADDED: &str = "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq";

    let dir = TempDir::new()?;
    let store = HashtreeStore::with_options(dir.path(), None, 1024 * 1024)?;

    // Insert out of order and with a repeat; the listing must be sorted and unique.
    for author in [ZED, ALPHA, ZED] {
        store.add_tracked_author(author)?;
    }
    assert_eq!(
        store.list_tracked_authors()?,
        vec![ALPHA.to_string(), ZED.to_string()]
    );

    // Removing a tracked author reports true; removing an unknown one reports false.
    assert!(store.remove_tracked_author(ALPHA)?);
    assert!(!store.remove_tracked_author(NEVER_ADDED)?);
    assert_eq!(store.list_tracked_authors()?, vec![ZED.to_string()]);

    Ok(())
}
2523
#[cfg(feature = "s3")]
#[test]
fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
    // Use the module's in-scope `TempDir`, like every other test here, instead of
    // the fully-qualified `tempfile::TempDir`.
    let temp = TempDir::new()?;
    let local = Arc::new(LocalStore::new(
        temp.path().join("blobs"),
        &StorageBackend::Fs,
    )?);

    // Drive the async `Store` methods from inside a `futures` executor. A nested
    // `futures::executor::block_on` panics, so any such re-entry on the S3 fallback
    // path surfaces as a caught panic. The endpoint targets localhost port 9, where
    // presumably nothing is listening. The call results are ignored; only the absence
    // of a panic is asserted.
    let outcome = std::panic::catch_unwind(|| {
        sync_block_on(async {
            let aws_config = aws_config::from_env()
                .region(aws_sdk_s3::config::Region::new("auto"))
                .load()
                .await;
            let s3_client = aws_sdk_s3::Client::from_conf(
                aws_sdk_s3::config::Builder::from(&aws_config)
                    .endpoint_url("http://127.0.0.1:9")
                    .force_path_style(true)
                    .build(),
            );

            let router = StorageRouter {
                local,
                s3_client: Some(s3_client),
                s3_bucket: Some("test-bucket".to_string()),
                s3_prefix: String::new(),
                sync_tx: None,
            };
            let hash = [0u8; 32];

            // Results are intentionally discarded: the S3 calls are expected to fail.
            let _ = Store::has(&router, &hash).await;
            let _ = Store::get(&router, &hash).await;
        });
    });

    assert!(
        outcome.is_ok(),
        "S3-backed async store methods should not panic inside futures::block_on"
    );

    Ok(())
}
2567}