1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8 from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16#[cfg(feature = "s3")]
17use std::future::Future;
18use std::io::Write;
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::time::{SystemTime, UNIX_EPOCH};
22
23mod upload;
24
25mod maintenance;
26mod retention;
27
28pub use maintenance::{compact_lmdb_environments_under, CompactResult, VerifyResult};
29pub use retention::{PinnedItem, StorageByPriority, StorageStats, TreeMeta};
30
/// Storage priority for content that is neither followed nor owned (lowest tier here).
pub const PRIORITY_OTHER: u8 = 64;
/// Storage priority for content from followed sources.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Storage priority for the node's own content (maximum `u8` value).
pub const PRIORITY_OWN: u8 = 255;
/// Maximum concurrent LMDB readers configured for the metadata environment.
const LMDB_MAX_READERS: u32 = 1024;
/// Lower bound for the LMDB blob-store map size: 10 GiB.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 10 << 30;
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct CachedRoot {
42 pub hash: String,
44 pub key: Option<String>,
46 pub updated_at: u64,
48 pub visibility: String,
50}
51
/// Blob count and byte total as reported by a local storage backend.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of blobs the backend reports holding.
    pub count: usize,
    /// Total bytes the backend reports for those blobs.
    pub total_bytes: u64,
}
58
59pub enum LocalStore {
61 Fs(FsBlobStore),
62 #[cfg(feature = "lmdb")]
63 Lmdb(LmdbBlobStore),
64}
65
/// Reports whether the last component of `path` is a filesystem blob shard name:
/// exactly two ASCII hex digits (upper- or lower-case). Non-UTF-8 names never match.
#[cfg(feature = "lmdb")]
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    match path.file_name().and_then(|os_name| os_name.to_str()) {
        Some(name) => name.len() == 2 && name.bytes().all(|b| b.is_ascii_hexdigit()),
        None => false,
    }
}
73
/// Deletes leftover filesystem blob shard directories (two-hex-digit subdirectories)
/// found directly under `path`, logging each removal.
///
/// Used before opening an LMDB blob store in a directory that may previously have
/// been populated by `FsBlobStore`.
///
/// # Errors
/// Returns `StoreError::Io` if the directory cannot be listed or a shard cannot be
/// removed; processing stops at the first failure.
#[cfg(feature = "lmdb")]
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for dir_entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let shard = dir_entry.map_err(StoreError::Io)?.path();
        if !shard.is_dir() || !is_fs_blob_shard_dir(&shard) {
            continue;
        }
        std::fs::remove_dir_all(&shard).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            shard.display()
        );
    }
    Ok(())
}
90
91impl LocalStore {
92 pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
98 Self::new_unbounded(path, backend)
99 }
100
101 pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
106 path: P,
107 backend: &StorageBackend,
108 _map_size_bytes: Option<u64>,
109 ) -> Result<Self, StoreError> {
110 match backend {
111 StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
112 #[cfg(feature = "lmdb")]
113 StorageBackend::Lmdb => match _map_size_bytes {
114 Some(map_size_bytes) => {
115 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
116 remove_stale_fs_blob_shards(path.as_ref())?;
117 Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
118 path,
119 map_size_bytes,
120 )?))
121 }
122 None => {
123 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
124 remove_stale_fs_blob_shards(path.as_ref())?;
125 Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
126 }
127 },
128 #[cfg(not(feature = "lmdb"))]
129 StorageBackend::Lmdb => {
130 tracing::warn!(
131 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
132 );
133 Ok(LocalStore::Fs(FsBlobStore::new(path)?))
134 }
135 }
136 }
137
138 pub fn new_unbounded<P: AsRef<Path>>(
140 path: P,
141 backend: &StorageBackend,
142 ) -> Result<Self, StoreError> {
143 Self::new_with_lmdb_map_size(path, backend, None)
144 }
145
146 pub fn backend(&self) -> StorageBackend {
147 match self {
148 LocalStore::Fs(_) => StorageBackend::Fs,
149 #[cfg(feature = "lmdb")]
150 LocalStore::Lmdb(_) => StorageBackend::Lmdb,
151 }
152 }
153
154 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
156 match self {
157 LocalStore::Fs(store) => store.put_sync(hash, data),
158 #[cfg(feature = "lmdb")]
159 LocalStore::Lmdb(store) => store.put_sync(hash, data),
160 }
161 }
162
163 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
165 match self {
166 LocalStore::Fs(store) => {
167 let mut inserted = 0usize;
168 for (hash, data) in items {
169 if store.put_sync(*hash, data.as_slice())? {
170 inserted += 1;
171 }
172 }
173 Ok(inserted)
174 }
175 #[cfg(feature = "lmdb")]
176 LocalStore::Lmdb(store) => store.put_many_sync(items),
177 }
178 }
179
180 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
182 match self {
183 LocalStore::Fs(store) => store.get_sync(hash),
184 #[cfg(feature = "lmdb")]
185 LocalStore::Lmdb(store) => store.get_sync(hash),
186 }
187 }
188
189 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
191 match self {
192 LocalStore::Fs(store) => Ok(store.exists(hash)),
193 #[cfg(feature = "lmdb")]
194 LocalStore::Lmdb(store) => store.exists(hash),
195 }
196 }
197
198 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
200 match self {
201 LocalStore::Fs(store) => store.delete_sync(hash),
202 #[cfg(feature = "lmdb")]
203 LocalStore::Lmdb(store) => store.delete_sync(hash),
204 }
205 }
206
207 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
209 match self {
210 LocalStore::Fs(store) => {
211 let stats = store.stats()?;
212 Ok(LocalStoreStats {
213 count: stats.count,
214 total_bytes: stats.total_bytes,
215 })
216 }
217 #[cfg(feature = "lmdb")]
218 LocalStore::Lmdb(store) => {
219 let stats = store.stats()?;
220 Ok(LocalStoreStats {
221 count: stats.count,
222 total_bytes: stats.total_bytes,
223 })
224 }
225 }
226 }
227
228 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
230 match self {
231 LocalStore::Fs(store) => store.list(),
232 #[cfg(feature = "lmdb")]
233 LocalStore::Lmdb(store) => store.list(),
234 }
235 }
236}
237
238#[async_trait]
239impl Store for LocalStore {
240 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
241 self.put_sync(hash, &data)
242 }
243
244 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
245 self.put_many_sync(&items)
246 }
247
248 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
249 self.get_sync(hash)
250 }
251
252 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
253 self.exists(hash)
254 }
255
256 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
257 self.delete_sync(hash)
258 }
259}
260
261#[cfg(feature = "s3")]
262use tokio::sync::mpsc;
263
264use crate::config::S3Config;
265
/// Work item queued for the background S3 mirroring task.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` as the S3 object for `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove the S3 object for `hash`.
    Delete { hash: Hash },
}
272
273pub struct StorageRouter {
278 local: Arc<LocalStore>,
280 #[cfg(feature = "s3")]
282 s3_client: Option<aws_sdk_s3::Client>,
283 #[cfg(feature = "s3")]
284 s3_bucket: Option<String>,
285 #[cfg(feature = "s3")]
286 s3_prefix: String,
287 #[cfg(feature = "s3")]
289 sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
290}
291
292impl StorageRouter {
293 #[cfg(feature = "s3")]
294 fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
295 where
296 F: Future<Output = T> + Send + 'static,
297 T: Send + 'static,
298 {
299 if tokio::runtime::Handle::try_current().is_ok() {
300 return std::thread::Builder::new()
301 .name("storage-s3-sync".to_string())
302 .spawn(move || {
303 let runtime = tokio::runtime::Builder::new_current_thread()
304 .enable_all()
305 .build()
306 .map_err(|err| {
307 StoreError::Other(format!("build storage s3 sync runtime: {err}"))
308 })?;
309 Ok(runtime.block_on(future))
310 })
311 .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
312 .join()
313 .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()))?;
314 }
315
316 let runtime = tokio::runtime::Builder::new_current_thread()
317 .enable_all()
318 .build()
319 .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
320 Ok(runtime.block_on(future))
321 }
322
323 pub fn new(local: Arc<LocalStore>) -> Self {
325 Self {
326 local,
327 #[cfg(feature = "s3")]
328 s3_client: None,
329 #[cfg(feature = "s3")]
330 s3_bucket: None,
331 #[cfg(feature = "s3")]
332 s3_prefix: String::new(),
333 #[cfg(feature = "s3")]
334 sync_tx: None,
335 }
336 }
337
338 #[cfg(feature = "s3")]
340 pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
341 use aws_sdk_s3::Client as S3Client;
342
343 let mut aws_config_loader = aws_config::from_env();
345 aws_config_loader =
346 aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
347 let aws_config = aws_config_loader.load().await;
348
349 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
351 s3_config_builder = s3_config_builder
352 .endpoint_url(&config.endpoint)
353 .force_path_style(true);
354
355 let s3_client = S3Client::from_conf(s3_config_builder.build());
356 let bucket = config.bucket.clone();
357 let prefix = config.prefix.clone().unwrap_or_default();
358
359 let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
361
362 let sync_client = s3_client.clone();
364 let sync_bucket = bucket.clone();
365 let sync_prefix = prefix.clone();
366
367 tokio::spawn(async move {
368 use aws_sdk_s3::primitives::ByteStream;
369
370 tracing::info!("S3 background sync task started");
371
372 let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(8));
374 let client = std::sync::Arc::new(sync_client);
375 let bucket = std::sync::Arc::new(sync_bucket);
376 let prefix = std::sync::Arc::new(sync_prefix);
377
378 while let Some(msg) = sync_rx.recv().await {
379 let client = client.clone();
380 let bucket = bucket.clone();
381 let prefix = prefix.clone();
382 let semaphore = semaphore.clone();
383
384 tokio::spawn(async move {
386 let _permit = semaphore.acquire().await;
388
389 match msg {
390 S3SyncMessage::Upload { hash, data } => {
391 let key = format!("{}{}.bin", prefix, to_hex(&hash));
392 tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
393
394 let mut attempt = 1u8;
395 loop {
396 match client
397 .put_object()
398 .bucket(bucket.as_str())
399 .key(&key)
400 .body(ByteStream::from(data.clone()))
401 .send()
402 .await
403 {
404 Ok(_) => {
405 tracing::debug!("S3 upload succeeded: {}", &key);
406 break;
407 }
408 Err(e) if attempt < 3 => {
409 tracing::warn!(
410 "S3 upload retrying {}: attempt={} error={}",
411 &key,
412 attempt,
413 e
414 );
415 tokio::time::sleep(std::time::Duration::from_millis(
416 250 * u64::from(attempt),
417 ))
418 .await;
419 attempt += 1;
420 }
421 Err(e) => {
422 tracing::error!(
423 "S3 upload failed {} after {} attempts: {}",
424 &key,
425 attempt,
426 e
427 );
428 break;
429 }
430 }
431 }
432 }
433 S3SyncMessage::Delete { hash } => {
434 let key = format!("{}{}.bin", prefix, to_hex(&hash));
435 tracing::debug!("S3 deleting {}", &key);
436
437 let mut attempt = 1u8;
438 loop {
439 match client
440 .delete_object()
441 .bucket(bucket.as_str())
442 .key(&key)
443 .send()
444 .await
445 {
446 Ok(_) => break,
447 Err(e) if attempt < 3 => {
448 tracing::warn!(
449 "S3 delete retrying {}: attempt={} error={}",
450 &key,
451 attempt,
452 e
453 );
454 tokio::time::sleep(std::time::Duration::from_millis(
455 250 * u64::from(attempt),
456 ))
457 .await;
458 attempt += 1;
459 }
460 Err(e) => {
461 tracing::error!(
462 "S3 delete failed {} after {} attempts: {}",
463 &key,
464 attempt,
465 e
466 );
467 break;
468 }
469 }
470 }
471 }
472 }
473 });
474 }
475 });
476
477 tracing::info!(
478 "S3 storage initialized: bucket={}, prefix={}",
479 bucket,
480 prefix
481 );
482
483 Ok(Self {
484 local,
485 s3_client: Some(s3_client),
486 s3_bucket: Some(bucket),
487 s3_prefix: prefix,
488 sync_tx: Some(sync_tx),
489 })
490 }
491
492 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
494 let is_new = self.local.put_sync(hash, data)?;
496
497 #[cfg(feature = "s3")]
500 if is_new {
501 if let Some(ref tx) = self.sync_tx {
502 tracing::debug!(
503 "Queueing S3 upload for {} ({} bytes)",
504 crate::storage::to_hex(&hash)[..16].to_string(),
505 data.len(),
506 );
507 if let Err(e) = tx.send(S3SyncMessage::Upload {
508 hash,
509 data: data.to_vec(),
510 }) {
511 tracing::error!("Failed to queue S3 upload: {}", e);
512 }
513 }
514 }
515
516 Ok(is_new)
517 }
518
519 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
521 #[cfg(feature = "s3")]
522 let pending_uploads = if self.sync_tx.is_some() {
523 let mut pending = Vec::new();
524 for (hash, data) in items {
525 if !self.local.exists(hash)? {
526 pending.push((*hash, data.clone()));
527 }
528 }
529 pending
530 } else {
531 Vec::new()
532 };
533
534 let inserted = self.local.put_many_sync(items)?;
535
536 #[cfg(feature = "s3")]
537 if let Some(ref tx) = self.sync_tx {
538 for (hash, data) in pending_uploads {
539 if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
540 tracing::error!("Failed to queue S3 upload: {}", e);
541 }
542 }
543 }
544
545 Ok(inserted)
546 }
547
548 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
550 if let Some(data) = self.local.get_sync(hash)? {
552 return Ok(Some(data));
553 }
554
555 #[cfg(feature = "s3")]
557 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
558 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
559 let client = client.clone();
560 let bucket = bucket.clone();
561
562 match Self::run_s3_future_sync(async move {
563 client.get_object().bucket(bucket).key(key).send().await
564 }) {
565 Ok(Ok(output)) => {
566 match Self::run_s3_future_sync(async move { output.body.collect().await }) {
567 Ok(Ok(body)) => {
568 let data = body.into_bytes().to_vec();
569 let _ = self.local.put_sync(*hash, &data);
571 return Ok(Some(data));
572 }
573 Ok(Err(err)) => {
574 tracing::warn!("S3 body collect failed: {}", err);
575 }
576 Err(err) => {
577 tracing::warn!("S3 body collect runtime failed: {}", err);
578 }
579 }
580 }
581 Ok(Err(err)) => {
582 let service_err = err.into_service_error();
583 if !service_err.is_no_such_key() {
584 tracing::warn!("S3 get failed: {}", service_err);
585 }
586 }
587 Err(err) => {
588 tracing::warn!("S3 get runtime failed: {}", err);
589 }
590 }
591 }
592
593 Ok(None)
594 }
595
596 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
598 if self.local.exists(hash)? {
600 return Ok(true);
601 }
602
603 #[cfg(feature = "s3")]
605 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
606 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
607 let client = client.clone();
608 let bucket = bucket.clone();
609
610 match Self::run_s3_future_sync(async move {
611 client.head_object().bucket(bucket).key(&key).send().await
612 }) {
613 Ok(Ok(_)) => return Ok(true),
614 Ok(Err(err)) => {
615 let service_err = err.into_service_error();
616 if !service_err.is_not_found() {
617 tracing::warn!("S3 head failed: {}", service_err);
618 }
619 }
620 Err(err) => {
621 tracing::warn!("S3 head runtime failed: {}", err);
622 }
623 }
624 }
625
626 Ok(false)
627 }
628
629 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
631 let deleted = self.local.delete_sync(hash)?;
632
633 #[cfg(feature = "s3")]
635 if let Some(ref tx) = self.sync_tx {
636 let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
637 }
638
639 Ok(deleted)
640 }
641
642 pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
645 self.local.delete_sync(hash)
646 }
647
648 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
650 self.local.stats()
651 }
652
653 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
655 self.local.list()
656 }
657
658 pub fn local_store(&self) -> Arc<LocalStore> {
660 Arc::clone(&self.local)
661 }
662}
663
664#[async_trait]
667impl Store for StorageRouter {
668 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
669 self.put_sync(hash, &data)
670 }
671
672 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
673 self.put_many_sync(&items)
674 }
675
676 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
677 self.get_sync(hash)
678 }
679
680 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
681 self.exists(hash)
682 }
683
684 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
685 self.delete_sync(hash)
686 }
687}
688
689pub struct HashtreeStore {
690 base_path: PathBuf,
691 env: heed::Env,
692 pins: Database<Bytes, Unit>,
694 pinned_refs: Database<Str, Unit>,
696 tracked_authors: Database<Str, Unit>,
698 blob_owners: Database<Bytes, Unit>,
700 pubkey_blobs: Database<Bytes, Bytes>,
702 tree_meta: Database<Bytes, Bytes>,
704 blob_trees: Database<Bytes, Unit>,
706 tree_refs: Database<Str, Bytes>,
708 cached_roots: Database<Str, Bytes>,
710 router: Arc<StorageRouter>,
712 max_size_bytes: u64,
714 evict_orphans: bool,
716}
717
718impl HashtreeStore {
719 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
721 let config = hashtree_config::Config::load_or_default();
722 let max_size_bytes = config
723 .storage
724 .max_size_gb
725 .saturating_mul(1024 * 1024 * 1024);
726 Self::with_options_and_backend(
727 path,
728 None,
729 max_size_bytes,
730 config.storage.evict_orphans,
731 &config.storage.backend,
732 )
733 }
734
735 pub fn new_with_backend<P: AsRef<Path>>(
737 path: P,
738 backend: hashtree_config::StorageBackend,
739 max_size_bytes: u64,
740 ) -> Result<Self> {
741 Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
742 }
743
744 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
746 let config = hashtree_config::Config::load_or_default();
747 let max_size_bytes = config
748 .storage
749 .max_size_gb
750 .saturating_mul(1024 * 1024 * 1024);
751 Self::with_options_and_backend(
752 path,
753 s3_config,
754 max_size_bytes,
755 config.storage.evict_orphans,
756 &config.storage.backend,
757 )
758 }
759
760 pub fn with_options<P: AsRef<Path>>(
766 path: P,
767 s3_config: Option<&S3Config>,
768 max_size_bytes: u64,
769 ) -> Result<Self> {
770 let config = hashtree_config::Config::load_or_default();
771 Self::with_options_and_backend(
772 path,
773 s3_config,
774 max_size_bytes,
775 config.storage.evict_orphans,
776 &config.storage.backend,
777 )
778 }
779
780 fn with_options_and_backend<P: AsRef<Path>>(
781 path: P,
782 s3_config: Option<&S3Config>,
783 max_size_bytes: u64,
784 evict_orphans: bool,
785 backend: &hashtree_config::StorageBackend,
786 ) -> Result<Self> {
787 let path = path.as_ref();
788 std::fs::create_dir_all(path)?;
789
790 let env = unsafe {
791 EnvOpenOptions::new()
792 .map_size(10 * 1024 * 1024 * 1024) .max_dbs(10) .max_readers(LMDB_MAX_READERS)
795 .open(path)?
796 };
797 let _ = env.clear_stale_readers();
798
799 let mut wtxn = env.write_txn()?;
800 let pins = env.create_database(&mut wtxn, Some("pins"))?;
801 let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
802 let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
803 let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
804 let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
805 let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
806 let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
807 let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
808 let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
809 wtxn.commit()?;
810
811 let local_store = Arc::new(match backend {
815 hashtree_config::StorageBackend::Fs => LocalStore::Fs(
816 FsBlobStore::new(path.join("blobs"))
817 .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
818 ),
819 #[cfg(feature = "lmdb")]
820 hashtree_config::StorageBackend::Lmdb => {
821 std::fs::create_dir_all(path.join("blobs"))?;
822 remove_stale_fs_blob_shards(&path.join("blobs"))
823 .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
824 let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
825 let map_size = usize::try_from(requested_map_size)
826 .context("LMDB blob map size exceeds usize")?;
827 LocalStore::Lmdb(
828 LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
829 .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
830 )
831 }
832 #[cfg(not(feature = "lmdb"))]
833 hashtree_config::StorageBackend::Lmdb => {
834 tracing::warn!(
835 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
836 );
837 LocalStore::Fs(
838 FsBlobStore::new(path.join("blobs"))
839 .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
840 )
841 }
842 });
843
844 #[cfg(feature = "s3")]
846 let router = Arc::new(if let Some(s3_cfg) = s3_config {
847 tracing::info!(
848 "Initializing S3 storage backend: bucket={}, endpoint={}",
849 s3_cfg.bucket,
850 s3_cfg.endpoint
851 );
852
853 sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
854 } else {
855 StorageRouter::new(local_store)
856 });
857
858 #[cfg(not(feature = "s3"))]
859 let router = Arc::new({
860 if s3_config.is_some() {
861 tracing::warn!(
862 "S3 config provided but S3 feature not enabled. Using local storage only."
863 );
864 }
865 StorageRouter::new(local_store)
866 });
867
868 Ok(Self {
869 base_path: path.to_path_buf(),
870 env,
871 pins,
872 pinned_refs,
873 tracked_authors,
874 blob_owners,
875 pubkey_blobs,
876 tree_meta,
877 blob_trees,
878 tree_refs,
879 cached_roots,
880 router,
881 max_size_bytes,
882 evict_orphans,
883 })
884 }
885
886 pub fn base_path(&self) -> &Path {
887 &self.base_path
888 }
889
890 pub fn router(&self) -> &StorageRouter {
892 &self.router
893 }
894
895 pub fn store_arc(&self) -> Arc<StorageRouter> {
898 Arc::clone(&self.router)
899 }
900
901 pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
903 let store = self.store_arc();
904 let tree = HashTree::new(HashTreeConfig::new(store).public());
905
906 sync_block_on(async {
907 tree.get_tree_node(hash)
908 .await
909 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
910 })
911 }
912
913 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
915 let hash = sha256(data);
916 self.router
917 .put_sync(hash, data)
918 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
919 Ok(to_hex(&hash))
920 }
921
922 pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
928 let hash = sha256(data);
929 if self
930 .router
931 .exists(&hash)
932 .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
933 {
934 return Ok(to_hex(&hash));
935 }
936
937 let incoming_bytes = data.len() as u64;
938 let _ = self.make_room_for_cached_blob(incoming_bytes);
939
940 let mut retried_after_cleanup = false;
941 loop {
942 match self.router.put_sync(hash, data) {
943 Ok(_) => return Ok(to_hex(&hash)),
944 Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
945 let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
946 if freed == 0 {
947 return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
948 }
949 retried_after_cleanup = true;
950 }
951 Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
952 }
953 }
954 }
955
956 pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
958 self.router
959 .get_sync(hash)
960 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
961 }
962
963 pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
965 self.router
966 .exists(hash)
967 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
968 }
969
970 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
976 let mut key = [0u8; 64];
977 key[..32].copy_from_slice(sha256);
978 key[32..].copy_from_slice(pubkey);
979 key
980 }
981
982 pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
985 let key = Self::blob_owner_key(sha256, pubkey);
986 let mut wtxn = self.env.write_txn()?;
987
988 self.blob_owners.put(&mut wtxn, &key[..], &())?;
990
991 let sha256_hex = to_hex(sha256);
993
994 let mut blobs: Vec<BlobMetadata> = self
996 .pubkey_blobs
997 .get(&wtxn, pubkey)?
998 .and_then(|b| serde_json::from_slice(b).ok())
999 .unwrap_or_default();
1000
1001 if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
1003 let now = SystemTime::now()
1004 .duration_since(UNIX_EPOCH)
1005 .unwrap()
1006 .as_secs();
1007
1008 let size = self
1010 .get_blob(sha256)?
1011 .map(|data| data.len() as u64)
1012 .unwrap_or(0);
1013
1014 blobs.push(BlobMetadata {
1015 sha256: sha256_hex,
1016 size,
1017 mime_type: "application/octet-stream".to_string(),
1018 uploaded: now,
1019 });
1020
1021 let blobs_json = serde_json::to_vec(&blobs)?;
1022 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1023 }
1024
1025 wtxn.commit()?;
1026 Ok(())
1027 }
1028
1029 pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1031 let key = Self::blob_owner_key(sha256, pubkey);
1032 let rtxn = self.env.read_txn()?;
1033 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1034 }
1035
1036 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1038 let rtxn = self.env.read_txn()?;
1039
1040 let mut owners = Vec::new();
1041 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1042 let (key, _) = item?;
1043 if key.len() == 64 {
1044 let mut pubkey = [0u8; 32];
1046 pubkey.copy_from_slice(&key[32..64]);
1047 owners.push(pubkey);
1048 }
1049 }
1050 Ok(owners)
1051 }
1052
1053 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1055 let rtxn = self.env.read_txn()?;
1056
1057 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1059 if item.is_ok() {
1060 return Ok(true);
1061 }
1062 }
1063 Ok(false)
1064 }
1065
1066 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1068 Ok(self.get_blob_owners(sha256)?.into_iter().next())
1069 }
1070
1071 pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1075 let key = Self::blob_owner_key(sha256, pubkey);
1076 let mut wtxn = self.env.write_txn()?;
1077
1078 self.blob_owners.delete(&mut wtxn, &key[..])?;
1080
1081 let sha256_hex = to_hex(sha256);
1083
1084 if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1086 if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1087 blobs.retain(|b| b.sha256 != sha256_hex);
1088 let blobs_json = serde_json::to_vec(&blobs)?;
1089 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1090 }
1091 }
1092
1093 let mut has_other_owners = false;
1095 for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1096 if item.is_ok() {
1097 has_other_owners = true;
1098 break;
1099 }
1100 }
1101
1102 if has_other_owners {
1103 wtxn.commit()?;
1104 tracing::debug!(
1105 "Removed {} from blob {} owners, other owners remain",
1106 &to_hex(pubkey)[..8],
1107 &sha256_hex[..8]
1108 );
1109 return Ok(false);
1110 }
1111
1112 tracing::info!(
1114 "All owners removed from blob {}, deleting",
1115 &sha256_hex[..8]
1116 );
1117
1118 let _ = self.router.delete_sync(sha256);
1120
1121 wtxn.commit()?;
1122 Ok(true)
1123 }
1124
1125 pub fn list_blobs_by_pubkey(
1127 &self,
1128 pubkey: &[u8; 32],
1129 ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1130 let rtxn = self.env.read_txn()?;
1131
1132 let blobs: Vec<BlobMetadata> = self
1133 .pubkey_blobs
1134 .get(&rtxn, pubkey)?
1135 .and_then(|b| serde_json::from_slice(b).ok())
1136 .unwrap_or_default();
1137
1138 Ok(blobs
1139 .into_iter()
1140 .map(|b| crate::server::blossom::BlobDescriptor {
1141 url: format!("/{}", b.sha256),
1142 sha256: b.sha256,
1143 size: b.size,
1144 mime_type: b.mime_type,
1145 uploaded: b.uploaded,
1146 })
1147 .collect())
1148 }
1149
1150 pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1152 self.router
1153 .get_sync(hash)
1154 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1155 }
1156
1157 pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1160 let store = self.store_arc();
1161 let tree = HashTree::new(HashTreeConfig::new(store).public());
1162
1163 sync_block_on(async {
1164 tree.read_file(hash)
1165 .await
1166 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1167 })
1168 }
1169
1170 pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1173 let store = self.store_arc();
1174 let tree = HashTree::new(HashTreeConfig::new(store).public());
1175
1176 sync_block_on(async {
1177 tree.get(cid, None)
1178 .await
1179 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1180 })
1181 }
1182
1183 fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1184 let exists = self
1185 .router
1186 .exists(&cid.hash)
1187 .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1188 if !exists {
1189 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1190 }
1191 Ok(())
1192 }
1193
1194 pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1196 self.ensure_cid_exists(cid)?;
1197
1198 let store = self.store_arc();
1199 let tree = HashTree::new(HashTreeConfig::new(store).public());
1200 let mut total_bytes = 0u64;
1201 let mut streamed_any_chunk = false;
1202
1203 sync_block_on(async {
1204 let mut stream = tree.get_stream(cid);
1205 while let Some(chunk) = stream.next().await {
1206 streamed_any_chunk = true;
1207 let chunk =
1208 chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1209 writer
1210 .write_all(&chunk)
1211 .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1212 total_bytes += chunk.len() as u64;
1213 }
1214 Ok::<(), anyhow::Error>(())
1215 })?;
1216
1217 if !streamed_any_chunk {
1218 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1219 }
1220
1221 writer
1222 .flush()
1223 .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1224 Ok(total_bytes)
1225 }
1226
1227 pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1229 self.ensure_cid_exists(cid)?;
1230
1231 let output_path = output_path.as_ref();
1232 if let Some(parent) = output_path.parent() {
1233 if !parent.as_os_str().is_empty() {
1234 std::fs::create_dir_all(parent).with_context(|| {
1235 format!("Failed to create output directory {}", parent.display())
1236 })?;
1237 }
1238 }
1239
1240 let mut file = std::fs::File::create(output_path)
1241 .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1242 self.write_file_by_cid_to_writer(cid, &mut file)
1243 }
1244
1245 pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1247 self.write_file_by_cid(&Cid::public(*hash), output_path)
1248 }
1249
1250 pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1252 let store = self.store_arc();
1253 let tree = HashTree::new(HashTreeConfig::new(store).public());
1254
1255 sync_block_on(async {
1256 tree.resolve_path(cid, path)
1257 .await
1258 .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1259 })
1260 }
1261
1262 pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
1264 let store = self.store_arc();
1265 let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
1266
1267 sync_block_on(async {
1268 let exists = store
1271 .has(hash)
1272 .await
1273 .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;
1274
1275 if !exists {
1276 return Ok(None);
1277 }
1278
1279 let total_size = tree
1281 .get_size(hash)
1282 .await
1283 .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;
1284
1285 let is_tree_node = tree
1287 .is_tree(hash)
1288 .await
1289 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1290
1291 if !is_tree_node {
1292 return Ok(Some(FileChunkMetadata {
1294 total_size,
1295 chunk_hashes: vec![],
1296 chunk_sizes: vec![],
1297 is_chunked: false,
1298 }));
1299 }
1300
1301 let node = match tree
1303 .get_tree_node(hash)
1304 .await
1305 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1306 {
1307 Some(n) => n,
1308 None => return Ok(None),
1309 };
1310
1311 let is_directory = tree
1313 .is_directory(hash)
1314 .await
1315 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1316
1317 if is_directory {
1318 return Ok(None); }
1320
1321 let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
1323 let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();
1324
1325 Ok(Some(FileChunkMetadata {
1326 total_size,
1327 chunk_hashes,
1328 chunk_sizes,
1329 is_chunked: !node.links.is_empty(),
1330 }))
1331 })
1332 }
1333
1334 pub fn get_file_range(
1336 &self,
1337 hash: &[u8; 32],
1338 start: u64,
1339 end: Option<u64>,
1340 ) -> Result<Option<(Vec<u8>, u64)>> {
1341 let metadata = match self.get_file_chunk_metadata(hash)? {
1342 Some(m) => m,
1343 None => return Ok(None),
1344 };
1345
1346 if metadata.total_size == 0 {
1347 return Ok(Some((Vec::new(), 0)));
1348 }
1349
1350 if start >= metadata.total_size {
1351 return Ok(None);
1352 }
1353
1354 let end = end
1355 .unwrap_or(metadata.total_size - 1)
1356 .min(metadata.total_size - 1);
1357
1358 if !metadata.is_chunked {
1360 let content = self.get_file(hash)?.unwrap_or_default();
1361 let range_content = if start < content.len() as u64 {
1362 content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1363 } else {
1364 Vec::new()
1365 };
1366 return Ok(Some((range_content, metadata.total_size)));
1367 }
1368
1369 let mut result = Vec::new();
1371 let mut current_offset = 0u64;
1372
1373 for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1374 let chunk_size = metadata.chunk_sizes[i];
1375 let chunk_end = current_offset + chunk_size - 1;
1376
1377 if chunk_end >= start && current_offset <= end {
1379 let chunk_content = match self.get_chunk(chunk_hash)? {
1380 Some(content) => content,
1381 None => {
1382 return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1383 }
1384 };
1385
1386 let chunk_read_start = if current_offset >= start {
1387 0
1388 } else {
1389 (start - current_offset) as usize
1390 };
1391
1392 let chunk_read_end = if chunk_end <= end {
1393 chunk_size as usize - 1
1394 } else {
1395 (end - current_offset) as usize
1396 };
1397
1398 result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1399 }
1400
1401 current_offset += chunk_size;
1402
1403 if current_offset > end {
1404 break;
1405 }
1406 }
1407
1408 Ok(Some((result, metadata.total_size)))
1409 }
1410
1411 pub fn stream_file_range_chunks_owned(
1413 self: Arc<Self>,
1414 hash: &[u8; 32],
1415 start: u64,
1416 end: u64,
1417 ) -> Result<Option<FileRangeChunksOwned>> {
1418 let metadata = match self.get_file_chunk_metadata(hash)? {
1419 Some(m) => m,
1420 None => return Ok(None),
1421 };
1422
1423 if metadata.total_size == 0 || start >= metadata.total_size {
1424 return Ok(None);
1425 }
1426
1427 let end = end.min(metadata.total_size - 1);
1428
1429 Ok(Some(FileRangeChunksOwned {
1430 store: self,
1431 metadata,
1432 start,
1433 end,
1434 current_chunk_idx: 0,
1435 current_offset: 0,
1436 }))
1437 }
1438
1439 pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1441 let store = self.store_arc();
1442 let tree = HashTree::new(HashTreeConfig::new(store).public());
1443
1444 sync_block_on(async {
1445 let is_dir = tree
1447 .is_directory(hash)
1448 .await
1449 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1450
1451 if !is_dir {
1452 return Ok(None);
1453 }
1454
1455 let cid = hashtree_core::Cid::public(*hash);
1457 let tree_entries = tree
1458 .list_directory(&cid)
1459 .await
1460 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1461
1462 let entries: Vec<DirEntry> = tree_entries
1463 .into_iter()
1464 .map(|e| DirEntry {
1465 name: e.name,
1466 cid: to_hex(&e.hash),
1467 is_directory: e.link_type.is_tree(),
1468 size: e.size,
1469 })
1470 .collect();
1471
1472 Ok(Some(DirectoryListing {
1473 dir_name: String::new(),
1474 entries,
1475 }))
1476 })
1477 }
1478
1479 pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1481 let store = self.store_arc();
1482 let tree = HashTree::new(HashTreeConfig::new(store).public());
1483 let cid = cid.clone();
1484
1485 sync_block_on(async {
1486 let is_dir = tree
1487 .is_dir(&cid)
1488 .await
1489 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1490
1491 if !is_dir {
1492 return Ok(None);
1493 }
1494
1495 let tree_entries = tree
1496 .list_directory(&cid)
1497 .await
1498 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1499
1500 let entries: Vec<DirEntry> = tree_entries
1501 .into_iter()
1502 .map(|e| DirEntry {
1503 name: e.name,
1504 cid: Cid {
1505 hash: e.hash,
1506 key: e.key,
1507 }
1508 .to_string(),
1509 is_directory: e.link_type.is_tree(),
1510 size: e.size,
1511 })
1512 .collect();
1513
1514 Ok(Some(DirectoryListing {
1515 dir_name: String::new(),
1516 entries,
1517 }))
1518 })
1519 }
1520
1521 pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
1525 let mut wtxn = self.env.write_txn()?;
1526 self.pinned_refs.put(&mut wtxn, key, &())?;
1527 wtxn.commit()?;
1528 Ok(())
1529 }
1530
1531 pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
1533 let mut wtxn = self.env.write_txn()?;
1534 let removed = self.pinned_refs.delete(&mut wtxn, key)?;
1535 wtxn.commit()?;
1536 Ok(removed)
1537 }
1538
1539 pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
1541 let rtxn = self.env.read_txn()?;
1542 let mut refs = Vec::new();
1543
1544 for item in self.pinned_refs.iter(&rtxn)? {
1545 let (key, _) = item?;
1546 refs.push(key.to_string());
1547 }
1548
1549 refs.sort();
1550 Ok(refs)
1551 }
1552
1553 pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
1555 let mut wtxn = self.env.write_txn()?;
1556 let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
1557 self.tracked_authors.put(&mut wtxn, npub, &())?;
1558 wtxn.commit()?;
1559 Ok(inserted)
1560 }
1561
1562 pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
1564 let mut wtxn = self.env.write_txn()?;
1565 let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
1566 wtxn.commit()?;
1567 Ok(removed)
1568 }
1569
1570 pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
1572 let rtxn = self.env.read_txn()?;
1573 let mut authors = Vec::new();
1574
1575 for item in self.tracked_authors.iter(&rtxn)? {
1576 let (npub, _) = item?;
1577 authors.push(npub.to_string());
1578 }
1579
1580 authors.sort();
1581 Ok(authors)
1582 }
1583
1584 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1586 let key = format!("{}/{}", pubkey_hex, tree_name);
1587 let rtxn = self.env.read_txn()?;
1588 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1589 let root: CachedRoot = rmp_serde::from_slice(bytes)
1590 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1591 Ok(Some(root))
1592 } else {
1593 Ok(None)
1594 }
1595 }
1596
1597 pub fn set_cached_root(
1599 &self,
1600 pubkey_hex: &str,
1601 tree_name: &str,
1602 hash: &str,
1603 key: Option<&str>,
1604 visibility: &str,
1605 updated_at: u64,
1606 ) -> Result<()> {
1607 let db_key = format!("{}/{}", pubkey_hex, tree_name);
1608 let root = CachedRoot {
1609 hash: hash.to_string(),
1610 key: key.map(|k| k.to_string()),
1611 updated_at,
1612 visibility: visibility.to_string(),
1613 };
1614 let bytes = rmp_serde::to_vec(&root)
1615 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1616 let mut wtxn = self.env.write_txn()?;
1617 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1618 wtxn.commit()?;
1619 Ok(())
1620 }
1621
1622 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1624 let prefix = format!("{}/", pubkey_hex);
1625 let rtxn = self.env.read_txn()?;
1626 let mut results = Vec::new();
1627
1628 for item in self.cached_roots.iter(&rtxn)? {
1629 let (key, bytes) = item?;
1630 if key.starts_with(&prefix) {
1631 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1632 let root: CachedRoot = rmp_serde::from_slice(bytes)
1633 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1634 results.push((tree_name.to_string(), root));
1635 }
1636 }
1637
1638 Ok(results)
1639 }
1640
1641 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1643 let key = format!("{}/{}", pubkey_hex, tree_name);
1644 let mut wtxn = self.env.write_txn()?;
1645 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1646 wtxn.commit()?;
1647 Ok(deleted)
1648 }
1649}
1650
1651fn is_map_full_store_error(err: &StoreError) -> bool {
1652 let message = err.to_string();
1653 message.contains("MDB_MAP_FULL") || message.contains("MapFull")
1654}
1655
/// Layout of a file as reported by `HashtreeStore::get_file_chunk_metadata`.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes, as reported by `HashTree::get_size`.
    pub total_size: u64,
    /// Hashes of the root tree node's direct links, in link order; empty for a non-tree blob.
    pub chunk_hashes: Vec<Hash>,
    /// Sizes recorded on those links, index-aligned with `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// `true` when the root is a tree node with at least one link.
    pub is_chunked: bool,
}
1663
/// Owning iterator over the inclusive byte range `[start, end]` of a chunked file,
/// built by `HashtreeStore::stream_file_range_chunks_owned`.
///
/// Each item is the portion of one chunk that overlaps the range. Holding an
/// `Arc<HashtreeStore>` lets the iterator outlive the borrow that created it.
pub struct FileRangeChunksOwned {
    /// Store used to fetch chunk bytes lazily, one chunk per `next()` call.
    store: Arc<HashtreeStore>,
    /// Chunk layout of the file being streamed.
    metadata: FileChunkMetadata,
    /// First byte offset to yield (inclusive).
    start: u64,
    /// Last byte offset to yield (inclusive); clamped to the file's last byte on construction.
    end: u64,
    /// Index into `metadata.chunk_hashes` of the next chunk to examine.
    current_chunk_idx: usize,
    /// Running file offset, advanced by each chunk's size as chunks are consumed.
    current_offset: u64,
}
1673
1674impl Iterator for FileRangeChunksOwned {
1675 type Item = Result<Vec<u8>>;
1676
1677 fn next(&mut self) -> Option<Self::Item> {
1678 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
1679 return None;
1680 }
1681
1682 if self.current_offset > self.end {
1683 return None;
1684 }
1685
1686 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
1687 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
1688 let chunk_end = self.current_offset + chunk_size - 1;
1689
1690 self.current_chunk_idx += 1;
1691
1692 if chunk_end < self.start || self.current_offset > self.end {
1693 self.current_offset += chunk_size;
1694 return self.next();
1695 }
1696
1697 let chunk_content = match self.store.get_chunk(chunk_hash) {
1698 Ok(Some(content)) => content,
1699 Ok(None) => {
1700 return Some(Err(anyhow::anyhow!(
1701 "Chunk {} not found",
1702 to_hex(chunk_hash)
1703 )));
1704 }
1705 Err(e) => {
1706 return Some(Err(e));
1707 }
1708 };
1709
1710 let chunk_read_start = if self.current_offset >= self.start {
1711 0
1712 } else {
1713 (self.start - self.current_offset) as usize
1714 };
1715
1716 let chunk_read_end = if chunk_end <= self.end {
1717 chunk_size as usize - 1
1718 } else {
1719 (self.end - self.current_offset) as usize
1720 };
1721
1722 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
1723 self.current_offset += chunk_size;
1724
1725 Some(Ok(result))
1726 }
1727}
1728
/// Counters reported by a garbage-collection pass (producer not shown in this chunk).
#[derive(Debug)]
pub struct GcStats {
    /// Number of DAGs deleted by the pass (per the field name; confirm against the GC code).
    pub deleted_dags: usize,
    /// Bytes reclaimed by the pass (per the field name; confirm against the GC code).
    pub freed_bytes: u64,
}
1734
/// One entry of a `DirectoryListing`.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within its parent directory.
    pub name: String,
    /// Textual CID of the entry: a hex hash from `get_directory_listing`, or the
    /// `Cid::to_string` form (hash + key) from `get_directory_listing_by_cid`.
    pub cid: String,
    /// `true` when the entry's link type is a tree.
    pub is_directory: bool,
    /// Size recorded on the directory entry.
    pub size: u64,
}
1742
/// Contents of a directory as returned by the `get_directory_listing*` methods.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Directory name; the listing methods in this file leave it empty.
    pub dir_name: String,
    /// Entries in the order returned by `HashTree::list_directory`.
    pub entries: Vec<DirEntry>,
}
1748
/// Serializable metadata describing an uploaded blob.
///
/// Field order matters if this is encoded with a positional format (e.g. `rmp_serde`'s
/// default array encoding), so do not reorder fields.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// SHA-256 of the blob, presumably hex-encoded (TODO confirm against writers).
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type recorded for the blob.
    pub mime_type: String,
    /// Upload time; presumably a Unix timestamp (seconds?) — TODO confirm.
    pub uploaded: u64,
}
1757
1758impl crate::webrtc::ContentStore for HashtreeStore {
1760 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1761 let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
1762 self.get_chunk(&hash)
1763 }
1764}
1765
#[cfg(test)]
mod tests {
    use super::*;
    // Imported unconditionally: `tracked_authors_round_trip_sorted_and_deduplicated` is not
    // gated on the `lmdb` feature, so gating this import broke builds without that feature.
    use tempfile::TempDir;

    /// The blob LMDB map must grow to at least the requested storage budget.
    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    /// An explicit map size passed to `LocalStore` is honoured (rounded up if needed).
    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }

    /// Opening an LMDB store removes two-hex-digit FS shard dirs but keeps other entries.
    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("lmdb-blobs");
        std::fs::create_dir_all(path.join("aa"))?;
        std::fs::create_dir_all(path.join("b2"))?;
        std::fs::create_dir_all(path.join("keep-me"))?;
        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;

        let _store = LocalStore::new_with_lmdb_map_size(
            &path,
            &StorageBackend::Lmdb,
            Some(128 * 1024 * 1024),
        )?;

        assert!(!path.join("aa").exists());
        assert!(!path.join("b2").exists());
        assert!(path.join("keep-me").exists());
        assert!(path.join("data.mdb").exists());
        assert!(path.join("lock.mdb").exists());

        Ok(())
    }

    /// Re-indexing a ref with a new root unpins and unindexes the superseded root.
    #[cfg(feature = "lmdb")]
    #[test]
    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let old_bytes = b"old published root";
        let new_bytes = b"new published root";
        let old_root = sha256(old_bytes);
        let new_root = sha256(new_bytes);

        store.put_blob(old_bytes)?;
        store.pin(&old_root)?;
        store.index_tree(
            &old_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(store.is_pinned(&old_root)?);
        assert!(store.get_tree_meta(&old_root)?.is_some());

        store.put_blob(new_bytes)?;
        store.pin(&new_root)?;
        store.index_tree(
            &new_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(
            !store.is_pinned(&old_root)?,
            "superseded root should be unpinned when ref is replaced"
        );
        assert!(
            store.get_tree_meta(&old_root)?.is_none(),
            "superseded root metadata should be removed when ref is replaced"
        );
        assert!(store.is_pinned(&new_root)?);
        assert!(store.get_tree_meta(&new_root)?.is_some());

        Ok(())
    }

    /// Tracked authors are deduplicated, listed sorted, and removal reports presence.
    #[test]
    fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options(temp.path(), None, 1024 * 1024)?;

        store
            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;
        store
            .add_tracked_author("npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm")?;
        store
            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;

        assert_eq!(
            store.list_tracked_authors()?,
            vec![
                "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm".to_string(),
                "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string(),
            ]
        );
        assert!(store.remove_tracked_author(
            "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm"
        )?);
        assert!(!store.remove_tracked_author(
            "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq"
        )?);
        assert_eq!(
            store.list_tracked_authors()?,
            vec!["npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string()]
        );

        Ok(())
    }

    /// The S3 fallback path must not panic when driven from `futures::executor::block_on`.
    #[cfg(feature = "s3")]
    #[test]
    fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
        let temp = TempDir::new()?;
        let local = Arc::new(LocalStore::new(
            temp.path().join("blobs"),
            &StorageBackend::Fs,
        )?);

        let outcome = std::panic::catch_unwind(|| {
            sync_block_on(async {
                let aws_config = aws_config::from_env()
                    .region(aws_sdk_s3::config::Region::new("auto"))
                    .load()
                    .await;
                let s3_client = aws_sdk_s3::Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&aws_config)
                        .endpoint_url("http://127.0.0.1:9")
                        .force_path_style(true)
                        .build(),
                );

                let router = StorageRouter {
                    local,
                    s3_client: Some(s3_client),
                    s3_bucket: Some("test-bucket".to_string()),
                    s3_prefix: String::new(),
                    sync_tx: None,
                };
                let hash = [0u8; 32];

                let _ = Store::has(&router, &hash).await;
                let _ = Store::get(&router, &hash).await;
            });
        });

        assert!(
            outcome.is_ok(),
            "S3-backed async store methods should not panic inside futures::block_on"
        );

        Ok(())
    }
}