1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8 from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::io::Write;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::time::{SystemTime, UNIX_EPOCH};
20
21mod upload;
22
23mod maintenance;
24mod retention;
25
26pub use maintenance::VerifyResult;
27pub use retention::{PinnedItem, StorageByPriority, StorageStats, TreeMeta};
28
/// Priority value `64`, labelled "other".
// NOTE(review): presumably the lowest of the three retention priorities — confirm in `retention`.
pub const PRIORITY_OTHER: u8 = 64;
/// Priority value `128`, labelled "followed".
// NOTE(review): presumably content from followed users — confirm against callers.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Priority value `255` (the `u8` maximum), labelled "own".
pub const PRIORITY_OWN: u8 = 255;
/// Reader-slot limit passed to `max_readers` when the metadata LMDB environment is opened.
const LMDB_MAX_READERS: u32 = 1024;
/// Lower bound (10 GiB) applied to the LMDB blob store map size in `HashtreeStore`.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 10 * 1024 * 1024 * 1024;
36
/// Serializable record stored in the `cached_roots` database.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash as a string (presumably hex-encoded — TODO confirm against writers).
    pub hash: String,
    /// Optional key string associated with the root (presumably a decryption key — TODO confirm).
    pub key: Option<String>,
    /// Last-update timestamp (presumably UNIX seconds — TODO confirm).
    pub updated_at: u64,
    /// Visibility label for the root; the set of valid values is not visible here.
    pub visibility: String,
}
49
/// Backend-independent summary returned by [`LocalStore::stats`].
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Item count reported by the backend.
    pub count: usize,
    /// Total byte size reported by the backend.
    pub total_bytes: u64,
}
56
/// Local blob store that dispatches to one of the compiled-in backends.
pub enum LocalStore {
    /// Filesystem-backed blob store.
    Fs(FsBlobStore),
    /// LMDB-backed blob store (only available with the `lmdb` feature).
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
63
64#[cfg(feature = "lmdb")]
/// Returns `true` when the final component of `path` is exactly two ASCII hex digits.
///
/// Paths without a final component, or whose final component is not valid UTF-8,
/// yield `false`. Only the name is inspected; the filesystem is not touched.
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    match path.file_name().and_then(|component| component.to_str()) {
        Some(name) => matches!(
            name.as_bytes(),
            [hi, lo] if hi.is_ascii_hexdigit() && lo.is_ascii_hexdigit()
        ),
        None => false,
    }
}
71
72#[cfg(feature = "lmdb")]
73fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
74 let entries = std::fs::read_dir(path).map_err(StoreError::Io)?;
75 for entry in entries {
76 let entry = entry.map_err(StoreError::Io)?;
77 let entry_path = entry.path();
78 if entry_path.is_dir() && is_fs_blob_shard_dir(&entry_path) {
79 std::fs::remove_dir_all(&entry_path).map_err(StoreError::Io)?;
80 tracing::info!(
81 "Removed stale filesystem blob shard directory after LMDB cutover: {}",
82 entry_path.display()
83 );
84 }
85 }
86 Ok(())
87}
88
89impl LocalStore {
90 pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
96 Self::new_unbounded(path, backend)
97 }
98
99 pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
101 path: P,
102 backend: &StorageBackend,
103 _map_size_bytes: Option<u64>,
104 ) -> Result<Self, StoreError> {
105 match backend {
106 StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
107 #[cfg(feature = "lmdb")]
108 StorageBackend::Lmdb => match _map_size_bytes {
109 Some(map_size_bytes) => {
110 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
111 remove_stale_fs_blob_shards(path.as_ref())?;
112 let map_size = usize::try_from(map_size_bytes).map_err(|_| {
113 StoreError::Other("LMDB map size exceeds usize".to_string())
114 })?;
115 Ok(LocalStore::Lmdb(LmdbBlobStore::with_map_size(
116 path, map_size,
117 )?))
118 }
119 None => {
120 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
121 remove_stale_fs_blob_shards(path.as_ref())?;
122 Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
123 }
124 },
125 #[cfg(not(feature = "lmdb"))]
126 StorageBackend::Lmdb => {
127 tracing::warn!(
128 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
129 );
130 Ok(LocalStore::Fs(FsBlobStore::new(path)?))
131 }
132 }
133 }
134
135 pub fn new_unbounded<P: AsRef<Path>>(
137 path: P,
138 backend: &StorageBackend,
139 ) -> Result<Self, StoreError> {
140 Self::new_with_lmdb_map_size(path, backend, None)
141 }
142
143 pub fn backend(&self) -> StorageBackend {
144 match self {
145 LocalStore::Fs(_) => StorageBackend::Fs,
146 #[cfg(feature = "lmdb")]
147 LocalStore::Lmdb(_) => StorageBackend::Lmdb,
148 }
149 }
150
151 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
153 match self {
154 LocalStore::Fs(store) => store.put_sync(hash, data),
155 #[cfg(feature = "lmdb")]
156 LocalStore::Lmdb(store) => store.put_sync(hash, data),
157 }
158 }
159
160 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
162 match self {
163 LocalStore::Fs(store) => {
164 let mut inserted = 0usize;
165 for (hash, data) in items {
166 if store.put_sync(*hash, data.as_slice())? {
167 inserted += 1;
168 }
169 }
170 Ok(inserted)
171 }
172 #[cfg(feature = "lmdb")]
173 LocalStore::Lmdb(store) => store.put_many_sync(items),
174 }
175 }
176
177 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
179 match self {
180 LocalStore::Fs(store) => store.get_sync(hash),
181 #[cfg(feature = "lmdb")]
182 LocalStore::Lmdb(store) => store.get_sync(hash),
183 }
184 }
185
186 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
188 match self {
189 LocalStore::Fs(store) => Ok(store.exists(hash)),
190 #[cfg(feature = "lmdb")]
191 LocalStore::Lmdb(store) => store.exists(hash),
192 }
193 }
194
195 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
197 match self {
198 LocalStore::Fs(store) => store.delete_sync(hash),
199 #[cfg(feature = "lmdb")]
200 LocalStore::Lmdb(store) => store.delete_sync(hash),
201 }
202 }
203
204 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
206 match self {
207 LocalStore::Fs(store) => {
208 let stats = store.stats()?;
209 Ok(LocalStoreStats {
210 count: stats.count,
211 total_bytes: stats.total_bytes,
212 })
213 }
214 #[cfg(feature = "lmdb")]
215 LocalStore::Lmdb(store) => {
216 let stats = store.stats()?;
217 Ok(LocalStoreStats {
218 count: stats.count,
219 total_bytes: stats.total_bytes,
220 })
221 }
222 }
223 }
224
225 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
227 match self {
228 LocalStore::Fs(store) => store.list(),
229 #[cfg(feature = "lmdb")]
230 LocalStore::Lmdb(store) => store.list(),
231 }
232 }
233}
234
235#[async_trait]
236impl Store for LocalStore {
237 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
238 self.put_sync(hash, &data)
239 }
240
241 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
242 self.put_many_sync(&items)
243 }
244
245 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
246 self.get_sync(hash)
247 }
248
249 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
250 self.exists(hash)
251 }
252
253 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
254 self.delete_sync(hash)
255 }
256}
257
258#[cfg(feature = "s3")]
259use tokio::sync::mpsc;
260
261use crate::config::S3Config;
262
/// Work item sent over the channel to the background S3 sync task.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` to the object derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object derived from `hash`.
    Delete { hash: Hash },
}
269
/// Routes blob operations to the local store and, when configured, mirrors them to S3.
pub struct StorageRouter {
    /// Local store; every read tries it first and every write goes to it.
    local: Arc<LocalStore>,
    /// S3 client used for synchronous fallback reads; `None` when S3 is not configured.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target bucket; `None` when S3 is not configured.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Prefix prepended to every object key (`{prefix}{hex hash}.bin`).
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Sender feeding the background sync task; `None` when S3 is not configured.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
288
289impl StorageRouter {
    /// Creates a router that uses only `local`; all S3 fields are left unset.
    pub fn new(local: Arc<LocalStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }
304
305 #[cfg(feature = "s3")]
307 pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
308 use aws_sdk_s3::Client as S3Client;
309
310 let mut aws_config_loader = aws_config::from_env();
312 aws_config_loader =
313 aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
314 let aws_config = aws_config_loader.load().await;
315
316 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
318 s3_config_builder = s3_config_builder
319 .endpoint_url(&config.endpoint)
320 .force_path_style(true);
321
322 let s3_client = S3Client::from_conf(s3_config_builder.build());
323 let bucket = config.bucket.clone();
324 let prefix = config.prefix.clone().unwrap_or_default();
325
326 let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
328
329 let sync_client = s3_client.clone();
331 let sync_bucket = bucket.clone();
332 let sync_prefix = prefix.clone();
333
334 tokio::spawn(async move {
335 use aws_sdk_s3::primitives::ByteStream;
336
337 tracing::info!("S3 background sync task started");
338
339 let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
341 let client = std::sync::Arc::new(sync_client);
342 let bucket = std::sync::Arc::new(sync_bucket);
343 let prefix = std::sync::Arc::new(sync_prefix);
344
345 while let Some(msg) = sync_rx.recv().await {
346 let client = client.clone();
347 let bucket = bucket.clone();
348 let prefix = prefix.clone();
349 let semaphore = semaphore.clone();
350
351 tokio::spawn(async move {
353 let _permit = semaphore.acquire().await;
355
356 match msg {
357 S3SyncMessage::Upload { hash, data } => {
358 let key = format!("{}{}.bin", prefix, to_hex(&hash));
359 tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
360
361 match client
362 .put_object()
363 .bucket(bucket.as_str())
364 .key(&key)
365 .body(ByteStream::from(data))
366 .send()
367 .await
368 {
369 Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
370 Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
371 }
372 }
373 S3SyncMessage::Delete { hash } => {
374 let key = format!("{}{}.bin", prefix, to_hex(&hash));
375 tracing::debug!("S3 deleting {}", &key);
376
377 if let Err(e) = client
378 .delete_object()
379 .bucket(bucket.as_str())
380 .key(&key)
381 .send()
382 .await
383 {
384 tracing::error!("S3 delete failed {}: {}", &key, e);
385 }
386 }
387 }
388 });
389 }
390 });
391
392 tracing::info!(
393 "S3 storage initialized: bucket={}, prefix={}",
394 bucket,
395 prefix
396 );
397
398 Ok(Self {
399 local,
400 s3_client: Some(s3_client),
401 s3_bucket: Some(bucket),
402 s3_prefix: prefix,
403 sync_tx: Some(sync_tx),
404 })
405 }
406
407 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
409 let is_new = self.local.put_sync(hash, data)?;
411
412 #[cfg(feature = "s3")]
415 if let Some(ref tx) = self.sync_tx {
416 tracing::info!(
417 "Queueing S3 upload for {} ({} bytes, is_new={})",
418 crate::storage::to_hex(&hash)[..16].to_string(),
419 data.len(),
420 is_new
421 );
422 if let Err(e) = tx.send(S3SyncMessage::Upload {
423 hash,
424 data: data.to_vec(),
425 }) {
426 tracing::error!("Failed to queue S3 upload: {}", e);
427 }
428 }
429
430 Ok(is_new)
431 }
432
433 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
435 let inserted = self.local.put_many_sync(items)?;
436
437 #[cfg(feature = "s3")]
438 if let Some(ref tx) = self.sync_tx {
439 for (hash, data) in items {
440 if let Err(e) = tx.send(S3SyncMessage::Upload {
441 hash: *hash,
442 data: data.clone(),
443 }) {
444 tracing::error!("Failed to queue S3 upload: {}", e);
445 }
446 }
447 }
448
449 Ok(inserted)
450 }
451
452 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
454 if let Some(data) = self.local.get_sync(hash)? {
456 return Ok(Some(data));
457 }
458
459 #[cfg(feature = "s3")]
461 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
462 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
463
464 match sync_block_on(async { client.get_object().bucket(bucket).key(&key).send().await })
465 {
466 Ok(output) => {
467 if let Ok(body) = sync_block_on(output.body.collect()) {
468 let data = body.into_bytes().to_vec();
469 let _ = self.local.put_sync(*hash, &data);
471 return Ok(Some(data));
472 }
473 }
474 Err(e) => {
475 let service_err = e.into_service_error();
476 if !service_err.is_no_such_key() {
477 tracing::warn!("S3 get failed: {}", service_err);
478 }
479 }
480 }
481 }
482
483 Ok(None)
484 }
485
486 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
488 if self.local.exists(hash)? {
490 return Ok(true);
491 }
492
493 #[cfg(feature = "s3")]
495 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
496 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
497
498 match sync_block_on(async {
499 client.head_object().bucket(bucket).key(&key).send().await
500 }) {
501 Ok(_) => return Ok(true),
502 Err(e) => {
503 let service_err = e.into_service_error();
504 if !service_err.is_not_found() {
505 tracing::warn!("S3 head failed: {}", service_err);
506 }
507 }
508 }
509 }
510
511 Ok(false)
512 }
513
514 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
516 let deleted = self.local.delete_sync(hash)?;
517
518 #[cfg(feature = "s3")]
520 if let Some(ref tx) = self.sync_tx {
521 let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
522 }
523
524 Ok(deleted)
525 }
526
    /// Deletes `hash` from the local store only; no S3 delete is queued.
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }
532
    /// Returns statistics for the local store only (S3 is not consulted).
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        self.local.stats()
    }
537
    /// Lists the hashes held by the local store only (S3 is not consulted).
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }
542
    /// Returns a new shared handle to the underlying local store.
    pub fn local_store(&self) -> Arc<LocalStore> {
        Arc::clone(&self.local)
    }
547}
548
549#[async_trait]
552impl Store for StorageRouter {
553 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
554 self.put_sync(hash, &data)
555 }
556
557 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
558 self.put_many_sync(&items)
559 }
560
561 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
562 self.get_sync(hash)
563 }
564
565 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
566 self.exists(hash)
567 }
568
569 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
570 self.delete_sync(hash)
571 }
572}
573
/// Top-level store combining an LMDB metadata environment with a blob [`StorageRouter`].
pub struct HashtreeStore {
    /// Directory the store was opened at; blobs live under its `blobs` subdirectory.
    base_path: PathBuf,
    /// LMDB environment holding all metadata databases below.
    env: heed::Env,
    /// `pins` database, keyed by raw bytes (presumably pinned hashes — TODO confirm).
    pins: Database<Bytes, Unit>,
    /// `pinned_refs` database, keyed by string.
    pinned_refs: Database<Str, Unit>,
    /// `blob_owners` database; keys are 64 bytes: blob SHA-256 followed by owner pubkey.
    blob_owners: Database<Bytes, Unit>,
    /// `pubkey_blobs` database; maps an owner pubkey to a JSON list of blob metadata.
    pubkey_blobs: Database<Bytes, Bytes>,
    /// `tree_meta` database (contents not visible in this part of the file).
    tree_meta: Database<Bytes, Bytes>,
    /// `blob_trees` database (contents not visible in this part of the file).
    blob_trees: Database<Bytes, Unit>,
    /// `tree_refs` database (contents not visible in this part of the file).
    tree_refs: Database<Str, Bytes>,
    /// `cached_roots` database (presumably serialized `CachedRoot` values — TODO confirm).
    cached_roots: Database<Str, Bytes>,
    /// Blob storage router (local store plus optional S3).
    router: Arc<StorageRouter>,
    /// Configured storage size limit in bytes.
    max_size_bytes: u64,
    /// Configuration flag; presumably enables eviction of orphaned blobs — TODO confirm.
    evict_orphans: bool,
}
600
601impl HashtreeStore {
602 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
604 let config = hashtree_config::Config::load_or_default();
605 let max_size_bytes = config
606 .storage
607 .max_size_gb
608 .saturating_mul(1024 * 1024 * 1024);
609 Self::with_options_and_backend(
610 path,
611 None,
612 max_size_bytes,
613 config.storage.evict_orphans,
614 &config.storage.backend,
615 )
616 }
617
    /// Opens a store at `path` with an explicit backend and size limit.
    ///
    /// No S3 backing is used and orphan eviction is always enabled (`true`).
    pub fn new_with_backend<P: AsRef<Path>>(
        path: P,
        backend: hashtree_config::StorageBackend,
        max_size_bytes: u64,
    ) -> Result<Self> {
        Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
    }
626
627 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
629 let config = hashtree_config::Config::load_or_default();
630 let max_size_bytes = config
631 .storage
632 .max_size_gb
633 .saturating_mul(1024 * 1024 * 1024);
634 Self::with_options_and_backend(
635 path,
636 s3_config,
637 max_size_bytes,
638 config.storage.evict_orphans,
639 &config.storage.backend,
640 )
641 }
642
643 pub fn with_options<P: AsRef<Path>>(
649 path: P,
650 s3_config: Option<&S3Config>,
651 max_size_bytes: u64,
652 ) -> Result<Self> {
653 let config = hashtree_config::Config::load_or_default();
654 Self::with_options_and_backend(
655 path,
656 s3_config,
657 max_size_bytes,
658 config.storage.evict_orphans,
659 &config.storage.backend,
660 )
661 }
662
663 fn with_options_and_backend<P: AsRef<Path>>(
664 path: P,
665 s3_config: Option<&S3Config>,
666 max_size_bytes: u64,
667 evict_orphans: bool,
668 backend: &hashtree_config::StorageBackend,
669 ) -> Result<Self> {
670 let path = path.as_ref();
671 std::fs::create_dir_all(path)?;
672
673 let env = unsafe {
674 EnvOpenOptions::new()
675 .map_size(10 * 1024 * 1024 * 1024) .max_dbs(9) .max_readers(LMDB_MAX_READERS)
678 .open(path)?
679 };
680 let _ = env.clear_stale_readers();
681
682 let mut wtxn = env.write_txn()?;
683 let pins = env.create_database(&mut wtxn, Some("pins"))?;
684 let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
685 let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
686 let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
687 let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
688 let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
689 let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
690 let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
691 wtxn.commit()?;
692
693 let local_store = Arc::new(match backend {
697 hashtree_config::StorageBackend::Fs => LocalStore::Fs(
698 FsBlobStore::new(path.join("blobs"))
699 .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
700 ),
701 #[cfg(feature = "lmdb")]
702 hashtree_config::StorageBackend::Lmdb => {
703 std::fs::create_dir_all(path.join("blobs"))?;
704 remove_stale_fs_blob_shards(&path.join("blobs"))
705 .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
706 let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
707 let map_size = usize::try_from(requested_map_size)
708 .context("LMDB blob map size exceeds usize")?;
709 LocalStore::Lmdb(
710 LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
711 .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
712 )
713 }
714 #[cfg(not(feature = "lmdb"))]
715 hashtree_config::StorageBackend::Lmdb => {
716 tracing::warn!(
717 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
718 );
719 LocalStore::Fs(
720 FsBlobStore::new(path.join("blobs"))
721 .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
722 )
723 }
724 });
725
726 #[cfg(feature = "s3")]
728 let router = Arc::new(if let Some(s3_cfg) = s3_config {
729 tracing::info!(
730 "Initializing S3 storage backend: bucket={}, endpoint={}",
731 s3_cfg.bucket,
732 s3_cfg.endpoint
733 );
734
735 sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
736 } else {
737 StorageRouter::new(local_store)
738 });
739
740 #[cfg(not(feature = "s3"))]
741 let router = Arc::new({
742 if s3_config.is_some() {
743 tracing::warn!(
744 "S3 config provided but S3 feature not enabled. Using local storage only."
745 );
746 }
747 StorageRouter::new(local_store)
748 });
749
750 Ok(Self {
751 base_path: path.to_path_buf(),
752 env,
753 pins,
754 pinned_refs,
755 blob_owners,
756 pubkey_blobs,
757 tree_meta,
758 blob_trees,
759 tree_refs,
760 cached_roots,
761 router,
762 max_size_bytes,
763 evict_orphans,
764 })
765 }
766
    /// Returns the directory this store was opened at.
    pub fn base_path(&self) -> &Path {
        &self.base_path
    }
770
    /// Borrows the blob storage router.
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }
775
    /// Returns a new shared handle to the blob storage router.
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }
781
782 pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
784 let store = self.store_arc();
785 let tree = HashTree::new(HashTreeConfig::new(store).public());
786
787 sync_block_on(async {
788 tree.get_tree_node(hash)
789 .await
790 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
791 })
792 }
793
794 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
796 let hash = sha256(data);
797 self.router
798 .put_sync(hash, data)
799 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
800 Ok(to_hex(&hash))
801 }
802
803 pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
809 let hash = sha256(data);
810 if self
811 .router
812 .exists(&hash)
813 .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
814 {
815 return Ok(to_hex(&hash));
816 }
817
818 let incoming_bytes = data.len() as u64;
819 let _ = self.make_room_for_cached_blob(incoming_bytes);
820
821 let mut retried_after_cleanup = false;
822 loop {
823 match self.router.put_sync(hash, data) {
824 Ok(_) => return Ok(to_hex(&hash)),
825 Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
826 let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
827 if freed == 0 {
828 return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
829 }
830 retried_after_cleanup = true;
831 }
832 Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
833 }
834 }
835 }
836
837 pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
839 self.router
840 .get_sync(hash)
841 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
842 }
843
844 pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
846 self.router
847 .exists(hash)
848 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
849 }
850
851 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
857 let mut key = [0u8; 64];
858 key[..32].copy_from_slice(sha256);
859 key[32..].copy_from_slice(pubkey);
860 key
861 }
862
863 pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
866 let key = Self::blob_owner_key(sha256, pubkey);
867 let mut wtxn = self.env.write_txn()?;
868
869 self.blob_owners.put(&mut wtxn, &key[..], &())?;
871
872 let sha256_hex = to_hex(sha256);
874
875 let mut blobs: Vec<BlobMetadata> = self
877 .pubkey_blobs
878 .get(&wtxn, pubkey)?
879 .and_then(|b| serde_json::from_slice(b).ok())
880 .unwrap_or_default();
881
882 if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
884 let now = SystemTime::now()
885 .duration_since(UNIX_EPOCH)
886 .unwrap()
887 .as_secs();
888
889 let size = self
891 .get_blob(sha256)?
892 .map(|data| data.len() as u64)
893 .unwrap_or(0);
894
895 blobs.push(BlobMetadata {
896 sha256: sha256_hex,
897 size,
898 mime_type: "application/octet-stream".to_string(),
899 uploaded: now,
900 });
901
902 let blobs_json = serde_json::to_vec(&blobs)?;
903 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
904 }
905
906 wtxn.commit()?;
907 Ok(())
908 }
909
910 pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
912 let key = Self::blob_owner_key(sha256, pubkey);
913 let rtxn = self.env.read_txn()?;
914 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
915 }
916
917 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
919 let rtxn = self.env.read_txn()?;
920
921 let mut owners = Vec::new();
922 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
923 let (key, _) = item?;
924 if key.len() == 64 {
925 let mut pubkey = [0u8; 32];
927 pubkey.copy_from_slice(&key[32..64]);
928 owners.push(pubkey);
929 }
930 }
931 Ok(owners)
932 }
933
934 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
936 let rtxn = self.env.read_txn()?;
937
938 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
940 if item.is_ok() {
941 return Ok(true);
942 }
943 }
944 Ok(false)
945 }
946
    /// Returns the first recorded owner of blob `sha256`, if any.
    ///
    /// All owners are collected via `get_blob_owners` and the first one is returned.
    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
        Ok(self.get_blob_owners(sha256)?.into_iter().next())
    }
951
952 pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
956 let key = Self::blob_owner_key(sha256, pubkey);
957 let mut wtxn = self.env.write_txn()?;
958
959 self.blob_owners.delete(&mut wtxn, &key[..])?;
961
962 let sha256_hex = to_hex(sha256);
964
965 if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
967 if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
968 blobs.retain(|b| b.sha256 != sha256_hex);
969 let blobs_json = serde_json::to_vec(&blobs)?;
970 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
971 }
972 }
973
974 let mut has_other_owners = false;
976 for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
977 if item.is_ok() {
978 has_other_owners = true;
979 break;
980 }
981 }
982
983 if has_other_owners {
984 wtxn.commit()?;
985 tracing::debug!(
986 "Removed {} from blob {} owners, other owners remain",
987 &to_hex(pubkey)[..8],
988 &sha256_hex[..8]
989 );
990 return Ok(false);
991 }
992
993 tracing::info!(
995 "All owners removed from blob {}, deleting",
996 &sha256_hex[..8]
997 );
998
999 let _ = self.router.delete_sync(sha256);
1001
1002 wtxn.commit()?;
1003 Ok(true)
1004 }
1005
1006 pub fn list_blobs_by_pubkey(
1008 &self,
1009 pubkey: &[u8; 32],
1010 ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1011 let rtxn = self.env.read_txn()?;
1012
1013 let blobs: Vec<BlobMetadata> = self
1014 .pubkey_blobs
1015 .get(&rtxn, pubkey)?
1016 .and_then(|b| serde_json::from_slice(b).ok())
1017 .unwrap_or_default();
1018
1019 Ok(blobs
1020 .into_iter()
1021 .map(|b| crate::server::blossom::BlobDescriptor {
1022 url: format!("/{}", b.sha256),
1023 sha256: b.sha256,
1024 size: b.size,
1025 mime_type: b.mime_type,
1026 uploaded: b.uploaded,
1027 })
1028 .collect())
1029 }
1030
1031 pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1033 self.router
1034 .get_sync(hash)
1035 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1036 }
1037
1038 pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1041 let store = self.store_arc();
1042 let tree = HashTree::new(HashTreeConfig::new(store).public());
1043
1044 sync_block_on(async {
1045 tree.read_file(hash)
1046 .await
1047 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1048 })
1049 }
1050
1051 pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1054 let store = self.store_arc();
1055 let tree = HashTree::new(HashTreeConfig::new(store).public());
1056
1057 sync_block_on(async {
1058 tree.get(cid, None)
1059 .await
1060 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1061 })
1062 }
1063
1064 fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1065 let exists = self
1066 .router
1067 .exists(&cid.hash)
1068 .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1069 if !exists {
1070 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1071 }
1072 Ok(())
1073 }
1074
1075 pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1077 self.ensure_cid_exists(cid)?;
1078
1079 let store = self.store_arc();
1080 let tree = HashTree::new(HashTreeConfig::new(store).public());
1081 let mut total_bytes = 0u64;
1082 let mut streamed_any_chunk = false;
1083
1084 sync_block_on(async {
1085 let mut stream = tree.get_stream(cid);
1086 while let Some(chunk) = stream.next().await {
1087 streamed_any_chunk = true;
1088 let chunk =
1089 chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1090 writer
1091 .write_all(&chunk)
1092 .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1093 total_bytes += chunk.len() as u64;
1094 }
1095 Ok::<(), anyhow::Error>(())
1096 })?;
1097
1098 if !streamed_any_chunk {
1099 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1100 }
1101
1102 writer
1103 .flush()
1104 .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1105 Ok(total_bytes)
1106 }
1107
1108 pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1110 self.ensure_cid_exists(cid)?;
1111
1112 let output_path = output_path.as_ref();
1113 if let Some(parent) = output_path.parent() {
1114 if !parent.as_os_str().is_empty() {
1115 std::fs::create_dir_all(parent).with_context(|| {
1116 format!("Failed to create output directory {}", parent.display())
1117 })?;
1118 }
1119 }
1120
1121 let mut file = std::fs::File::create(output_path)
1122 .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1123 self.write_file_by_cid_to_writer(cid, &mut file)
1124 }
1125
    /// Writes the file rooted at `hash` to `output_path`, treating `hash` as a public CID.
    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
        self.write_file_by_cid(&Cid::public(*hash), output_path)
    }
1130
1131 pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1133 let store = self.store_arc();
1134 let tree = HashTree::new(HashTreeConfig::new(store).public());
1135
1136 sync_block_on(async {
1137 tree.resolve_path(cid, path)
1138 .await
1139 .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1140 })
1141 }
1142
1143 pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
1145 let store = self.store_arc();
1146 let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
1147
1148 sync_block_on(async {
1149 let exists = store
1152 .has(hash)
1153 .await
1154 .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;
1155
1156 if !exists {
1157 return Ok(None);
1158 }
1159
1160 let total_size = tree
1162 .get_size(hash)
1163 .await
1164 .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;
1165
1166 let is_tree_node = tree
1168 .is_tree(hash)
1169 .await
1170 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1171
1172 if !is_tree_node {
1173 return Ok(Some(FileChunkMetadata {
1175 total_size,
1176 chunk_hashes: vec![],
1177 chunk_sizes: vec![],
1178 is_chunked: false,
1179 }));
1180 }
1181
1182 let node = match tree
1184 .get_tree_node(hash)
1185 .await
1186 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1187 {
1188 Some(n) => n,
1189 None => return Ok(None),
1190 };
1191
1192 let is_directory = tree
1194 .is_directory(hash)
1195 .await
1196 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1197
1198 if is_directory {
1199 return Ok(None); }
1201
1202 let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
1204 let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();
1205
1206 Ok(Some(FileChunkMetadata {
1207 total_size,
1208 chunk_hashes,
1209 chunk_sizes,
1210 is_chunked: !node.links.is_empty(),
1211 }))
1212 })
1213 }
1214
1215 pub fn get_file_range(
1217 &self,
1218 hash: &[u8; 32],
1219 start: u64,
1220 end: Option<u64>,
1221 ) -> Result<Option<(Vec<u8>, u64)>> {
1222 let metadata = match self.get_file_chunk_metadata(hash)? {
1223 Some(m) => m,
1224 None => return Ok(None),
1225 };
1226
1227 if metadata.total_size == 0 {
1228 return Ok(Some((Vec::new(), 0)));
1229 }
1230
1231 if start >= metadata.total_size {
1232 return Ok(None);
1233 }
1234
1235 let end = end
1236 .unwrap_or(metadata.total_size - 1)
1237 .min(metadata.total_size - 1);
1238
1239 if !metadata.is_chunked {
1241 let content = self.get_file(hash)?.unwrap_or_default();
1242 let range_content = if start < content.len() as u64 {
1243 content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1244 } else {
1245 Vec::new()
1246 };
1247 return Ok(Some((range_content, metadata.total_size)));
1248 }
1249
1250 let mut result = Vec::new();
1252 let mut current_offset = 0u64;
1253
1254 for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1255 let chunk_size = metadata.chunk_sizes[i];
1256 let chunk_end = current_offset + chunk_size - 1;
1257
1258 if chunk_end >= start && current_offset <= end {
1260 let chunk_content = match self.get_chunk(chunk_hash)? {
1261 Some(content) => content,
1262 None => {
1263 return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1264 }
1265 };
1266
1267 let chunk_read_start = if current_offset >= start {
1268 0
1269 } else {
1270 (start - current_offset) as usize
1271 };
1272
1273 let chunk_read_end = if chunk_end <= end {
1274 chunk_size as usize - 1
1275 } else {
1276 (end - current_offset) as usize
1277 };
1278
1279 result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1280 }
1281
1282 current_offset += chunk_size;
1283
1284 if current_offset > end {
1285 break;
1286 }
1287 }
1288
1289 Ok(Some((result, metadata.total_size)))
1290 }
1291
1292 pub fn stream_file_range_chunks_owned(
1294 self: Arc<Self>,
1295 hash: &[u8; 32],
1296 start: u64,
1297 end: u64,
1298 ) -> Result<Option<FileRangeChunksOwned>> {
1299 let metadata = match self.get_file_chunk_metadata(hash)? {
1300 Some(m) => m,
1301 None => return Ok(None),
1302 };
1303
1304 if metadata.total_size == 0 || start >= metadata.total_size {
1305 return Ok(None);
1306 }
1307
1308 let end = end.min(metadata.total_size - 1);
1309
1310 Ok(Some(FileRangeChunksOwned {
1311 store: self,
1312 metadata,
1313 start,
1314 end,
1315 current_chunk_idx: 0,
1316 current_offset: 0,
1317 }))
1318 }
1319
1320 pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1322 let store = self.store_arc();
1323 let tree = HashTree::new(HashTreeConfig::new(store).public());
1324
1325 sync_block_on(async {
1326 let is_dir = tree
1328 .is_directory(hash)
1329 .await
1330 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1331
1332 if !is_dir {
1333 return Ok(None);
1334 }
1335
1336 let cid = hashtree_core::Cid::public(*hash);
1338 let tree_entries = tree
1339 .list_directory(&cid)
1340 .await
1341 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1342
1343 let entries: Vec<DirEntry> = tree_entries
1344 .into_iter()
1345 .map(|e| DirEntry {
1346 name: e.name,
1347 cid: to_hex(&e.hash),
1348 is_directory: e.link_type.is_tree(),
1349 size: e.size,
1350 })
1351 .collect();
1352
1353 Ok(Some(DirectoryListing {
1354 dir_name: String::new(),
1355 entries,
1356 }))
1357 })
1358 }
1359
1360 pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1362 let store = self.store_arc();
1363 let tree = HashTree::new(HashTreeConfig::new(store).public());
1364 let cid = cid.clone();
1365
1366 sync_block_on(async {
1367 let is_dir = tree
1368 .is_dir(&cid)
1369 .await
1370 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1371
1372 if !is_dir {
1373 return Ok(None);
1374 }
1375
1376 let tree_entries = tree
1377 .list_directory(&cid)
1378 .await
1379 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1380
1381 let entries: Vec<DirEntry> = tree_entries
1382 .into_iter()
1383 .map(|e| DirEntry {
1384 name: e.name,
1385 cid: Cid {
1386 hash: e.hash,
1387 key: e.key,
1388 }
1389 .to_string(),
1390 is_directory: e.link_type.is_tree(),
1391 size: e.size,
1392 })
1393 .collect();
1394
1395 Ok(Some(DirectoryListing {
1396 dir_name: String::new(),
1397 entries,
1398 }))
1399 })
1400 }
1401
1402 pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
1406 let mut wtxn = self.env.write_txn()?;
1407 self.pinned_refs.put(&mut wtxn, key, &())?;
1408 wtxn.commit()?;
1409 Ok(())
1410 }
1411
1412 pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
1414 let mut wtxn = self.env.write_txn()?;
1415 let removed = self.pinned_refs.delete(&mut wtxn, key)?;
1416 wtxn.commit()?;
1417 Ok(removed)
1418 }
1419
1420 pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
1422 let rtxn = self.env.read_txn()?;
1423 let mut refs = Vec::new();
1424
1425 for item in self.pinned_refs.iter(&rtxn)? {
1426 let (key, _) = item?;
1427 refs.push(key.to_string());
1428 }
1429
1430 refs.sort();
1431 Ok(refs)
1432 }
1433
1434 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1436 let key = format!("{}/{}", pubkey_hex, tree_name);
1437 let rtxn = self.env.read_txn()?;
1438 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1439 let root: CachedRoot = rmp_serde::from_slice(bytes)
1440 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1441 Ok(Some(root))
1442 } else {
1443 Ok(None)
1444 }
1445 }
1446
1447 pub fn set_cached_root(
1449 &self,
1450 pubkey_hex: &str,
1451 tree_name: &str,
1452 hash: &str,
1453 key: Option<&str>,
1454 visibility: &str,
1455 updated_at: u64,
1456 ) -> Result<()> {
1457 let db_key = format!("{}/{}", pubkey_hex, tree_name);
1458 let root = CachedRoot {
1459 hash: hash.to_string(),
1460 key: key.map(|k| k.to_string()),
1461 updated_at,
1462 visibility: visibility.to_string(),
1463 };
1464 let bytes = rmp_serde::to_vec(&root)
1465 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1466 let mut wtxn = self.env.write_txn()?;
1467 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1468 wtxn.commit()?;
1469 Ok(())
1470 }
1471
1472 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1474 let prefix = format!("{}/", pubkey_hex);
1475 let rtxn = self.env.read_txn()?;
1476 let mut results = Vec::new();
1477
1478 for item in self.cached_roots.iter(&rtxn)? {
1479 let (key, bytes) = item?;
1480 if key.starts_with(&prefix) {
1481 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1482 let root: CachedRoot = rmp_serde::from_slice(bytes)
1483 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1484 results.push((tree_name.to_string(), root));
1485 }
1486 }
1487
1488 Ok(results)
1489 }
1490
1491 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1493 let key = format!("{}/{}", pubkey_hex, tree_name);
1494 let mut wtxn = self.env.write_txn()?;
1495 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1496 wtxn.commit()?;
1497 Ok(deleted)
1498 }
1499}
1500
1501fn is_map_full_store_error(err: &StoreError) -> bool {
1502 let message = err.to_string();
1503 message.contains("MDB_MAP_FULL") || message.contains("MapFull")
1504}
1505
1506#[derive(Debug, Clone)]
1507pub struct FileChunkMetadata {
1508 pub total_size: u64,
1509 pub chunk_hashes: Vec<Hash>,
1510 pub chunk_sizes: Vec<u64>,
1511 pub is_chunked: bool,
1512}
1513
1514pub struct FileRangeChunksOwned {
1516 store: Arc<HashtreeStore>,
1517 metadata: FileChunkMetadata,
1518 start: u64,
1519 end: u64,
1520 current_chunk_idx: usize,
1521 current_offset: u64,
1522}
1523
1524impl Iterator for FileRangeChunksOwned {
1525 type Item = Result<Vec<u8>>;
1526
1527 fn next(&mut self) -> Option<Self::Item> {
1528 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
1529 return None;
1530 }
1531
1532 if self.current_offset > self.end {
1533 return None;
1534 }
1535
1536 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
1537 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
1538 let chunk_end = self.current_offset + chunk_size - 1;
1539
1540 self.current_chunk_idx += 1;
1541
1542 if chunk_end < self.start || self.current_offset > self.end {
1543 self.current_offset += chunk_size;
1544 return self.next();
1545 }
1546
1547 let chunk_content = match self.store.get_chunk(chunk_hash) {
1548 Ok(Some(content)) => content,
1549 Ok(None) => {
1550 return Some(Err(anyhow::anyhow!(
1551 "Chunk {} not found",
1552 to_hex(chunk_hash)
1553 )));
1554 }
1555 Err(e) => {
1556 return Some(Err(e));
1557 }
1558 };
1559
1560 let chunk_read_start = if self.current_offset >= self.start {
1561 0
1562 } else {
1563 (self.start - self.current_offset) as usize
1564 };
1565
1566 let chunk_read_end = if chunk_end <= self.end {
1567 chunk_size as usize - 1
1568 } else {
1569 (self.end - self.current_offset) as usize
1570 };
1571
1572 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
1573 self.current_offset += chunk_size;
1574
1575 Some(Ok(result))
1576 }
1577}
1578
/// Counters describing the outcome of a garbage-collection pass.
#[derive(Debug)]
pub struct GcStats {
    /// Number of DAGs deleted (presumably whole trees — confirm against the collector).
    pub deleted_dags: usize,
    /// Number of bytes freed.
    pub freed_bytes: u64,
}
1584
/// One child of a directory listing.
#[derive(Clone, Debug)]
pub struct DirEntry {
    /// Entry name within its directory.
    pub name: String,
    /// Textual identifier of the child; either a hex hash or a rendered
    /// `Cid`, depending on which listing method produced the entry.
    pub cid: String,
    /// `true` when the child's link type is a tree.
    pub is_directory: bool,
    /// Size recorded for the child.
    pub size: u64,
}
1592
1593#[derive(Debug, Clone)]
1594pub struct DirectoryListing {
1595 pub dir_name: String,
1596 pub entries: Vec<DirEntry>,
1597}
1598
1599#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1601pub struct BlobMetadata {
1602 pub sha256: String,
1603 pub size: u64,
1604 pub mime_type: String,
1605 pub uploaded: u64,
1606}
1607
1608impl crate::webrtc::ContentStore for HashtreeStore {
1610 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1611 let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
1612 self.get_chunk(&hash)
1613 }
1614}
1615
// Every test here needs the LMDB backend, so the feature gate sits on the
// module rather than on each item.
#[cfg(all(test, feature = "lmdb"))]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Returns the LMDB map size of `store` in bytes; panics for the
    /// filesystem backend, which these tests never expect.
    fn lmdb_map_size_bytes(store: &LocalStore) -> u64 {
        match store {
            LocalStore::Lmdb(lmdb) => lmdb.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        }
    }

    /// A storage budget above the minimum map size must be reflected in the
    /// blob store's LMDB map size.
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = lmdb_map_size_bytes(store.router.local.as_ref());

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        // Close the store before the temporary directory is removed.
        drop(store);
        Ok(())
    }

    /// An explicit map size passed to the local store must be honoured.
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = lmdb_map_size_bytes(&store);

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }

    /// Opening an LMDB store over a directory that still holds two-hex-digit
    /// shard directories must remove those and leave everything else alone.
    #[test]
    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("lmdb-blobs");

        for shard in ["aa", "b2"] {
            std::fs::create_dir_all(path.join(shard))?;
            std::fs::write(path.join(shard).join("blob.bin"), b"old fs shard")?;
        }
        std::fs::create_dir_all(path.join("keep-me"))?;
        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;

        let _store = LocalStore::new_with_lmdb_map_size(
            &path,
            &StorageBackend::Lmdb,
            Some(128 * 1024 * 1024),
        )?;

        for removed in ["aa", "b2"] {
            assert!(!path.join(removed).exists());
        }
        for kept in ["keep-me", "data.mdb", "lock.mdb"] {
            assert!(path.join(kept).exists());
        }

        Ok(())
    }

    /// Indexing a new root under an existing ref must unpin the previous
    /// root and drop its tree metadata.
    #[test]
    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let superseded_payload = b"old published root";
        let replacement_payload = b"new published root";
        let superseded_root = sha256(superseded_payload);
        let replacement_root = sha256(replacement_payload);

        // Publish the first root under the ref.
        store.put_blob(superseded_payload)?;
        store.pin(&superseded_root)?;
        store.index_tree(
            &superseded_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(store.is_pinned(&superseded_root)?);
        assert!(store.get_tree_meta(&superseded_root)?.is_some());

        // Publish a second root under the same ref.
        store.put_blob(replacement_payload)?;
        store.pin(&replacement_root)?;
        store.index_tree(
            &replacement_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(
            !store.is_pinned(&superseded_root)?,
            "superseded root should be unpinned when ref is replaced"
        );
        assert!(
            store.get_tree_meta(&superseded_root)?.is_none(),
            "superseded root metadata should be removed when ref is replaced"
        );
        assert!(store.is_pinned(&replacement_root)?);
        assert!(store.get_tree_meta(&replacement_root)?.is_some());

        Ok(())
    }
}