1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8 from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16#[cfg(feature = "s3")]
17use std::future::Future;
18use std::io::Write;
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::time::{SystemTime, UNIX_EPOCH};
22
23mod upload;
24
25mod maintenance;
26mod retention;
27
28pub use maintenance::{compact_lmdb_environments_under, CompactResult, VerifyResult};
29pub use retention::{PinnedItem, StorageByPriority, StorageStats, TreeMeta};
30
/// Priority tier for data not covered by a more specific tier.
///
/// NOTE(review): the three `PRIORITY_*` tiers are presumably consumed by the
/// retention/eviction logic (higher value = more important?) — confirm in `retention`.
pub const PRIORITY_OTHER: u8 = 64;
/// Priority tier for data belonging to followed authors (see NOTE on `PRIORITY_OTHER`).
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Highest priority tier, for the node owner's own data; equals `u8::MAX`.
pub const PRIORITY_OWN: u8 = u8::MAX;
/// Maximum concurrent LMDB readers configured on the metadata environment.
const LMDB_MAX_READERS: u32 = 1024;
/// Smallest map size (1 MiB) requested for the metadata LMDB environment.
const LMDB_METADATA_MIN_MAP_SIZE_BYTES: u64 = 1 << 20;
/// Smallest map size (16 MiB) requested for the LMDB blob store.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 16 << 20;
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct CachedRoot {
43 pub hash: String,
45 pub key: Option<String>,
47 pub updated_at: u64,
49 pub visibility: String,
51}
52
/// Summary figures reported by a [`LocalStore`] backend.
///
/// Both fields are copied verbatim from the active backend's own `stats()` result.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Entry count reported by the backend (presumably the number of stored blobs).
    pub count: usize,
    /// Total size in bytes reported by the backend.
    pub total_bytes: u64,
}
59
60pub enum LocalStore {
62 Fs(FsBlobStore),
63 #[cfg(feature = "lmdb")]
64 Lmdb(LmdbBlobStore),
65}
66
/// Reports whether `path`'s final component looks like a filesystem-store shard
/// directory: exactly two ASCII hex digits (e.g. `0a`, `ff`).
///
/// Paths without a final component, or whose name is not valid UTF-8, yield `false`.
#[cfg(feature = "lmdb")]
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    match path.file_name().and_then(|raw| raw.to_str()) {
        Some(dir_name) => dir_name.len() == 2 && dir_name.bytes().all(|b| b.is_ascii_hexdigit()),
        None => false,
    }
}
74
75fn lmdb_map_size_for_existing_env(path: &Path, minimum_bytes: u64) -> Result<usize> {
76 let existing_bytes = std::fs::metadata(path.join("data.mdb"))
77 .map(|metadata| metadata.len())
78 .unwrap_or(0);
79 let requested = align_lmdb_map_size(minimum_bytes.max(existing_bytes));
80 usize::try_from(requested).context("LMDB map size exceeds usize")
81}
82
83fn align_lmdb_map_size(bytes: u64) -> u64 {
84 let page_size = (page_size::get() as u64).max(4096);
85 let remainder = bytes % page_size;
86 if remainder == 0 {
87 bytes
88 } else {
89 bytes.saturating_add(page_size - remainder)
90 }
91}
92
/// Deletes leftover filesystem-store shard directories directly under `path`.
///
/// Used when switching a blob directory to LMDB. Every immediate subdirectory
/// whose name is two hex digits (see `is_fs_blob_shard_dir`) is removed
/// recursively, and each removal is logged. Other entries are left alone.
///
/// # Errors
/// Returns `StoreError::Io` on the first directory-listing or removal failure.
/// Shards already removed stay removed.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for listing in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let shard = listing.map_err(StoreError::Io)?.path();
        if !(shard.is_dir() && is_fs_blob_shard_dir(&shard)) {
            continue;
        }
        std::fs::remove_dir_all(&shard).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            shard.display()
        );
    }
    Ok(())
}
109
110impl LocalStore {
111 pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
117 Self::new_unbounded(path, backend)
118 }
119
120 pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
125 path: P,
126 backend: &StorageBackend,
127 _map_size_bytes: Option<u64>,
128 ) -> Result<Self, StoreError> {
129 match backend {
130 StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
131 #[cfg(feature = "lmdb")]
132 StorageBackend::Lmdb => match _map_size_bytes {
133 Some(map_size_bytes) => {
134 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
135 remove_stale_fs_blob_shards(path.as_ref())?;
136 Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
137 path,
138 map_size_bytes,
139 )?))
140 }
141 None => {
142 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
143 remove_stale_fs_blob_shards(path.as_ref())?;
144 Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
145 }
146 },
147 #[cfg(not(feature = "lmdb"))]
148 StorageBackend::Lmdb => {
149 tracing::warn!(
150 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
151 );
152 Ok(LocalStore::Fs(FsBlobStore::new(path)?))
153 }
154 }
155 }
156
157 pub fn new_unbounded<P: AsRef<Path>>(
159 path: P,
160 backend: &StorageBackend,
161 ) -> Result<Self, StoreError> {
162 Self::new_with_lmdb_map_size(path, backend, None)
163 }
164
165 pub fn backend(&self) -> StorageBackend {
166 match self {
167 LocalStore::Fs(_) => StorageBackend::Fs,
168 #[cfg(feature = "lmdb")]
169 LocalStore::Lmdb(_) => StorageBackend::Lmdb,
170 }
171 }
172
173 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
175 match self {
176 LocalStore::Fs(store) => store.put_sync(hash, data),
177 #[cfg(feature = "lmdb")]
178 LocalStore::Lmdb(store) => store.put_sync(hash, data),
179 }
180 }
181
182 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
184 match self {
185 LocalStore::Fs(store) => {
186 let mut inserted = 0usize;
187 for (hash, data) in items {
188 if store.put_sync(*hash, data.as_slice())? {
189 inserted += 1;
190 }
191 }
192 Ok(inserted)
193 }
194 #[cfg(feature = "lmdb")]
195 LocalStore::Lmdb(store) => store.put_many_sync(items),
196 }
197 }
198
199 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
201 match self {
202 LocalStore::Fs(store) => store.get_sync(hash),
203 #[cfg(feature = "lmdb")]
204 LocalStore::Lmdb(store) => store.get_sync(hash),
205 }
206 }
207
208 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
210 match self {
211 LocalStore::Fs(store) => Ok(store.exists(hash)),
212 #[cfg(feature = "lmdb")]
213 LocalStore::Lmdb(store) => store.exists(hash),
214 }
215 }
216
217 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
219 match self {
220 LocalStore::Fs(store) => store.delete_sync(hash),
221 #[cfg(feature = "lmdb")]
222 LocalStore::Lmdb(store) => store.delete_sync(hash),
223 }
224 }
225
226 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
228 match self {
229 LocalStore::Fs(store) => {
230 let stats = store.stats()?;
231 Ok(LocalStoreStats {
232 count: stats.count,
233 total_bytes: stats.total_bytes,
234 })
235 }
236 #[cfg(feature = "lmdb")]
237 LocalStore::Lmdb(store) => {
238 let stats = store.stats()?;
239 Ok(LocalStoreStats {
240 count: stats.count,
241 total_bytes: stats.total_bytes,
242 })
243 }
244 }
245 }
246
247 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
249 match self {
250 LocalStore::Fs(store) => store.list(),
251 #[cfg(feature = "lmdb")]
252 LocalStore::Lmdb(store) => store.list(),
253 }
254 }
255}
256
257#[async_trait]
258impl Store for LocalStore {
259 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
260 self.put_sync(hash, &data)
261 }
262
263 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
264 self.put_many_sync(&items)
265 }
266
267 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
268 self.get_sync(hash)
269 }
270
271 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
272 self.exists(hash)
273 }
274
275 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
276 self.delete_sync(hash)
277 }
278}
279
280#[cfg(feature = "s3")]
281use tokio::sync::mpsc;
282
283use crate::config::S3Config;
284
/// Work item for the background task that mirrors local changes to S3.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Write `data` as the S3 object for `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove the S3 object for `hash`.
    Delete { hash: Hash },
}
291
292pub struct StorageRouter {
297 local: Arc<LocalStore>,
299 #[cfg(feature = "s3")]
301 s3_client: Option<aws_sdk_s3::Client>,
302 #[cfg(feature = "s3")]
303 s3_bucket: Option<String>,
304 #[cfg(feature = "s3")]
305 s3_prefix: String,
306 #[cfg(feature = "s3")]
308 sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
309}
310
311impl StorageRouter {
312 #[cfg(feature = "s3")]
313 fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
314 where
315 F: Future<Output = T> + Send + 'static,
316 T: Send + 'static,
317 {
318 if tokio::runtime::Handle::try_current().is_ok() {
319 return std::thread::Builder::new()
320 .name("storage-s3-sync".to_string())
321 .spawn(move || {
322 let runtime = tokio::runtime::Builder::new_current_thread()
323 .enable_all()
324 .build()
325 .map_err(|err| {
326 StoreError::Other(format!("build storage s3 sync runtime: {err}"))
327 })?;
328 Ok(runtime.block_on(future))
329 })
330 .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
331 .join()
332 .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()))?;
333 }
334
335 let runtime = tokio::runtime::Builder::new_current_thread()
336 .enable_all()
337 .build()
338 .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
339 Ok(runtime.block_on(future))
340 }
341
342 pub fn new(local: Arc<LocalStore>) -> Self {
344 Self {
345 local,
346 #[cfg(feature = "s3")]
347 s3_client: None,
348 #[cfg(feature = "s3")]
349 s3_bucket: None,
350 #[cfg(feature = "s3")]
351 s3_prefix: String::new(),
352 #[cfg(feature = "s3")]
353 sync_tx: None,
354 }
355 }
356
357 #[cfg(feature = "s3")]
359 pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
360 use aws_sdk_s3::Client as S3Client;
361
362 let mut aws_config_loader = aws_config::from_env();
364 aws_config_loader =
365 aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
366 let aws_config = aws_config_loader.load().await;
367
368 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
370 s3_config_builder = s3_config_builder
371 .endpoint_url(&config.endpoint)
372 .force_path_style(true);
373
374 let s3_client = S3Client::from_conf(s3_config_builder.build());
375 let bucket = config.bucket.clone();
376 let prefix = config.prefix.clone().unwrap_or_default();
377
378 let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
380
381 let sync_client = s3_client.clone();
383 let sync_bucket = bucket.clone();
384 let sync_prefix = prefix.clone();
385
386 tokio::spawn(async move {
387 use aws_sdk_s3::primitives::ByteStream;
388
389 tracing::info!("S3 background sync task started");
390
391 let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(8));
393 let client = std::sync::Arc::new(sync_client);
394 let bucket = std::sync::Arc::new(sync_bucket);
395 let prefix = std::sync::Arc::new(sync_prefix);
396
397 while let Some(msg) = sync_rx.recv().await {
398 let client = client.clone();
399 let bucket = bucket.clone();
400 let prefix = prefix.clone();
401 let semaphore = semaphore.clone();
402
403 tokio::spawn(async move {
405 let _permit = semaphore.acquire().await;
407
408 match msg {
409 S3SyncMessage::Upload { hash, data } => {
410 let key = format!("{}{}.bin", prefix, to_hex(&hash));
411 tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
412
413 let mut attempt = 1u8;
414 loop {
415 match client
416 .put_object()
417 .bucket(bucket.as_str())
418 .key(&key)
419 .body(ByteStream::from(data.clone()))
420 .send()
421 .await
422 {
423 Ok(_) => {
424 tracing::debug!("S3 upload succeeded: {}", &key);
425 break;
426 }
427 Err(e) if attempt < 3 => {
428 tracing::warn!(
429 "S3 upload retrying {}: attempt={} error={}",
430 &key,
431 attempt,
432 e
433 );
434 tokio::time::sleep(std::time::Duration::from_millis(
435 250 * u64::from(attempt),
436 ))
437 .await;
438 attempt += 1;
439 }
440 Err(e) => {
441 tracing::error!(
442 "S3 upload failed {} after {} attempts: {}",
443 &key,
444 attempt,
445 e
446 );
447 break;
448 }
449 }
450 }
451 }
452 S3SyncMessage::Delete { hash } => {
453 let key = format!("{}{}.bin", prefix, to_hex(&hash));
454 tracing::debug!("S3 deleting {}", &key);
455
456 let mut attempt = 1u8;
457 loop {
458 match client
459 .delete_object()
460 .bucket(bucket.as_str())
461 .key(&key)
462 .send()
463 .await
464 {
465 Ok(_) => break,
466 Err(e) if attempt < 3 => {
467 tracing::warn!(
468 "S3 delete retrying {}: attempt={} error={}",
469 &key,
470 attempt,
471 e
472 );
473 tokio::time::sleep(std::time::Duration::from_millis(
474 250 * u64::from(attempt),
475 ))
476 .await;
477 attempt += 1;
478 }
479 Err(e) => {
480 tracing::error!(
481 "S3 delete failed {} after {} attempts: {}",
482 &key,
483 attempt,
484 e
485 );
486 break;
487 }
488 }
489 }
490 }
491 }
492 });
493 }
494 });
495
496 tracing::info!(
497 "S3 storage initialized: bucket={}, prefix={}",
498 bucket,
499 prefix
500 );
501
502 Ok(Self {
503 local,
504 s3_client: Some(s3_client),
505 s3_bucket: Some(bucket),
506 s3_prefix: prefix,
507 sync_tx: Some(sync_tx),
508 })
509 }
510
511 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
513 let is_new = self.local.put_sync(hash, data)?;
515
516 #[cfg(feature = "s3")]
519 if is_new {
520 if let Some(ref tx) = self.sync_tx {
521 tracing::debug!(
522 "Queueing S3 upload for {} ({} bytes)",
523 crate::storage::to_hex(&hash)[..16].to_string(),
524 data.len(),
525 );
526 if let Err(e) = tx.send(S3SyncMessage::Upload {
527 hash,
528 data: data.to_vec(),
529 }) {
530 tracing::error!("Failed to queue S3 upload: {}", e);
531 }
532 }
533 }
534
535 Ok(is_new)
536 }
537
538 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
540 #[cfg(feature = "s3")]
541 let pending_uploads = if self.sync_tx.is_some() {
542 let mut pending = Vec::new();
543 for (hash, data) in items {
544 if !self.local.exists(hash)? {
545 pending.push((*hash, data.clone()));
546 }
547 }
548 pending
549 } else {
550 Vec::new()
551 };
552
553 let inserted = self.local.put_many_sync(items)?;
554
555 #[cfg(feature = "s3")]
556 if let Some(ref tx) = self.sync_tx {
557 for (hash, data) in pending_uploads {
558 if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
559 tracing::error!("Failed to queue S3 upload: {}", e);
560 }
561 }
562 }
563
564 Ok(inserted)
565 }
566
567 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
569 if let Some(data) = self.local.get_sync(hash)? {
571 return Ok(Some(data));
572 }
573
574 #[cfg(feature = "s3")]
576 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
577 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
578 let client = client.clone();
579 let bucket = bucket.clone();
580
581 match Self::run_s3_future_sync(async move {
582 client.get_object().bucket(bucket).key(key).send().await
583 }) {
584 Ok(Ok(output)) => {
585 match Self::run_s3_future_sync(async move { output.body.collect().await }) {
586 Ok(Ok(body)) => {
587 let data = body.into_bytes().to_vec();
588 let _ = self.local.put_sync(*hash, &data);
590 return Ok(Some(data));
591 }
592 Ok(Err(err)) => {
593 tracing::warn!("S3 body collect failed: {}", err);
594 }
595 Err(err) => {
596 tracing::warn!("S3 body collect runtime failed: {}", err);
597 }
598 }
599 }
600 Ok(Err(err)) => {
601 let service_err = err.into_service_error();
602 if !service_err.is_no_such_key() {
603 tracing::warn!("S3 get failed: {}", service_err);
604 }
605 }
606 Err(err) => {
607 tracing::warn!("S3 get runtime failed: {}", err);
608 }
609 }
610 }
611
612 Ok(None)
613 }
614
615 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
617 if self.local.exists(hash)? {
619 return Ok(true);
620 }
621
622 #[cfg(feature = "s3")]
624 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
625 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
626 let client = client.clone();
627 let bucket = bucket.clone();
628
629 match Self::run_s3_future_sync(async move {
630 client.head_object().bucket(bucket).key(&key).send().await
631 }) {
632 Ok(Ok(_)) => return Ok(true),
633 Ok(Err(err)) => {
634 let service_err = err.into_service_error();
635 if !service_err.is_not_found() {
636 tracing::warn!("S3 head failed: {}", service_err);
637 }
638 }
639 Err(err) => {
640 tracing::warn!("S3 head runtime failed: {}", err);
641 }
642 }
643 }
644
645 Ok(false)
646 }
647
648 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
650 let deleted = self.local.delete_sync(hash)?;
651
652 #[cfg(feature = "s3")]
654 if let Some(ref tx) = self.sync_tx {
655 let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
656 }
657
658 Ok(deleted)
659 }
660
661 pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
664 self.local.delete_sync(hash)
665 }
666
667 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
669 self.local.stats()
670 }
671
672 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
674 self.local.list()
675 }
676
677 pub fn local_store(&self) -> Arc<LocalStore> {
679 Arc::clone(&self.local)
680 }
681}
682
683#[async_trait]
686impl Store for StorageRouter {
687 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
688 self.put_sync(hash, &data)
689 }
690
691 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
692 self.put_many_sync(&items)
693 }
694
695 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
696 self.get_sync(hash)
697 }
698
699 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
700 self.exists(hash)
701 }
702
703 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
704 self.delete_sync(hash)
705 }
706}
707
708pub struct HashtreeStore {
709 base_path: PathBuf,
710 env: heed::Env,
711 pins: Database<Bytes, Unit>,
713 pinned_refs: Database<Str, Unit>,
715 tracked_authors: Database<Str, Unit>,
717 blob_owners: Database<Bytes, Unit>,
719 pubkey_blobs: Database<Bytes, Bytes>,
721 tree_meta: Database<Bytes, Bytes>,
723 blob_trees: Database<Bytes, Unit>,
725 tree_refs: Database<Str, Bytes>,
727 cached_roots: Database<Str, Bytes>,
729 router: Arc<StorageRouter>,
731 max_size_bytes: u64,
733 evict_orphans: bool,
735}
736
737impl HashtreeStore {
738 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
740 let config = hashtree_config::Config::load_or_default();
741 let max_size_bytes = config
742 .storage
743 .max_size_gb
744 .saturating_mul(1024 * 1024 * 1024);
745 Self::with_options_and_backend(
746 path,
747 None,
748 max_size_bytes,
749 config.storage.evict_orphans,
750 &config.storage.backend,
751 )
752 }
753
754 pub fn new_with_backend<P: AsRef<Path>>(
756 path: P,
757 backend: hashtree_config::StorageBackend,
758 max_size_bytes: u64,
759 ) -> Result<Self> {
760 Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
761 }
762
763 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
765 let config = hashtree_config::Config::load_or_default();
766 let max_size_bytes = config
767 .storage
768 .max_size_gb
769 .saturating_mul(1024 * 1024 * 1024);
770 Self::with_options_and_backend(
771 path,
772 s3_config,
773 max_size_bytes,
774 config.storage.evict_orphans,
775 &config.storage.backend,
776 )
777 }
778
779 pub fn with_options<P: AsRef<Path>>(
785 path: P,
786 s3_config: Option<&S3Config>,
787 max_size_bytes: u64,
788 ) -> Result<Self> {
789 let config = hashtree_config::Config::load_or_default();
790 Self::with_options_and_backend(
791 path,
792 s3_config,
793 max_size_bytes,
794 config.storage.evict_orphans,
795 &config.storage.backend,
796 )
797 }
798
799 pub fn with_options_and_backend<P: AsRef<Path>>(
800 path: P,
801 s3_config: Option<&S3Config>,
802 max_size_bytes: u64,
803 evict_orphans: bool,
804 backend: &hashtree_config::StorageBackend,
805 ) -> Result<Self> {
806 let path = path.as_ref();
807 std::fs::create_dir_all(path)?;
808 let metadata_map_size =
809 lmdb_map_size_for_existing_env(path, LMDB_METADATA_MIN_MAP_SIZE_BYTES)?;
810
811 let env = unsafe {
812 EnvOpenOptions::new()
813 .map_size(metadata_map_size)
814 .max_dbs(10) .max_readers(LMDB_MAX_READERS)
816 .open(path)?
817 };
818 let _ = env.clear_stale_readers();
819
820 let mut wtxn = env.write_txn()?;
821 let pins = env.create_database(&mut wtxn, Some("pins"))?;
822 let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
823 let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
824 let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
825 let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
826 let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
827 let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
828 let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
829 let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
830 wtxn.commit()?;
831
832 let local_store = Arc::new(match backend {
836 hashtree_config::StorageBackend::Fs => LocalStore::Fs(
837 FsBlobStore::new(path.join("blobs"))
838 .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
839 ),
840 #[cfg(feature = "lmdb")]
841 hashtree_config::StorageBackend::Lmdb => {
842 std::fs::create_dir_all(path.join("blobs"))?;
843 remove_stale_fs_blob_shards(&path.join("blobs"))
844 .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
845 let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
846 let map_size = usize::try_from(requested_map_size)
847 .context("LMDB blob map size exceeds usize")?;
848 LocalStore::Lmdb(
849 LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
850 .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
851 )
852 }
853 #[cfg(not(feature = "lmdb"))]
854 hashtree_config::StorageBackend::Lmdb => {
855 tracing::warn!(
856 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
857 );
858 LocalStore::Fs(
859 FsBlobStore::new(path.join("blobs"))
860 .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
861 )
862 }
863 });
864
865 #[cfg(feature = "s3")]
867 let router = Arc::new(if let Some(s3_cfg) = s3_config {
868 tracing::info!(
869 "Initializing S3 storage backend: bucket={}, endpoint={}",
870 s3_cfg.bucket,
871 s3_cfg.endpoint
872 );
873
874 sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
875 } else {
876 StorageRouter::new(local_store)
877 });
878
879 #[cfg(not(feature = "s3"))]
880 let router = Arc::new({
881 if s3_config.is_some() {
882 tracing::warn!(
883 "S3 config provided but S3 feature not enabled. Using local storage only."
884 );
885 }
886 StorageRouter::new(local_store)
887 });
888
889 Ok(Self {
890 base_path: path.to_path_buf(),
891 env,
892 pins,
893 pinned_refs,
894 tracked_authors,
895 blob_owners,
896 pubkey_blobs,
897 tree_meta,
898 blob_trees,
899 tree_refs,
900 cached_roots,
901 router,
902 max_size_bytes,
903 evict_orphans,
904 })
905 }
906
907 pub fn base_path(&self) -> &Path {
908 &self.base_path
909 }
910
911 pub fn router(&self) -> &StorageRouter {
913 &self.router
914 }
915
916 pub fn store_arc(&self) -> Arc<StorageRouter> {
919 Arc::clone(&self.router)
920 }
921
922 pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
924 let store = self.store_arc();
925 let tree = HashTree::new(HashTreeConfig::new(store).public());
926
927 sync_block_on(async {
928 tree.get_tree_node(hash)
929 .await
930 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
931 })
932 }
933
934 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
936 let hash = sha256(data);
937 self.router
938 .put_sync(hash, data)
939 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
940 Ok(to_hex(&hash))
941 }
942
943 pub fn put_owned_blob(&self, data: &[u8], pubkey: &[u8; 32]) -> Result<String> {
945 let hash = sha256(data);
946 if !self
947 .router
948 .exists(&hash)
949 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))?
950 {
951 self.make_room_for_durable_blob(data.len() as u64)?;
952 self.router
953 .put_sync(hash, data)
954 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
955 }
956 self.set_blob_owner(&hash, pubkey)?;
957 Ok(to_hex(&hash))
958 }
959
960 pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
966 let hash = sha256(data);
967 if self
968 .router
969 .exists(&hash)
970 .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
971 {
972 return Ok(to_hex(&hash));
973 }
974
975 let incoming_bytes = data.len() as u64;
976 let _ = self.make_room_for_cached_blob(incoming_bytes);
977
978 let mut retried_after_cleanup = false;
979 loop {
980 match self.router.put_sync(hash, data) {
981 Ok(_) => return Ok(to_hex(&hash)),
982 Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
983 let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
984 if freed == 0 {
985 return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
986 }
987 retried_after_cleanup = true;
988 }
989 Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
990 }
991 }
992 }
993
994 pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
996 self.router
997 .get_sync(hash)
998 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
999 }
1000
1001 pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
1003 self.router
1004 .exists(hash)
1005 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
1006 }
1007
/// Builds the `blob_owners` key: the 32-byte blob hash followed by the 32-byte pubkey.
///
/// With the hash first, all owners of a blob share a common key prefix, so
/// they can be found with a prefix scan.
fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
    let mut composite = [0u8; 64];
    let (hash_half, owner_half) = composite.split_at_mut(32);
    hash_half.copy_from_slice(sha256);
    owner_half.copy_from_slice(pubkey);
    composite
}
1019
1020 pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
1023 let key = Self::blob_owner_key(sha256, pubkey);
1024 let mut wtxn = self.env.write_txn()?;
1025
1026 self.blob_owners.put(&mut wtxn, &key[..], &())?;
1028
1029 let sha256_hex = to_hex(sha256);
1031
1032 let mut blobs: Vec<BlobMetadata> = self
1034 .pubkey_blobs
1035 .get(&wtxn, pubkey)?
1036 .and_then(|b| serde_json::from_slice(b).ok())
1037 .unwrap_or_default();
1038
1039 if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
1041 let now = SystemTime::now()
1042 .duration_since(UNIX_EPOCH)
1043 .unwrap()
1044 .as_secs();
1045
1046 let size = self
1048 .get_blob(sha256)?
1049 .map(|data| data.len() as u64)
1050 .unwrap_or(0);
1051
1052 blobs.push(BlobMetadata {
1053 sha256: sha256_hex,
1054 size,
1055 mime_type: "application/octet-stream".to_string(),
1056 uploaded: now,
1057 });
1058
1059 let blobs_json = serde_json::to_vec(&blobs)?;
1060 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1061 }
1062
1063 wtxn.commit()?;
1064 Ok(())
1065 }
1066
1067 pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1069 let key = Self::blob_owner_key(sha256, pubkey);
1070 let rtxn = self.env.read_txn()?;
1071 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1072 }
1073
1074 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1076 let rtxn = self.env.read_txn()?;
1077
1078 let mut owners = Vec::new();
1079 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1080 let (key, _) = item?;
1081 if key.len() == 64 {
1082 let mut pubkey = [0u8; 32];
1084 pubkey.copy_from_slice(&key[32..64]);
1085 owners.push(pubkey);
1086 }
1087 }
1088 Ok(owners)
1089 }
1090
1091 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1093 let rtxn = self.env.read_txn()?;
1094
1095 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1097 if item.is_ok() {
1098 return Ok(true);
1099 }
1100 }
1101 Ok(false)
1102 }
1103
1104 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1106 Ok(self.get_blob_owners(sha256)?.into_iter().next())
1107 }
1108
1109 pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1113 let key = Self::blob_owner_key(sha256, pubkey);
1114 let mut wtxn = self.env.write_txn()?;
1115
1116 self.blob_owners.delete(&mut wtxn, &key[..])?;
1118
1119 let sha256_hex = to_hex(sha256);
1121
1122 if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1124 if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1125 blobs.retain(|b| b.sha256 != sha256_hex);
1126 let blobs_json = serde_json::to_vec(&blobs)?;
1127 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1128 }
1129 }
1130
1131 let mut has_other_owners = false;
1133 for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1134 if item.is_ok() {
1135 has_other_owners = true;
1136 break;
1137 }
1138 }
1139
1140 if has_other_owners {
1141 wtxn.commit()?;
1142 tracing::debug!(
1143 "Removed {} from blob {} owners, other owners remain",
1144 &to_hex(pubkey)[..8],
1145 &sha256_hex[..8]
1146 );
1147 return Ok(false);
1148 }
1149
1150 tracing::info!(
1152 "All owners removed from blob {}, deleting",
1153 &sha256_hex[..8]
1154 );
1155
1156 let _ = self.router.delete_sync(sha256);
1158
1159 wtxn.commit()?;
1160 Ok(true)
1161 }
1162
1163 pub fn list_blobs_by_pubkey(
1165 &self,
1166 pubkey: &[u8; 32],
1167 ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1168 let rtxn = self.env.read_txn()?;
1169
1170 let blobs: Vec<BlobMetadata> = self
1171 .pubkey_blobs
1172 .get(&rtxn, pubkey)?
1173 .and_then(|b| serde_json::from_slice(b).ok())
1174 .unwrap_or_default();
1175
1176 Ok(blobs
1177 .into_iter()
1178 .map(|b| crate::server::blossom::BlobDescriptor {
1179 url: format!("/{}", b.sha256),
1180 sha256: b.sha256,
1181 size: b.size,
1182 mime_type: b.mime_type,
1183 uploaded: b.uploaded,
1184 })
1185 .collect())
1186 }
1187
1188 pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1190 self.router
1191 .get_sync(hash)
1192 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1193 }
1194
1195 pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1198 let store = self.store_arc();
1199 let tree = HashTree::new(HashTreeConfig::new(store).public());
1200
1201 sync_block_on(async {
1202 tree.read_file(hash)
1203 .await
1204 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1205 })
1206 }
1207
1208 pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1211 let store = self.store_arc();
1212 let tree = HashTree::new(HashTreeConfig::new(store).public());
1213
1214 sync_block_on(async {
1215 tree.get(cid, None)
1216 .await
1217 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1218 })
1219 }
1220
1221 fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1222 let exists = self
1223 .router
1224 .exists(&cid.hash)
1225 .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1226 if !exists {
1227 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1228 }
1229 Ok(())
1230 }
1231
1232 pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1234 self.ensure_cid_exists(cid)?;
1235
1236 let store = self.store_arc();
1237 let tree = HashTree::new(HashTreeConfig::new(store).public());
1238 let mut total_bytes = 0u64;
1239 let mut streamed_any_chunk = false;
1240
1241 sync_block_on(async {
1242 let mut stream = tree.get_stream(cid);
1243 while let Some(chunk) = stream.next().await {
1244 streamed_any_chunk = true;
1245 let chunk =
1246 chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1247 writer
1248 .write_all(&chunk)
1249 .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1250 total_bytes += chunk.len() as u64;
1251 }
1252 Ok::<(), anyhow::Error>(())
1253 })?;
1254
1255 if !streamed_any_chunk {
1256 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1257 }
1258
1259 writer
1260 .flush()
1261 .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1262 Ok(total_bytes)
1263 }
1264
1265 pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1267 self.ensure_cid_exists(cid)?;
1268
1269 let output_path = output_path.as_ref();
1270 if let Some(parent) = output_path.parent() {
1271 if !parent.as_os_str().is_empty() {
1272 std::fs::create_dir_all(parent).with_context(|| {
1273 format!("Failed to create output directory {}", parent.display())
1274 })?;
1275 }
1276 }
1277
1278 let mut file = std::fs::File::create(output_path)
1279 .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1280 self.write_file_by_cid_to_writer(cid, &mut file)
1281 }
1282
1283 pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1285 self.write_file_by_cid(&Cid::public(*hash), output_path)
1286 }
1287
1288 pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1290 let store = self.store_arc();
1291 let tree = HashTree::new(HashTreeConfig::new(store).public());
1292
1293 sync_block_on(async {
1294 tree.resolve_path(cid, path)
1295 .await
1296 .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1297 })
1298 }
1299
1300 pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
1302 let store = self.store_arc();
1303 let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
1304
1305 sync_block_on(async {
1306 let exists = store
1309 .has(hash)
1310 .await
1311 .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;
1312
1313 if !exists {
1314 return Ok(None);
1315 }
1316
1317 let total_size = tree
1319 .get_size(hash)
1320 .await
1321 .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;
1322
1323 let is_tree_node = tree
1325 .is_tree(hash)
1326 .await
1327 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1328
1329 if !is_tree_node {
1330 return Ok(Some(FileChunkMetadata {
1332 total_size,
1333 chunk_hashes: vec![],
1334 chunk_sizes: vec![],
1335 is_chunked: false,
1336 }));
1337 }
1338
1339 let node = match tree
1341 .get_tree_node(hash)
1342 .await
1343 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1344 {
1345 Some(n) => n,
1346 None => return Ok(None),
1347 };
1348
1349 let is_directory = tree
1351 .is_directory(hash)
1352 .await
1353 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1354
1355 if is_directory {
1356 return Ok(None); }
1358
1359 let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
1361 let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();
1362
1363 Ok(Some(FileChunkMetadata {
1364 total_size,
1365 chunk_hashes,
1366 chunk_sizes,
1367 is_chunked: !node.links.is_empty(),
1368 }))
1369 })
1370 }
1371
1372 pub fn get_file_range(
1374 &self,
1375 hash: &[u8; 32],
1376 start: u64,
1377 end: Option<u64>,
1378 ) -> Result<Option<(Vec<u8>, u64)>> {
1379 let metadata = match self.get_file_chunk_metadata(hash)? {
1380 Some(m) => m,
1381 None => return Ok(None),
1382 };
1383
1384 if metadata.total_size == 0 {
1385 return Ok(Some((Vec::new(), 0)));
1386 }
1387
1388 if start >= metadata.total_size {
1389 return Ok(None);
1390 }
1391
1392 let end = end
1393 .unwrap_or(metadata.total_size - 1)
1394 .min(metadata.total_size - 1);
1395
1396 if !metadata.is_chunked {
1398 let content = self.get_file(hash)?.unwrap_or_default();
1399 let range_content = if start < content.len() as u64 {
1400 content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1401 } else {
1402 Vec::new()
1403 };
1404 return Ok(Some((range_content, metadata.total_size)));
1405 }
1406
1407 let mut result = Vec::new();
1409 let mut current_offset = 0u64;
1410
1411 for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1412 let chunk_size = metadata.chunk_sizes[i];
1413 let chunk_end = current_offset + chunk_size - 1;
1414
1415 if chunk_end >= start && current_offset <= end {
1417 let chunk_content = match self.get_chunk(chunk_hash)? {
1418 Some(content) => content,
1419 None => {
1420 return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1421 }
1422 };
1423
1424 let chunk_read_start = if current_offset >= start {
1425 0
1426 } else {
1427 (start - current_offset) as usize
1428 };
1429
1430 let chunk_read_end = if chunk_end <= end {
1431 chunk_size as usize - 1
1432 } else {
1433 (end - current_offset) as usize
1434 };
1435
1436 result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1437 }
1438
1439 current_offset += chunk_size;
1440
1441 if current_offset > end {
1442 break;
1443 }
1444 }
1445
1446 Ok(Some((result, metadata.total_size)))
1447 }
1448
1449 pub fn stream_file_range_chunks_owned(
1451 self: Arc<Self>,
1452 hash: &[u8; 32],
1453 start: u64,
1454 end: u64,
1455 ) -> Result<Option<FileRangeChunksOwned>> {
1456 let metadata = match self.get_file_chunk_metadata(hash)? {
1457 Some(m) => m,
1458 None => return Ok(None),
1459 };
1460
1461 if metadata.total_size == 0 || start >= metadata.total_size {
1462 return Ok(None);
1463 }
1464
1465 let end = end.min(metadata.total_size - 1);
1466
1467 Ok(Some(FileRangeChunksOwned {
1468 store: self,
1469 metadata,
1470 start,
1471 end,
1472 current_chunk_idx: 0,
1473 current_offset: 0,
1474 }))
1475 }
1476
1477 pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1479 let store = self.store_arc();
1480 let tree = HashTree::new(HashTreeConfig::new(store).public());
1481
1482 sync_block_on(async {
1483 let is_dir = tree
1485 .is_directory(hash)
1486 .await
1487 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1488
1489 if !is_dir {
1490 return Ok(None);
1491 }
1492
1493 let cid = hashtree_core::Cid::public(*hash);
1495 let tree_entries = tree
1496 .list_directory(&cid)
1497 .await
1498 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1499
1500 let entries: Vec<DirEntry> = tree_entries
1501 .into_iter()
1502 .map(|e| DirEntry {
1503 name: e.name,
1504 cid: to_hex(&e.hash),
1505 is_directory: e.link_type.is_tree(),
1506 size: e.size,
1507 })
1508 .collect();
1509
1510 Ok(Some(DirectoryListing {
1511 dir_name: String::new(),
1512 entries,
1513 }))
1514 })
1515 }
1516
1517 pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1519 let store = self.store_arc();
1520 let tree = HashTree::new(HashTreeConfig::new(store).public());
1521 let cid = cid.clone();
1522
1523 sync_block_on(async {
1524 let is_dir = tree
1525 .is_dir(&cid)
1526 .await
1527 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1528
1529 if !is_dir {
1530 return Ok(None);
1531 }
1532
1533 let tree_entries = tree
1534 .list_directory(&cid)
1535 .await
1536 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1537
1538 let entries: Vec<DirEntry> = tree_entries
1539 .into_iter()
1540 .map(|e| DirEntry {
1541 name: e.name,
1542 cid: Cid {
1543 hash: e.hash,
1544 key: e.key,
1545 }
1546 .to_string(),
1547 is_directory: e.link_type.is_tree(),
1548 size: e.size,
1549 })
1550 .collect();
1551
1552 Ok(Some(DirectoryListing {
1553 dir_name: String::new(),
1554 entries,
1555 }))
1556 })
1557 }
1558
1559 pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
1563 let mut wtxn = self.env.write_txn()?;
1564 self.pinned_refs.put(&mut wtxn, key, &())?;
1565 wtxn.commit()?;
1566 Ok(())
1567 }
1568
1569 pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
1571 let mut wtxn = self.env.write_txn()?;
1572 let removed = self.pinned_refs.delete(&mut wtxn, key)?;
1573 wtxn.commit()?;
1574 Ok(removed)
1575 }
1576
1577 pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
1579 let rtxn = self.env.read_txn()?;
1580 let mut refs = Vec::new();
1581
1582 for item in self.pinned_refs.iter(&rtxn)? {
1583 let (key, _) = item?;
1584 refs.push(key.to_string());
1585 }
1586
1587 refs.sort();
1588 Ok(refs)
1589 }
1590
1591 pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
1593 let mut wtxn = self.env.write_txn()?;
1594 let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
1595 self.tracked_authors.put(&mut wtxn, npub, &())?;
1596 wtxn.commit()?;
1597 Ok(inserted)
1598 }
1599
1600 pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
1602 let mut wtxn = self.env.write_txn()?;
1603 let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
1604 wtxn.commit()?;
1605 Ok(removed)
1606 }
1607
1608 pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
1610 let rtxn = self.env.read_txn()?;
1611 let mut authors = Vec::new();
1612
1613 for item in self.tracked_authors.iter(&rtxn)? {
1614 let (npub, _) = item?;
1615 authors.push(npub.to_string());
1616 }
1617
1618 authors.sort();
1619 Ok(authors)
1620 }
1621
1622 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1624 let key = format!("{}/{}", pubkey_hex, tree_name);
1625 let rtxn = self.env.read_txn()?;
1626 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1627 let root: CachedRoot = rmp_serde::from_slice(bytes)
1628 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1629 Ok(Some(root))
1630 } else {
1631 Ok(None)
1632 }
1633 }
1634
1635 pub fn set_cached_root(
1637 &self,
1638 pubkey_hex: &str,
1639 tree_name: &str,
1640 hash: &str,
1641 key: Option<&str>,
1642 visibility: &str,
1643 updated_at: u64,
1644 ) -> Result<()> {
1645 let db_key = format!("{}/{}", pubkey_hex, tree_name);
1646 let root = CachedRoot {
1647 hash: hash.to_string(),
1648 key: key.map(|k| k.to_string()),
1649 updated_at,
1650 visibility: visibility.to_string(),
1651 };
1652 let bytes = rmp_serde::to_vec(&root)
1653 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1654 let mut wtxn = self.env.write_txn()?;
1655 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1656 wtxn.commit()?;
1657 Ok(())
1658 }
1659
1660 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1662 let prefix = format!("{}/", pubkey_hex);
1663 let rtxn = self.env.read_txn()?;
1664 let mut results = Vec::new();
1665
1666 for item in self.cached_roots.iter(&rtxn)? {
1667 let (key, bytes) = item?;
1668 if key.starts_with(&prefix) {
1669 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1670 let root: CachedRoot = rmp_serde::from_slice(bytes)
1671 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1672 results.push((tree_name.to_string(), root));
1673 }
1674 }
1675
1676 Ok(results)
1677 }
1678
1679 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1681 let key = format!("{}/{}", pubkey_hex, tree_name);
1682 let mut wtxn = self.env.write_txn()?;
1683 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1684 wtxn.commit()?;
1685 Ok(deleted)
1686 }
1687}
1688
1689fn is_map_full_store_error(err: &StoreError) -> bool {
1690 let message = err.to_string();
1691 message.contains("MDB_MAP_FULL") || message.contains("MapFull")
1692}
1693
/// Chunk layout of a stored file, as built by `HashtreeStore::get_file_chunk_metadata`.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes, as reported by the tree's `get_size`.
    pub total_size: u64,
    /// Hashes of the root tree node's direct links, in order; empty for a plain blob.
    pub chunk_hashes: Vec<Hash>,
    /// Sizes of the same links, index-aligned with `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// `true` when the root is a tree node with at least one link.
    pub is_chunked: bool,
}
1701
/// Iterator over the inclusive byte range `start..=end` of a chunked file.
///
/// Each item is the slice of one overlapping chunk. It holds its own
/// `Arc<HashtreeStore>`, so it does not borrow the store that created it.
pub struct FileRangeChunksOwned {
    /// Store used to fetch chunk bytes.
    store: Arc<HashtreeStore>,
    /// Chunk hashes/sizes of the file being streamed.
    metadata: FileChunkMetadata,
    /// First requested byte (absolute file offset).
    start: u64,
    /// Last requested byte, inclusive (absolute file offset).
    end: u64,
    /// Index of the next chunk in `metadata` to examine.
    current_chunk_idx: usize,
    /// Absolute file offset of the first byte of chunk `current_chunk_idx`.
    current_offset: u64,
}
1711
1712impl Iterator for FileRangeChunksOwned {
1713 type Item = Result<Vec<u8>>;
1714
1715 fn next(&mut self) -> Option<Self::Item> {
1716 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
1717 return None;
1718 }
1719
1720 if self.current_offset > self.end {
1721 return None;
1722 }
1723
1724 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
1725 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
1726 let chunk_end = self.current_offset + chunk_size - 1;
1727
1728 self.current_chunk_idx += 1;
1729
1730 if chunk_end < self.start || self.current_offset > self.end {
1731 self.current_offset += chunk_size;
1732 return self.next();
1733 }
1734
1735 let chunk_content = match self.store.get_chunk(chunk_hash) {
1736 Ok(Some(content)) => content,
1737 Ok(None) => {
1738 return Some(Err(anyhow::anyhow!(
1739 "Chunk {} not found",
1740 to_hex(chunk_hash)
1741 )));
1742 }
1743 Err(e) => {
1744 return Some(Err(e));
1745 }
1746 };
1747
1748 let chunk_read_start = if self.current_offset >= self.start {
1749 0
1750 } else {
1751 (self.start - self.current_offset) as usize
1752 };
1753
1754 let chunk_read_end = if chunk_end <= self.end {
1755 chunk_size as usize - 1
1756 } else {
1757 (self.end - self.current_offset) as usize
1758 };
1759
1760 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
1761 self.current_offset += chunk_size;
1762
1763 Some(Ok(result))
1764 }
1765}
1766
/// Counters summarizing a garbage-collection pass.
///
/// NOTE(review): populated outside this part of the file; the field meanings below are
/// inferred from their names. Confirm against the GC implementation.
#[derive(Debug)]
pub struct GcStats {
    /// Number of DAGs deleted.
    pub deleted_dags: usize,
    /// Bytes reclaimed by those deletions.
    pub freed_bytes: u64,
}
1772
/// One entry of a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within the directory.
    pub name: String,
    /// Entry identifier.
    ///
    /// `get_directory_listing` fills this with the hex hash; `get_directory_listing_by_cid`
    /// fills it with the `Cid` string form, which includes the key.
    pub cid: String,
    /// `true` when the entry's link type is a tree.
    pub is_directory: bool,
    /// Entry size as reported by the directory listing.
    pub size: u64,
}
1780
/// Result of listing a directory.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Directory name; the listing methods in this file leave it empty.
    pub dir_name: String,
    /// Directory entries in the order the tree returned them.
    pub entries: Vec<DirEntry>,
}
1786
/// Serializable descriptive metadata for a stored blob.
///
/// NOTE(review): not constructed in this part of the file; the field meanings below are
/// inferred from their names. Confirm against the producer.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Presumably the hex-encoded SHA-256 of the blob contents (TODO confirm).
    pub sha256: String,
    /// Presumably the blob size in bytes (TODO confirm).
    pub size: u64,
    /// MIME type string associated with the blob.
    pub mime_type: String,
    /// Upload timestamp. Units are not established here (likely Unix seconds); confirm.
    pub uploaded: u64,
}
1795
1796impl crate::webrtc::ContentStore for HashtreeStore {
1798 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1799 let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
1800 self.get_chunk(&hash)
1801 }
1802}
1803
#[cfg(test)]
mod tests {
    use super::*;
    // Not feature-gated: `tracked_authors_round_trip_sorted_and_deduplicated` runs in every
    // feature configuration and needs `TempDir`. Gating this import on `lmdb` broke the test
    // build whenever that feature was disabled.
    use tempfile::TempDir;

    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("lmdb-blobs");
        std::fs::create_dir_all(path.join("aa"))?;
        std::fs::create_dir_all(path.join("b2"))?;
        std::fs::create_dir_all(path.join("keep-me"))?;
        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;

        let _store = LocalStore::new_with_lmdb_map_size(
            &path,
            &StorageBackend::Lmdb,
            Some(128 * 1024 * 1024),
        )?;

        assert!(!path.join("aa").exists());
        assert!(!path.join("b2").exists());
        assert!(path.join("keep-me").exists());
        assert!(path.join("data.mdb").exists());
        assert!(path.join("lock.mdb").exists());

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let old_bytes = b"old published root";
        let new_bytes = b"new published root";
        let old_root = sha256(old_bytes);
        let new_root = sha256(new_bytes);

        store.put_blob(old_bytes)?;
        store.pin(&old_root)?;
        store.index_tree(
            &old_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(store.is_pinned(&old_root)?);
        assert!(store.get_tree_meta(&old_root)?.is_some());

        store.put_blob(new_bytes)?;
        store.pin(&new_root)?;
        store.index_tree(
            &new_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(
            !store.is_pinned(&old_root)?,
            "superseded root should be unpinned when ref is replaced"
        );
        assert!(
            store.get_tree_meta(&old_root)?.is_none(),
            "superseded root metadata should be removed when ref is replaced"
        );
        assert!(store.is_pinned(&new_root)?);
        assert!(store.get_tree_meta(&new_root)?.is_some());

        Ok(())
    }

    #[test]
    fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
        const AUTHOR_Z: &str = "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk";
        const AUTHOR_A: &str = "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm";
        const AUTHOR_UNTRACKED: &str =
            "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq";

        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options(temp.path(), None, 1024 * 1024)?;

        assert!(
            store.add_tracked_author(AUTHOR_Z)?,
            "first insert should report a newly tracked author"
        );
        assert!(store.add_tracked_author(AUTHOR_A)?);
        assert!(
            !store.add_tracked_author(AUTHOR_Z)?,
            "re-adding a tracked author should report no insertion"
        );

        assert_eq!(
            store.list_tracked_authors()?,
            vec![AUTHOR_A.to_string(), AUTHOR_Z.to_string()]
        );
        assert!(store.remove_tracked_author(AUTHOR_A)?);
        assert!(!store.remove_tracked_author(AUTHOR_UNTRACKED)?);
        assert_eq!(store.list_tracked_authors()?, vec![AUTHOR_Z.to_string()]);

        Ok(())
    }

    #[cfg(feature = "s3")]
    #[test]
    fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
        let temp = TempDir::new()?;
        let local = Arc::new(LocalStore::new(
            temp.path().join("blobs"),
            &StorageBackend::Fs,
        )?);

        let outcome = std::panic::catch_unwind(|| {
            sync_block_on(async {
                let aws_config = aws_config::from_env()
                    .region(aws_sdk_s3::config::Region::new("auto"))
                    .load()
                    .await;
                let s3_client = aws_sdk_s3::Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&aws_config)
                        .endpoint_url("http://127.0.0.1:9")
                        .force_path_style(true)
                        .build(),
                );

                let router = StorageRouter {
                    local,
                    s3_client: Some(s3_client),
                    s3_bucket: Some("test-bucket".to_string()),
                    s3_prefix: String::new(),
                    sync_tx: None,
                };
                let hash = [0u8; 32];

                let _ = Store::has(&router, &hash).await;
                let _ = Store::get(&router, &hash).await;
            });
        });

        assert!(
            outcome.is_ok(),
            "S3-backed async store methods should not panic inside futures::block_on"
        );

        Ok(())
    }
}