1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8 from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16#[cfg(feature = "s3")]
17use std::future::Future;
18use std::io::Write;
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::time::{SystemTime, UNIX_EPOCH};
22
23mod upload;
24
25mod maintenance;
26mod retention;
27
28pub use maintenance::{compact_lmdb_environments_under, CompactResult, VerifyResult};
29pub use retention::{PinnedItem, StorageByPriority, StorageStats, TreeMeta};
30
/// Lowest of the three priority tiers defined here.
/// NOTE(review): presumably assigned to trees that are neither owned nor followed — confirm in `retention`.
pub const PRIORITY_OTHER: u8 = 64;
/// Middle priority tier.
/// NOTE(review): presumably for content from followed users — confirm in `retention`.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Highest priority tier (`u8::MAX`).
/// NOTE(review): presumably for the local user's own content — confirm in `retention`.
pub const PRIORITY_OWN: u8 = 255;
/// Value passed to `EnvOpenOptions::max_readers` when the metadata LMDB environment is opened.
const LMDB_MAX_READERS: u32 = 1024;
/// Lower bound (10 GiB) applied to the LMDB blob store map size in `HashtreeStore`.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 10 * 1024 * 1024 * 1024;
38
/// Serializable record describing a cached tree root.
///
/// NOTE(review): field meanings below are inferred from names only; the code that
/// reads and writes this record is outside this view — confirm before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash as a string (presumably hex-encoded — confirm).
    pub hash: String,
    /// Optional key string (presumably a decryption key for non-public trees — confirm).
    pub key: Option<String>,
    /// Last-update timestamp (presumably Unix seconds — confirm).
    pub updated_at: u64,
    /// Visibility label stored as free-form text.
    pub visibility: String,
}
51
/// Backend-independent summary returned by [`LocalStore::stats`].
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Copied from the backend's `count` statistic (number of stored blobs).
    pub count: usize,
    /// Copied from the backend's `total_bytes` statistic.
    pub total_bytes: u64,
}
58
/// Local blob store that dispatches to one concrete backend.
///
/// The `Lmdb` variant only exists when the crate is built with the `lmdb` feature.
pub enum LocalStore {
    /// Filesystem-backed blob store.
    Fs(FsBlobStore),
    /// LMDB-backed blob store.
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
65
66#[cfg(feature = "lmdb")]
/// Returns `true` when the final component of `path` is exactly two ASCII hex digits
/// (either case), which is the naming this module treats as a filesystem blob shard.
///
/// Paths without a final component, or whose final component is not valid UTF-8,
/// yield `false`.
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    match path.file_name().and_then(|component| component.to_str()) {
        Some(component) => matches!(
            component.as_bytes(),
            [first, second] if first.is_ascii_hexdigit() && second.is_ascii_hexdigit()
        ),
        None => false,
    }
}
73
74#[cfg(feature = "lmdb")]
75fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
76 let entries = std::fs::read_dir(path).map_err(StoreError::Io)?;
77 for entry in entries {
78 let entry = entry.map_err(StoreError::Io)?;
79 let entry_path = entry.path();
80 if entry_path.is_dir() && is_fs_blob_shard_dir(&entry_path) {
81 std::fs::remove_dir_all(&entry_path).map_err(StoreError::Io)?;
82 tracing::info!(
83 "Removed stale filesystem blob shard directory after LMDB cutover: {}",
84 entry_path.display()
85 );
86 }
87 }
88 Ok(())
89}
90
91impl LocalStore {
92 pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
98 Self::new_unbounded(path, backend)
99 }
100
101 pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
106 path: P,
107 backend: &StorageBackend,
108 _map_size_bytes: Option<u64>,
109 ) -> Result<Self, StoreError> {
110 match backend {
111 StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
112 #[cfg(feature = "lmdb")]
113 StorageBackend::Lmdb => match _map_size_bytes {
114 Some(map_size_bytes) => {
115 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
116 remove_stale_fs_blob_shards(path.as_ref())?;
117 Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
118 path,
119 map_size_bytes,
120 )?))
121 }
122 None => {
123 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
124 remove_stale_fs_blob_shards(path.as_ref())?;
125 Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
126 }
127 },
128 #[cfg(not(feature = "lmdb"))]
129 StorageBackend::Lmdb => {
130 tracing::warn!(
131 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
132 );
133 Ok(LocalStore::Fs(FsBlobStore::new(path)?))
134 }
135 }
136 }
137
138 pub fn new_unbounded<P: AsRef<Path>>(
140 path: P,
141 backend: &StorageBackend,
142 ) -> Result<Self, StoreError> {
143 Self::new_with_lmdb_map_size(path, backend, None)
144 }
145
146 pub fn backend(&self) -> StorageBackend {
147 match self {
148 LocalStore::Fs(_) => StorageBackend::Fs,
149 #[cfg(feature = "lmdb")]
150 LocalStore::Lmdb(_) => StorageBackend::Lmdb,
151 }
152 }
153
154 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
156 match self {
157 LocalStore::Fs(store) => store.put_sync(hash, data),
158 #[cfg(feature = "lmdb")]
159 LocalStore::Lmdb(store) => store.put_sync(hash, data),
160 }
161 }
162
163 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
165 match self {
166 LocalStore::Fs(store) => {
167 let mut inserted = 0usize;
168 for (hash, data) in items {
169 if store.put_sync(*hash, data.as_slice())? {
170 inserted += 1;
171 }
172 }
173 Ok(inserted)
174 }
175 #[cfg(feature = "lmdb")]
176 LocalStore::Lmdb(store) => store.put_many_sync(items),
177 }
178 }
179
180 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
182 match self {
183 LocalStore::Fs(store) => store.get_sync(hash),
184 #[cfg(feature = "lmdb")]
185 LocalStore::Lmdb(store) => store.get_sync(hash),
186 }
187 }
188
189 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
191 match self {
192 LocalStore::Fs(store) => Ok(store.exists(hash)),
193 #[cfg(feature = "lmdb")]
194 LocalStore::Lmdb(store) => store.exists(hash),
195 }
196 }
197
198 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
200 match self {
201 LocalStore::Fs(store) => store.delete_sync(hash),
202 #[cfg(feature = "lmdb")]
203 LocalStore::Lmdb(store) => store.delete_sync(hash),
204 }
205 }
206
207 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
209 match self {
210 LocalStore::Fs(store) => {
211 let stats = store.stats()?;
212 Ok(LocalStoreStats {
213 count: stats.count,
214 total_bytes: stats.total_bytes,
215 })
216 }
217 #[cfg(feature = "lmdb")]
218 LocalStore::Lmdb(store) => {
219 let stats = store.stats()?;
220 Ok(LocalStoreStats {
221 count: stats.count,
222 total_bytes: stats.total_bytes,
223 })
224 }
225 }
226 }
227
228 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
230 match self {
231 LocalStore::Fs(store) => store.list(),
232 #[cfg(feature = "lmdb")]
233 LocalStore::Lmdb(store) => store.list(),
234 }
235 }
236}
237
/// Async `Store` facade over [`LocalStore`].
///
/// Every method calls the matching synchronous method directly; nothing is
/// offloaded, so the work runs on the thread that polls the future.
#[async_trait]
impl Store for LocalStore {
    /// Stores `data` under `hash` via [`LocalStore::put_sync`].
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    /// Stores all items via [`LocalStore::put_many_sync`].
    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
        self.put_many_sync(&items)
    }

    /// Reads a blob via [`LocalStore::get_sync`].
    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    /// Checks presence via [`LocalStore::exists`].
    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    /// Deletes a blob via [`LocalStore::delete_sync`].
    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
260
261#[cfg(feature = "s3")]
262use tokio::sync::mpsc;
263
264use crate::config::S3Config;
265
/// Work item sent over the channel to the background S3 sync task spawned in
/// `StorageRouter::with_s3`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` to the object derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object derived from `hash`.
    Delete { hash: Hash },
}
272
/// Routes blob operations to the local store and, when built with the `s3`
/// feature and configured, to an S3 bucket as well.
pub struct StorageRouter {
    /// Local store; every operation consults it first.
    local: Arc<LocalStore>,
    /// S3 client used for synchronous reads and existence checks; `None` when S3 is not configured.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Bucket name; `None` when S3 is not configured.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// String prepended to every object key (`{prefix}{hex_hash}.bin`); empty when unset.
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Sender feeding the background upload/delete task; `None` when S3 is not configured.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
291
292impl StorageRouter {
293 #[cfg(feature = "s3")]
294 fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
295 where
296 F: Future<Output = T> + Send + 'static,
297 T: Send + 'static,
298 {
299 if tokio::runtime::Handle::try_current().is_ok() {
300 return std::thread::Builder::new()
301 .name("storage-s3-sync".to_string())
302 .spawn(move || {
303 tokio::runtime::Builder::new_current_thread()
304 .enable_all()
305 .build()
306 .expect("build storage s3 sync runtime")
307 .block_on(future)
308 })
309 .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
310 .join()
311 .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()));
312 }
313
314 let runtime = tokio::runtime::Builder::new_current_thread()
315 .enable_all()
316 .build()
317 .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
318 Ok(runtime.block_on(future))
319 }
320
    /// Creates a router that uses only `local`; all S3 fields are left unset.
    pub fn new(local: Arc<LocalStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }
335
    /// Creates a router backed by `local` plus an S3 bucket described by `config`.
    ///
    /// Builds an S3 client (region and endpoint from `config`, path-style
    /// addressing forced), then spawns a background Tokio task that receives
    /// [`S3SyncMessage`]s and performs the uploads/deletes. Because it calls
    /// `tokio::spawn`, this must run where a Tokio runtime context is available.
    ///
    /// Upload and delete failures in the background task are only logged.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Start from environment-provided AWS settings, then override the region.
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader =
            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Custom endpoint + path-style addressing.
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Unbounded queue: producers never block, so backlog is limited only by memory.
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        // Copies moved into the background task.
        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // At most 32 S3 requests are in flight at once.
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            // The loop ends when every sender has been dropped.
            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                // NOTE(review): the task is spawned before the permit is acquired, so the
                // number of waiting tasks (each holding its payload) is not bounded by
                // the semaphore — only the number of concurrent requests is.
                tokio::spawn(async move {
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            match client
                                .put_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .body(ByteStream::from(data))
                                .send()
                                .await
                            {
                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            if let Err(e) = client
                                .delete_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .send()
                                .await
                            {
                                tracing::error!("S3 delete failed {}: {}", &key, e);
                            }
                        }
                    }
                });
            }
        });

        tracing::info!(
            "S3 storage initialized: bucket={}, prefix={}",
            bucket,
            prefix
        );

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }
437
438 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
440 let is_new = self.local.put_sync(hash, data)?;
442
443 #[cfg(feature = "s3")]
446 if is_new {
447 if let Some(ref tx) = self.sync_tx {
448 tracing::debug!(
449 "Queueing S3 upload for {} ({} bytes)",
450 crate::storage::to_hex(&hash)[..16].to_string(),
451 data.len(),
452 );
453 if let Err(e) = tx.send(S3SyncMessage::Upload {
454 hash,
455 data: data.to_vec(),
456 }) {
457 tracing::error!("Failed to queue S3 upload: {}", e);
458 }
459 }
460 }
461
462 Ok(is_new)
463 }
464
465 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
467 #[cfg(feature = "s3")]
468 let pending_uploads = if self.sync_tx.is_some() {
469 let mut pending = Vec::new();
470 for (hash, data) in items {
471 if !self.local.exists(hash)? {
472 pending.push((*hash, data.clone()));
473 }
474 }
475 pending
476 } else {
477 Vec::new()
478 };
479
480 let inserted = self.local.put_many_sync(items)?;
481
482 #[cfg(feature = "s3")]
483 if let Some(ref tx) = self.sync_tx {
484 for (hash, data) in pending_uploads {
485 if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
486 tracing::error!("Failed to queue S3 upload: {}", e);
487 }
488 }
489 }
490
491 Ok(inserted)
492 }
493
494 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
496 if let Some(data) = self.local.get_sync(hash)? {
498 return Ok(Some(data));
499 }
500
501 #[cfg(feature = "s3")]
503 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
504 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
505 let client = client.clone();
506 let bucket = bucket.clone();
507
508 match Self::run_s3_future_sync(async move {
509 client.get_object().bucket(bucket).key(key).send().await
510 }) {
511 Ok(Ok(output)) => {
512 match Self::run_s3_future_sync(async move { output.body.collect().await }) {
513 Ok(Ok(body)) => {
514 let data = body.into_bytes().to_vec();
515 let _ = self.local.put_sync(*hash, &data);
517 return Ok(Some(data));
518 }
519 Ok(Err(err)) => {
520 tracing::warn!("S3 body collect failed: {}", err);
521 }
522 Err(err) => {
523 tracing::warn!("S3 body collect runtime failed: {}", err);
524 }
525 }
526 }
527 Ok(Err(err)) => {
528 let service_err = err.into_service_error();
529 if !service_err.is_no_such_key() {
530 tracing::warn!("S3 get failed: {}", service_err);
531 }
532 }
533 Err(err) => {
534 tracing::warn!("S3 get runtime failed: {}", err);
535 }
536 }
537 }
538
539 Ok(None)
540 }
541
542 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
544 if self.local.exists(hash)? {
546 return Ok(true);
547 }
548
549 #[cfg(feature = "s3")]
551 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
552 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
553 let client = client.clone();
554 let bucket = bucket.clone();
555
556 match Self::run_s3_future_sync(async move {
557 client.head_object().bucket(bucket).key(&key).send().await
558 }) {
559 Ok(Ok(_)) => return Ok(true),
560 Ok(Err(err)) => {
561 let service_err = err.into_service_error();
562 if !service_err.is_not_found() {
563 tracing::warn!("S3 head failed: {}", service_err);
564 }
565 }
566 Err(err) => {
567 tracing::warn!("S3 head runtime failed: {}", err);
568 }
569 }
570 }
571
572 Ok(false)
573 }
574
575 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
577 let deleted = self.local.delete_sync(hash)?;
578
579 #[cfg(feature = "s3")]
581 if let Some(ref tx) = self.sync_tx {
582 let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
583 }
584
585 Ok(deleted)
586 }
587
    /// Deletes the blob for `hash` from the local store only; no S3 delete is queued.
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }
593
    /// Returns statistics for the local store only; S3 contents are not counted.
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        self.local.stats()
    }
598
    /// Lists hashes held by the local store only; S3 contents are not listed.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }
603
    /// Returns another shared handle to the underlying local store.
    pub fn local_store(&self) -> Arc<LocalStore> {
        Arc::clone(&self.local)
    }
608}
609
/// Async `Store` facade over [`StorageRouter`].
///
/// Each method calls the synchronous counterpart directly. With S3 configured,
/// `get` and `has` may block the polling thread while a helper thread performs
/// the S3 request (see `run_s3_future_sync`).
#[async_trait]
impl Store for StorageRouter {
    /// Stores `data` under `hash` via [`StorageRouter::put_sync`].
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    /// Stores all items via [`StorageRouter::put_many_sync`].
    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
        self.put_many_sync(&items)
    }

    /// Reads a blob via [`StorageRouter::get_sync`] (local first, then S3).
    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    /// Checks presence via [`StorageRouter::exists`] (local first, then S3).
    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    /// Deletes a blob via [`StorageRouter::delete_sync`].
    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
634
/// Blob storage plus an LMDB metadata environment holding pin, ownership and
/// tree bookkeeping databases.
pub struct HashtreeStore {
    /// Directory the store was opened at; blobs live in its `blobs` subdirectory.
    base_path: PathBuf,
    /// LMDB environment containing all the metadata databases below.
    env: heed::Env,
    /// Named database `"pins"`: byte keys, no value (set semantics).
    pins: Database<Bytes, Unit>,
    /// Named database `"pinned_refs"`: string keys, no value.
    pinned_refs: Database<Str, Unit>,
    /// Named database `"blob_owners"`: 64-byte `sha256 || pubkey` keys, no value.
    blob_owners: Database<Bytes, Unit>,
    /// Named database `"pubkey_blobs"`: pubkey -> JSON-encoded list of blob metadata.
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Named database `"tree_meta"`: byte keys to byte values
    /// (presumably serialized `TreeMeta` — confirm in `retention`).
    tree_meta: Database<Bytes, Bytes>,
    /// Named database `"blob_trees"`: byte keys, no value
    /// (presumably blob-to-tree membership — confirm in `retention`).
    blob_trees: Database<Bytes, Unit>,
    /// Named database `"tree_refs"`: string keys to byte values.
    tree_refs: Database<Str, Bytes>,
    /// Named database `"cached_roots"`: string keys to byte values
    /// (presumably serialized `CachedRoot` — confirm).
    cached_roots: Database<Str, Bytes>,
    /// Blob storage (local, optionally mirrored to S3).
    router: Arc<StorageRouter>,
    /// Configured storage budget in bytes.
    max_size_bytes: u64,
    /// Orphan-eviction flag supplied at construction; consumed outside this view.
    evict_orphans: bool,
}
661
662impl HashtreeStore {
    /// Opens a store at `path` using settings from the loaded (or default) config:
    /// size limit (`max_size_gb` converted to bytes, saturating), orphan eviction
    /// flag and storage backend. No S3 is configured.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        let config = hashtree_config::Config::load_or_default();
        // GiB -> bytes without overflow.
        let max_size_bytes = config
            .storage
            .max_size_gb
            .saturating_mul(1024 * 1024 * 1024);
        Self::with_options_and_backend(
            path,
            None,
            max_size_bytes,
            config.storage.evict_orphans,
            &config.storage.backend,
        )
    }
678
    /// Opens a store at `path` with an explicit `backend` and byte limit, without
    /// reading the config file. S3 is disabled and orphan eviction is enabled.
    pub fn new_with_backend<P: AsRef<Path>>(
        path: P,
        backend: hashtree_config::StorageBackend,
        max_size_bytes: u64,
    ) -> Result<Self> {
        Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
    }
687
    /// Opens a store at `path` with an optional S3 configuration; size limit,
    /// orphan eviction flag and backend come from the loaded (or default) config.
    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
        let config = hashtree_config::Config::load_or_default();
        // GiB -> bytes without overflow.
        let max_size_bytes = config
            .storage
            .max_size_gb
            .saturating_mul(1024 * 1024 * 1024);
        Self::with_options_and_backend(
            path,
            s3_config,
            max_size_bytes,
            config.storage.evict_orphans,
            &config.storage.backend,
        )
    }
703
    /// Opens a store at `path` with an optional S3 configuration and an explicit
    /// byte limit; the orphan eviction flag and backend still come from the
    /// loaded (or default) config.
    pub fn with_options<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
    ) -> Result<Self> {
        let config = hashtree_config::Config::load_or_default();
        Self::with_options_and_backend(
            path,
            s3_config,
            max_size_bytes,
            config.storage.evict_orphans,
            &config.storage.backend,
        )
    }
723
    /// Shared constructor behind all public constructors.
    ///
    /// Steps, in order:
    /// 1. create `path` and open the metadata LMDB environment there;
    /// 2. create/open the eight named metadata databases in one write transaction;
    /// 3. open the blob store under `path/blobs` for the requested `backend`
    ///    (falling back to the filesystem when LMDB is requested without the
    ///    `lmdb` feature);
    /// 4. wrap it in a [`StorageRouter`], with S3 when the `s3` feature is on and
    ///    `s3_config` is provided.
    fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        evict_orphans: bool,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // NOTE(review): heed marks `open` as unsafe; presumably the caller must make
        // sure the same environment is not opened twice in this process — confirm
        // against the heed documentation and add a proper SAFETY justification.
        let env = unsafe {
            EnvOpenOptions::new()
                // 10 GiB map size for the metadata environment.
                .map_size(10 * 1024 * 1024 * 1024)
                // Eight named databases are created below; the limit leaves one spare.
                .max_dbs(9)
                .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Best effort: the result of clearing stale reader slots is ignored.
        let _ = env.clear_stale_readers();

        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                std::fs::create_dir_all(path.join("blobs"))?;
                // Drop shard directories left behind by a previous filesystem backend.
                remove_stale_fs_blob_shards(&path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
                // Never size the blob map below the 10 GiB floor.
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            // NOTE(review): `StorageRouter::with_s3` calls `tokio::spawn`, while this
            // executor is the `futures` one — presumably this constructor is expected
            // to be called from within a Tokio runtime context; confirm with callers.
            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            base_path: path.to_path_buf(),
            env,
            pins,
            pinned_refs,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
            evict_orphans,
        })
    }
827
    /// Returns the directory this store was opened at.
    pub fn base_path(&self) -> &Path {
        &self.base_path
    }
831
    /// Borrows the storage router used for blob reads and writes.
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }
836
    /// Returns another shared handle to the storage router, e.g. for building a `HashTree`.
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }
842
    /// Loads the tree node stored under `hash` through a public-mode `HashTree`
    /// built over this store's router, blocking until the lookup completes.
    ///
    /// # Errors
    /// Wraps the underlying failure as "Failed to get tree node: ...".
    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
        })
    }
854
    /// Stores `data` under its SHA-256 hash and returns that hash as a hex string.
    ///
    /// Whether the blob was newly stored or already present is not reported.
    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
        let hash = sha256(data);
        self.router
            .put_sync(hash, data)
            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
        Ok(to_hex(&hash))
    }
863
    /// Stores `data` under its SHA-256 hash as cache content, trying to free
    /// space first and retrying once after cleanup if the store reports it is full.
    ///
    /// Returns the hex hash. If the blob already exists (locally or in S3) nothing
    /// is written.
    ///
    /// # Errors
    /// Fails when the existence check fails, when cleanup itself fails, when the
    /// store is full and cleanup freed nothing, or when the write fails for any
    /// other reason (including a second "full" error after the retry).
    pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
        let hash = sha256(data);
        if self
            .router
            .exists(&hash)
            .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
        {
            return Ok(to_hex(&hash));
        }

        let incoming_bytes = data.len() as u64;
        // Best effort: a failure to make room up front is ignored; the write below
        // decides whether there is really a problem.
        let _ = self.make_room_for_cached_blob(incoming_bytes);

        // Guards the single retry after a "map full" style failure.
        let mut retried_after_cleanup = false;
        loop {
            match self.router.put_sync(hash, data) {
                Ok(_) => return Ok(to_hex(&hash)),
                Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
                    let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
                    if freed == 0 {
                        // Nothing could be reclaimed, so retrying would fail the same way.
                        return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
                    }
                    retried_after_cleanup = true;
                }
                Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
            }
        }
    }
897
    /// Reads the raw blob stored under `hash` via the router (local, then S3).
    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        self.router
            .get_sync(hash)
            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
    }
904
    /// Reports whether a blob for `hash` exists according to the router (local, then S3).
    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
        self.router
            .exists(hash)
            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
    }
911
912 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
918 let mut key = [0u8; 64];
919 key[..32].copy_from_slice(sha256);
920 key[32..].copy_from_slice(pubkey);
921 key
922 }
923
924 pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
927 let key = Self::blob_owner_key(sha256, pubkey);
928 let mut wtxn = self.env.write_txn()?;
929
930 self.blob_owners.put(&mut wtxn, &key[..], &())?;
932
933 let sha256_hex = to_hex(sha256);
935
936 let mut blobs: Vec<BlobMetadata> = self
938 .pubkey_blobs
939 .get(&wtxn, pubkey)?
940 .and_then(|b| serde_json::from_slice(b).ok())
941 .unwrap_or_default();
942
943 if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
945 let now = SystemTime::now()
946 .duration_since(UNIX_EPOCH)
947 .unwrap()
948 .as_secs();
949
950 let size = self
952 .get_blob(sha256)?
953 .map(|data| data.len() as u64)
954 .unwrap_or(0);
955
956 blobs.push(BlobMetadata {
957 sha256: sha256_hex,
958 size,
959 mime_type: "application/octet-stream".to_string(),
960 uploaded: now,
961 });
962
963 let blobs_json = serde_json::to_vec(&blobs)?;
964 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
965 }
966
967 wtxn.commit()?;
968 Ok(())
969 }
970
    /// Reports whether the `sha256 || pubkey` ownership key is present.
    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let rtxn = self.env.read_txn()?;
        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
    }
977
978 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
980 let rtxn = self.env.read_txn()?;
981
982 let mut owners = Vec::new();
983 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
984 let (key, _) = item?;
985 if key.len() == 64 {
986 let mut pubkey = [0u8; 32];
988 pubkey.copy_from_slice(&key[32..64]);
989 owners.push(pubkey);
990 }
991 }
992 Ok(owners)
993 }
994
995 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
997 let rtxn = self.env.read_txn()?;
998
999 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1001 if item.is_ok() {
1002 return Ok(true);
1003 }
1004 }
1005 Ok(false)
1006 }
1007
    /// Returns the first owner yielded by [`Self::get_blob_owners`], if any.
    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
        Ok(self.get_blob_owners(sha256)?.into_iter().next())
    }
1012
1013 pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1017 let key = Self::blob_owner_key(sha256, pubkey);
1018 let mut wtxn = self.env.write_txn()?;
1019
1020 self.blob_owners.delete(&mut wtxn, &key[..])?;
1022
1023 let sha256_hex = to_hex(sha256);
1025
1026 if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1028 if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1029 blobs.retain(|b| b.sha256 != sha256_hex);
1030 let blobs_json = serde_json::to_vec(&blobs)?;
1031 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1032 }
1033 }
1034
1035 let mut has_other_owners = false;
1037 for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1038 if item.is_ok() {
1039 has_other_owners = true;
1040 break;
1041 }
1042 }
1043
1044 if has_other_owners {
1045 wtxn.commit()?;
1046 tracing::debug!(
1047 "Removed {} from blob {} owners, other owners remain",
1048 &to_hex(pubkey)[..8],
1049 &sha256_hex[..8]
1050 );
1051 return Ok(false);
1052 }
1053
1054 tracing::info!(
1056 "All owners removed from blob {}, deleting",
1057 &sha256_hex[..8]
1058 );
1059
1060 let _ = self.router.delete_sync(sha256);
1062
1063 wtxn.commit()?;
1064 Ok(true)
1065 }
1066
1067 pub fn list_blobs_by_pubkey(
1069 &self,
1070 pubkey: &[u8; 32],
1071 ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1072 let rtxn = self.env.read_txn()?;
1073
1074 let blobs: Vec<BlobMetadata> = self
1075 .pubkey_blobs
1076 .get(&rtxn, pubkey)?
1077 .and_then(|b| serde_json::from_slice(b).ok())
1078 .unwrap_or_default();
1079
1080 Ok(blobs
1081 .into_iter()
1082 .map(|b| crate::server::blossom::BlobDescriptor {
1083 url: format!("/{}", b.sha256),
1084 sha256: b.sha256,
1085 size: b.size,
1086 mime_type: b.mime_type,
1087 uploaded: b.uploaded,
1088 })
1089 .collect())
1090 }
1091
    /// Reads the raw chunk stored under `hash` via the router (local, then S3).
    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        self.router
            .get_sync(hash)
            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
    }
1098
    /// Reads the whole file rooted at `hash` into memory through a public-mode
    /// `HashTree`, blocking until done.
    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.read_file(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }
1111
    /// Reads the whole file addressed by `cid` into memory via `HashTree::get`,
    /// blocking until done.
    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get(cid, None)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }
1124
1125 fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1126 let exists = self
1127 .router
1128 .exists(&cid.hash)
1129 .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1130 if !exists {
1131 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1132 }
1133 Ok(())
1134 }
1135
    /// Streams the file addressed by `cid` into `writer` chunk by chunk and
    /// returns the number of bytes written. The writer is flushed on success.
    ///
    /// # Errors
    /// Fails when the root blob is missing, when a chunk cannot be read or
    /// written, when the stream yields no chunk at all, or when flushing fails.
    /// Bytes already written before a failure are not rolled back.
    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
        self.ensure_cid_exists(cid)?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let mut total_bytes = 0u64;
        // Distinguishes "stream produced nothing" from "stream produced data".
        let mut streamed_any_chunk = false;

        sync_block_on(async {
            let mut stream = tree.get_stream(cid);
            while let Some(chunk) = stream.next().await {
                // Set before the error check, so a failing first chunk still counts.
                streamed_any_chunk = true;
                let chunk =
                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
                writer
                    .write_all(&chunk)
                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
                total_bytes += chunk.len() as u64;
            }
            Ok::<(), anyhow::Error>(())
        })?;

        // NOTE(review): if `get_stream` yields no item for a valid zero-length file,
        // that file is reported as "CID not found" here — confirm the stream's
        // behaviour for empty files.
        if !streamed_any_chunk {
            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
        }

        writer
            .flush()
            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
        Ok(total_bytes)
    }
1168
1169 pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1171 self.ensure_cid_exists(cid)?;
1172
1173 let output_path = output_path.as_ref();
1174 if let Some(parent) = output_path.parent() {
1175 if !parent.as_os_str().is_empty() {
1176 std::fs::create_dir_all(parent).with_context(|| {
1177 format!("Failed to create output directory {}", parent.display())
1178 })?;
1179 }
1180 }
1181
1182 let mut file = std::fs::File::create(output_path)
1183 .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1184 self.write_file_by_cid_to_writer(cid, &mut file)
1185 }
1186
    /// Writes the public file rooted at `hash` to `output_path`; shorthand for
    /// [`Self::write_file_by_cid`] with `Cid::public(*hash)`.
    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
        self.write_file_by_cid(&Cid::public(*hash), output_path)
    }
1191
    /// Resolves `path` relative to the tree addressed by `cid` via
    /// `HashTree::resolve_path`, blocking until done.
    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.resolve_path(cid, path)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
        })
    }
1203
    /// Describes how the file rooted at `hash` is laid out in chunks.
    ///
    /// Returns `Ok(None)` when the hash is unknown, when its tree node cannot be
    /// loaded, or when it is a directory. For a plain blob the chunk lists are
    /// empty and `is_chunked` is `false`; for a file tree node they mirror the
    /// node's links in order.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            // Cheap presence check first, so unknown hashes return `None` early.
            let exists = store
                .has(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            let total_size = tree
                .get_size(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            let is_tree_node = tree
                .is_tree(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            // A raw blob is a single unchunked file.
            if !is_tree_node {
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            let node = match tree
                .get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
            {
                Some(n) => n,
                None => return Ok(None),
            };

            let is_directory = tree
                .is_directory(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            // Directories have no byte content to describe.
            if is_directory {
                return Ok(None);
            }

            // One entry per link, in link order.
            // NOTE(review): links are taken as leaf chunks; if a link can itself be an
            // intermediate tree node, its hash would not address raw file bytes — confirm.
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1275
1276 pub fn get_file_range(
1278 &self,
1279 hash: &[u8; 32],
1280 start: u64,
1281 end: Option<u64>,
1282 ) -> Result<Option<(Vec<u8>, u64)>> {
1283 let metadata = match self.get_file_chunk_metadata(hash)? {
1284 Some(m) => m,
1285 None => return Ok(None),
1286 };
1287
1288 if metadata.total_size == 0 {
1289 return Ok(Some((Vec::new(), 0)));
1290 }
1291
1292 if start >= metadata.total_size {
1293 return Ok(None);
1294 }
1295
1296 let end = end
1297 .unwrap_or(metadata.total_size - 1)
1298 .min(metadata.total_size - 1);
1299
1300 if !metadata.is_chunked {
1302 let content = self.get_file(hash)?.unwrap_or_default();
1303 let range_content = if start < content.len() as u64 {
1304 content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1305 } else {
1306 Vec::new()
1307 };
1308 return Ok(Some((range_content, metadata.total_size)));
1309 }
1310
1311 let mut result = Vec::new();
1313 let mut current_offset = 0u64;
1314
1315 for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1316 let chunk_size = metadata.chunk_sizes[i];
1317 let chunk_end = current_offset + chunk_size - 1;
1318
1319 if chunk_end >= start && current_offset <= end {
1321 let chunk_content = match self.get_chunk(chunk_hash)? {
1322 Some(content) => content,
1323 None => {
1324 return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1325 }
1326 };
1327
1328 let chunk_read_start = if current_offset >= start {
1329 0
1330 } else {
1331 (start - current_offset) as usize
1332 };
1333
1334 let chunk_read_end = if chunk_end <= end {
1335 chunk_size as usize - 1
1336 } else {
1337 (end - current_offset) as usize
1338 };
1339
1340 result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1341 }
1342
1343 current_offset += chunk_size;
1344
1345 if current_offset > end {
1346 break;
1347 }
1348 }
1349
1350 Ok(Some((result, metadata.total_size)))
1351 }
1352
1353 pub fn stream_file_range_chunks_owned(
1355 self: Arc<Self>,
1356 hash: &[u8; 32],
1357 start: u64,
1358 end: u64,
1359 ) -> Result<Option<FileRangeChunksOwned>> {
1360 let metadata = match self.get_file_chunk_metadata(hash)? {
1361 Some(m) => m,
1362 None => return Ok(None),
1363 };
1364
1365 if metadata.total_size == 0 || start >= metadata.total_size {
1366 return Ok(None);
1367 }
1368
1369 let end = end.min(metadata.total_size - 1);
1370
1371 Ok(Some(FileRangeChunksOwned {
1372 store: self,
1373 metadata,
1374 start,
1375 end,
1376 current_chunk_idx: 0,
1377 current_offset: 0,
1378 }))
1379 }
1380
1381 pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1383 let store = self.store_arc();
1384 let tree = HashTree::new(HashTreeConfig::new(store).public());
1385
1386 sync_block_on(async {
1387 let is_dir = tree
1389 .is_directory(hash)
1390 .await
1391 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1392
1393 if !is_dir {
1394 return Ok(None);
1395 }
1396
1397 let cid = hashtree_core::Cid::public(*hash);
1399 let tree_entries = tree
1400 .list_directory(&cid)
1401 .await
1402 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1403
1404 let entries: Vec<DirEntry> = tree_entries
1405 .into_iter()
1406 .map(|e| DirEntry {
1407 name: e.name,
1408 cid: to_hex(&e.hash),
1409 is_directory: e.link_type.is_tree(),
1410 size: e.size,
1411 })
1412 .collect();
1413
1414 Ok(Some(DirectoryListing {
1415 dir_name: String::new(),
1416 entries,
1417 }))
1418 })
1419 }
1420
1421 pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1423 let store = self.store_arc();
1424 let tree = HashTree::new(HashTreeConfig::new(store).public());
1425 let cid = cid.clone();
1426
1427 sync_block_on(async {
1428 let is_dir = tree
1429 .is_dir(&cid)
1430 .await
1431 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1432
1433 if !is_dir {
1434 return Ok(None);
1435 }
1436
1437 let tree_entries = tree
1438 .list_directory(&cid)
1439 .await
1440 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1441
1442 let entries: Vec<DirEntry> = tree_entries
1443 .into_iter()
1444 .map(|e| DirEntry {
1445 name: e.name,
1446 cid: Cid {
1447 hash: e.hash,
1448 key: e.key,
1449 }
1450 .to_string(),
1451 is_directory: e.link_type.is_tree(),
1452 size: e.size,
1453 })
1454 .collect();
1455
1456 Ok(Some(DirectoryListing {
1457 dir_name: String::new(),
1458 entries,
1459 }))
1460 })
1461 }
1462
1463 pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
1467 let mut wtxn = self.env.write_txn()?;
1468 self.pinned_refs.put(&mut wtxn, key, &())?;
1469 wtxn.commit()?;
1470 Ok(())
1471 }
1472
1473 pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
1475 let mut wtxn = self.env.write_txn()?;
1476 let removed = self.pinned_refs.delete(&mut wtxn, key)?;
1477 wtxn.commit()?;
1478 Ok(removed)
1479 }
1480
1481 pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
1483 let rtxn = self.env.read_txn()?;
1484 let mut refs = Vec::new();
1485
1486 for item in self.pinned_refs.iter(&rtxn)? {
1487 let (key, _) = item?;
1488 refs.push(key.to_string());
1489 }
1490
1491 refs.sort();
1492 Ok(refs)
1493 }
1494
1495 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1497 let key = format!("{}/{}", pubkey_hex, tree_name);
1498 let rtxn = self.env.read_txn()?;
1499 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1500 let root: CachedRoot = rmp_serde::from_slice(bytes)
1501 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1502 Ok(Some(root))
1503 } else {
1504 Ok(None)
1505 }
1506 }
1507
1508 pub fn set_cached_root(
1510 &self,
1511 pubkey_hex: &str,
1512 tree_name: &str,
1513 hash: &str,
1514 key: Option<&str>,
1515 visibility: &str,
1516 updated_at: u64,
1517 ) -> Result<()> {
1518 let db_key = format!("{}/{}", pubkey_hex, tree_name);
1519 let root = CachedRoot {
1520 hash: hash.to_string(),
1521 key: key.map(|k| k.to_string()),
1522 updated_at,
1523 visibility: visibility.to_string(),
1524 };
1525 let bytes = rmp_serde::to_vec(&root)
1526 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1527 let mut wtxn = self.env.write_txn()?;
1528 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1529 wtxn.commit()?;
1530 Ok(())
1531 }
1532
1533 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1535 let prefix = format!("{}/", pubkey_hex);
1536 let rtxn = self.env.read_txn()?;
1537 let mut results = Vec::new();
1538
1539 for item in self.cached_roots.iter(&rtxn)? {
1540 let (key, bytes) = item?;
1541 if key.starts_with(&prefix) {
1542 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1543 let root: CachedRoot = rmp_serde::from_slice(bytes)
1544 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1545 results.push((tree_name.to_string(), root));
1546 }
1547 }
1548
1549 Ok(results)
1550 }
1551
1552 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1554 let key = format!("{}/{}", pubkey_hex, tree_name);
1555 let mut wtxn = self.env.write_txn()?;
1556 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1557 wtxn.commit()?;
1558 Ok(deleted)
1559 }
1560}
1561
1562fn is_map_full_store_error(err: &StoreError) -> bool {
1563 let message = err.to_string();
1564 message.contains("MDB_MAP_FULL") || message.contains("MapFull")
1565}
1566
/// Size and chunk layout of a stored file, as produced by
/// `HashtreeStore::get_file_chunk_metadata`.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total size of the file, as reported by the tree.
    pub total_size: u64,
    /// Hash of each chunk link, in link order; empty when the file is a single blob.
    pub chunk_hashes: Vec<Hash>,
    /// Declared size of each chunk link; parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// `true` when the file's tree node has at least one chunk link.
    pub is_chunked: bool,
}
1574
/// Owning iterator state for streaming an inclusive byte range of a chunked
/// file one chunk at a time; built by
/// `HashtreeStore::stream_file_range_chunks_owned`.
pub struct FileRangeChunksOwned {
    /// Store the chunks are fetched from.
    store: Arc<HashtreeStore>,
    /// Chunk layout of the file being streamed.
    metadata: FileChunkMetadata,
    /// First byte offset of the requested range (inclusive).
    start: u64,
    /// Last byte offset of the requested range (inclusive).
    end: u64,
    /// Index into the chunk lists of the next chunk to examine.
    current_chunk_idx: usize,
    /// File offset at which the next chunk to examine begins.
    current_offset: u64,
}
1584
1585impl Iterator for FileRangeChunksOwned {
1586 type Item = Result<Vec<u8>>;
1587
1588 fn next(&mut self) -> Option<Self::Item> {
1589 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
1590 return None;
1591 }
1592
1593 if self.current_offset > self.end {
1594 return None;
1595 }
1596
1597 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
1598 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
1599 let chunk_end = self.current_offset + chunk_size - 1;
1600
1601 self.current_chunk_idx += 1;
1602
1603 if chunk_end < self.start || self.current_offset > self.end {
1604 self.current_offset += chunk_size;
1605 return self.next();
1606 }
1607
1608 let chunk_content = match self.store.get_chunk(chunk_hash) {
1609 Ok(Some(content)) => content,
1610 Ok(None) => {
1611 return Some(Err(anyhow::anyhow!(
1612 "Chunk {} not found",
1613 to_hex(chunk_hash)
1614 )));
1615 }
1616 Err(e) => {
1617 return Some(Err(e));
1618 }
1619 };
1620
1621 let chunk_read_start = if self.current_offset >= self.start {
1622 0
1623 } else {
1624 (self.start - self.current_offset) as usize
1625 };
1626
1627 let chunk_read_end = if chunk_end <= self.end {
1628 chunk_size as usize - 1
1629 } else {
1630 (self.end - self.current_offset) as usize
1631 };
1632
1633 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
1634 self.current_offset += chunk_size;
1635
1636 Some(Ok(result))
1637 }
1638}
1639
/// Counters reported by garbage collection.
///
/// NOTE(review): the code that fills this in is outside this section; the
/// field descriptions below are inferred from the names and should be
/// confirmed against it.
#[derive(Debug)]
pub struct GcStats {
    /// Presumably the number of DAGs that were deleted.
    pub deleted_dags: usize,
    /// Presumably the number of bytes reclaimed.
    pub freed_bytes: u64,
}
1645
/// One entry of a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name as stored in the directory.
    pub name: String,
    /// Textual identifier of the entry: the hex hash when listed by hash, or
    /// the string form of the full `Cid` when listed by CID.
    pub cid: String,
    /// `true` when the entry's link type is a tree.
    pub is_directory: bool,
    /// Size recorded on the entry's link.
    pub size: u64,
}
1653
/// The entries of one directory.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the directory; the listing methods of `HashtreeStore` leave it empty.
    pub dir_name: String,
    /// Entries in the order returned by the tree listing.
    pub entries: Vec<DirEntry>,
}
1659
/// Serializable description of an uploaded blob.
///
/// NOTE(review): producers and consumers are outside this section; the field
/// descriptions below are inferred from the names and should be confirmed.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Presumably the hex-encoded SHA-256 digest of the blob.
    pub sha256: String,
    /// Presumably the blob size in bytes.
    pub size: u64,
    /// MIME type recorded for the blob.
    pub mime_type: String,
    /// Presumably the upload time as a Unix timestamp; confirm the unit.
    pub uploaded: u64,
}
1668
1669impl crate::webrtc::ContentStore for HashtreeStore {
1671 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1672 let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
1673 self.get_chunk(&hash)
1674 }
1675}
1676
// Tests for LMDB map sizing, stale FS shard cleanup, tree-ref replacement and
// the S3 fallback path. Each test is gated on the feature it exercises.
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "lmdb")]
    use tempfile::TempDir;

    // Requesting a storage budget above the minimum blob map size must yield a
    // blob LMDB map at least as large as the request.
    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        // 64 MiB above the minimum, so the minimum alone cannot satisfy it.
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        // Release the store explicitly before the temp directory goes away.
        drop(store);
        Ok(())
    }

    // An explicit map size passed to `LocalStore::new_with_lmdb_map_size` must
    // be honoured (the resulting map is at least that large).
    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }

    // Opening an LMDB store over a directory that still holds FS-backend shard
    // directories must remove those shards and leave everything else alone.
    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("lmdb-blobs");
        // "aa" and "b2" are two-hex-digit names, i.e. they look like FS blob
        // shards; "keep-me" does not.
        std::fs::create_dir_all(path.join("aa"))?;
        std::fs::create_dir_all(path.join("b2"))?;
        std::fs::create_dir_all(path.join("keep-me"))?;
        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;

        let _store = LocalStore::new_with_lmdb_map_size(
            &path,
            &StorageBackend::Lmdb,
            Some(128 * 1024 * 1024),
        )?;

        assert!(!path.join("aa").exists());
        assert!(!path.join("b2").exists());
        assert!(path.join("keep-me").exists());
        // The LMDB environment files must have been created in the same directory.
        assert!(path.join("data.mdb").exists());
        assert!(path.join("lock.mdb").exists());

        Ok(())
    }

    // Indexing a new root under the same ref key must unpin the previous root
    // and drop its tree metadata.
    #[cfg(feature = "lmdb")]
    #[test]
    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let old_bytes = b"old published root";
        let new_bytes = b"new published root";
        let old_root = sha256(old_bytes);
        let new_root = sha256(new_bytes);

        // Publish, pin and index the first root under the ref key.
        store.put_blob(old_bytes)?;
        store.pin(&old_root)?;
        store.index_tree(
            &old_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(store.is_pinned(&old_root)?);
        assert!(store.get_tree_meta(&old_root)?.is_some());

        // Publish a second root under the very same ref key.
        store.put_blob(new_bytes)?;
        store.pin(&new_root)?;
        store.index_tree(
            &new_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(
            !store.is_pinned(&old_root)?,
            "superseded root should be unpinned when ref is replaced"
        );
        assert!(
            store.get_tree_meta(&old_root)?.is_none(),
            "superseded root metadata should be removed when ref is replaced"
        );
        assert!(store.is_pinned(&new_root)?);
        assert!(store.get_tree_meta(&new_root)?.is_some());

        Ok(())
    }

    // Calling the async `Store` methods of an S3-backed router from inside
    // `futures::executor::block_on` must not panic. The results of the calls
    // are deliberately ignored; only the absence of a panic is asserted.
    #[cfg(feature = "s3")]
    #[test]
    fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
        let temp = tempfile::TempDir::new()?;
        let local = Arc::new(LocalStore::new(
            temp.path().join("blobs"),
            &StorageBackend::Fs,
        )?);

        let outcome = std::panic::catch_unwind(|| {
            sync_block_on(async {
                let aws_config = aws_config::from_env()
                    .region(aws_sdk_s3::config::Region::new("auto"))
                    .load()
                    .await;
                // Loopback endpoint on port 9; presumably nothing answers
                // there, so the S3 requests are expected to fail.
                let s3_client = aws_sdk_s3::Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&aws_config)
                        .endpoint_url("http://127.0.0.1:9")
                        .force_path_style(true)
                        .build(),
                );

                let router = StorageRouter {
                    local,
                    s3_client: Some(s3_client),
                    s3_bucket: Some("test-bucket".to_string()),
                    s3_prefix: String::new(),
                    sync_tx: None,
                };
                // The local store is empty, so this hash is not present locally.
                let hash = [0u8; 32];

                let _ = Store::has(&router, &hash).await;
                let _ = Store::get(&router, &hash).await;
            });
        });

        assert!(
            outcome.is_ok(),
            "S3-backed async store methods should not panic inside futures::block_on"
        );

        Ok(())
    }
}