1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8 from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::io::Write;
17use std::path::Path;
18use std::sync::Arc;
19use std::time::{SystemTime, UNIX_EPOCH};
20
21mod upload;
22
23mod maintenance;
24mod retention;
25
26pub use maintenance::VerifyResult;
27pub use retention::{PinnedItem, StorageByPriority, StorageStats, TreeMeta};
28
29pub const PRIORITY_OTHER: u8 = 64;
31pub const PRIORITY_FOLLOWED: u8 = 128;
32pub const PRIORITY_OWN: u8 = 255;
33const LMDB_MAX_READERS: u32 = 1024;
34#[cfg(feature = "lmdb")]
35const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 10 * 1024 * 1024 * 1024;
36
/// A cached tree-root pointer, persisted msgpack-encoded in the
/// `cached_roots` database under the key `"{pubkey_hex}/{tree_name}"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    // Root hash as a hex string.
    pub hash: String,
    // Optional key string stored verbatim from the caller (presumably a
    // decryption key for non-public trees — TODO confirm against call sites).
    pub key: Option<String>,
    // Unix timestamp (seconds) when this root was last updated.
    pub updated_at: u64,
    // Visibility label stored verbatim from the caller.
    pub visibility: String,
}
49
/// Aggregate statistics reported by a local blob store backend.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    // Number of blobs stored.
    pub count: usize,
    // Total stored size in bytes.
    pub total_bytes: u64,
}
56
/// Local blob storage backend: plain filesystem, or LMDB when the `lmdb`
/// feature is compiled in.
pub enum LocalStore {
    Fs(FsBlobStore),
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
63
impl LocalStore {
    /// Opens a store at `path` using `backend`; alias for [`Self::new_unbounded`].
    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
        Self::new_unbounded(path, backend)
    }

    /// Opens a store, optionally capping the LMDB map size.
    ///
    /// `_map_size_bytes` is only honored by the LMDB backend (and only when
    /// the `lmdb` feature is compiled in). If LMDB is requested without the
    /// feature, falls back to filesystem storage with a warning.
    pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
        path: P,
        backend: &StorageBackend,
        _map_size_bytes: Option<u64>,
    ) -> Result<Self, StoreError> {
        match backend {
            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
            #[cfg(feature = "lmdb")]
            StorageBackend::Lmdb => match _map_size_bytes {
                Some(map_size_bytes) => {
                    // LMDB takes a usize map size; reject values that don't fit.
                    let map_size = usize::try_from(map_size_bytes).map_err(|_| {
                        StoreError::Other("LMDB map size exceeds usize".to_string())
                    })?;
                    Ok(LocalStore::Lmdb(LmdbBlobStore::with_map_size(
                        path, map_size,
                    )?))
                }
                None => Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?)),
            },
            #[cfg(not(feature = "lmdb"))]
            StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
            }
        }
    }

    /// Opens a store with no explicit LMDB map-size cap.
    pub fn new_unbounded<P: AsRef<Path>>(
        path: P,
        backend: &StorageBackend,
    ) -> Result<Self, StoreError> {
        Self::new_with_lmdb_map_size(path, backend, None)
    }

    /// Reports which backend this store uses.
    pub fn backend(&self) -> StorageBackend {
        match self {
            LocalStore::Fs(_) => StorageBackend::Fs,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(_) => StorageBackend::Lmdb,
        }
    }

    /// Stores a blob; returns whether it was newly inserted (per the backend).
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.put_sync(hash, data),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put_sync(hash, data),
        }
    }

    /// Stores a batch of blobs; returns the count of newly inserted blobs.
    ///
    /// The filesystem backend has no batch primitive, so items are inserted
    /// one by one; LMDB delegates to its native batch insert.
    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
        match self {
            LocalStore::Fs(store) => {
                let mut inserted = 0usize;
                for (hash, data) in items {
                    if store.put_sync(*hash, data.as_slice())? {
                        inserted += 1;
                    }
                }
                Ok(inserted)
            }
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put_many_sync(items),
        }
    }

    /// Fetches a blob by hash, or `None` if absent.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.get_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.get_sync(hash),
        }
    }

    /// Checks whether a blob exists. (The FS backend's check is infallible.)
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => Ok(store.exists(hash)),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.exists(hash),
        }
    }

    /// Deletes a blob; returns whether anything was removed.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.delete_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.delete_sync(hash),
        }
    }

    /// Returns blob count and total byte size, normalized across backends.
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        match self {
            LocalStore::Fs(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
        }
    }

    /// Lists the hashes of all stored blobs.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.list(),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.list(),
        }
    }
}
203
/// Async [`Store`] facade over [`LocalStore`]; every method delegates directly
/// to its synchronous counterpart (the underlying backends are blocking).
#[async_trait]
impl Store for LocalStore {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
        self.put_many_sync(&items)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
226
227#[cfg(feature = "s3")]
228use tokio::sync::mpsc;
229
230use crate::config::S3Config;
231
/// Work items consumed by the background S3 mirroring task.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    // Upload `data` under the object key derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    // Delete the object whose key is derived from `hash`.
    Delete { hash: Hash },
}
238
/// Routes blob operations to local storage, optionally mirroring writes and
/// deletes to an S3-compatible bucket (reads fall back to S3 on local miss).
pub struct StorageRouter {
    // Authoritative local store; always consulted first.
    local: Arc<LocalStore>,
    // S3 client used for synchronous read-through (get/head) calls.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    // Prepended to object keys ("{prefix}{hash_hex}.bin").
    #[cfg(feature = "s3")]
    s3_prefix: String,
    // Channel to the background task that mirrors uploads/deletes to S3.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
257
impl StorageRouter {
    /// Creates a router backed by local storage only (no S3 mirroring).
    pub fn new(local: Arc<LocalStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Creates a router that mirrors writes/deletes to an S3-compatible bucket.
    ///
    /// Spawns a detached background task draining an unbounded channel of
    /// [`S3SyncMessage`]s. Each message is handled in its own spawned task,
    /// with at most 32 S3 calls in flight at once (semaphore). Failures are
    /// logged, not retried.
    ///
    /// NOTE(review): the permit is acquired inside the per-message task, so the
    /// number of *queued* tasks (and the channel itself) is unbounded — only
    /// the in-flight S3 calls are throttled. Confirm this is acceptable under
    /// write bursts.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Build the SDK config from the environment, overriding the region.
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader =
            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Custom endpoint + path-style addressing (needed for MinIO-style
        // S3-compatible services).
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Cap concurrent S3 calls; Arc so each per-message task can share.
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            // Ends (task exits) once all senders are dropped.
            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                tokio::spawn(async move {
                    // Held until this message is fully processed.
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            match client
                                .put_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .body(ByteStream::from(data))
                                .send()
                                .await
                            {
                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            if let Err(e) = client
                                .delete_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .send()
                                .await
                            {
                                tracing::error!("S3 delete failed {}: {}", &key, e);
                            }
                        }
                    }
                });
            }
        });

        tracing::info!(
            "S3 storage initialized: bucket={}, prefix={}",
            bucket,
            prefix
        );

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Writes a blob locally and (if configured) queues an S3 upload.
    ///
    /// Returns whether the blob was new *locally*. The S3 upload is queued
    /// unconditionally — even when the blob already existed.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        let is_new = self.local.put_sync(hash, data)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            // NOTE(review): info-level log per put — consider debug if noisy.
            tracing::info!(
                "Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(),
                data.len(),
                is_new
            );
            if let Err(e) = tx.send(S3SyncMessage::Upload {
                hash,
                data: data.to_vec(),
            }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Batch variant of [`Self::put_sync`]; returns the locally-new count.
    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
        let inserted = self.local.put_many_sync(items)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            for (hash, data) in items {
                if let Err(e) = tx.send(S3SyncMessage::Upload {
                    hash: *hash,
                    data: data.clone(),
                }) {
                    tracing::error!("Failed to queue S3 upload: {}", e);
                }
            }
        }

        Ok(inserted)
    }

    /// Reads a blob: local first, then S3 (on miss the fetched blob is
    /// written back to local storage as a cache fill).
    ///
    /// NOTE(review): the S3 path uses `futures::executor::block_on`; calling
    /// this from inside an async runtime thread can block or panic — confirm
    /// all callers are on blocking threads.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async { client.get_object().bucket(bucket).key(&key).send().await })
            {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Best-effort local cache fill; ignore failures.
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // "No such key" is an expected miss; only warn otherwise.
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Checks existence locally, falling back to an S3 HEAD request.
    /// Same `block_on` caveat as [`Self::get_sync`].
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        if self.local.exists(hash)? {
            return Ok(true);
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object().bucket(bucket).key(&key).send().await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Deletes locally and queues an S3 delete; returns whether the local
    /// delete removed anything.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Deletes only the local copy, leaving any S3 object in place
    /// (used to evict from cache without losing remote data).
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Local-store statistics (S3 is not consulted).
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        self.local.stats()
    }

    /// Lists locally stored hashes (S3 is not consulted).
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Returns a shared handle to the underlying local store.
    pub fn local_store(&self) -> Arc<LocalStore> {
        Arc::clone(&self.local)
    }
}
517
/// Async [`Store`] facade over [`StorageRouter`]; delegates to the synchronous
/// methods (which may block on S3 — see notes on `get_sync`/`exists`).
#[async_trait]
impl Store for StorageRouter {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
        self.put_many_sync(&items)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
542
/// Top-level store: an LMDB environment holding metadata databases, plus a
/// [`StorageRouter`] for the blob payloads themselves.
pub struct HashtreeStore {
    // Metadata LMDB environment (distinct from the LMDB *blob* backend).
    env: heed::Env,
    // Pinned items (semantics defined in the retention module — see re-exports).
    pins: Database<Bytes, Unit>,
    // Ownership set keyed by the 64-byte `sha256 || pubkey` composite key.
    blob_owners: Database<Bytes, Unit>,
    // pubkey -> JSON-encoded Vec<BlobMetadata> of that key's uploads.
    pubkey_blobs: Database<Bytes, Bytes>,
    // Tree metadata (used by the retention/maintenance modules — not read here).
    tree_meta: Database<Bytes, Bytes>,
    // Blob-to-tree membership (used elsewhere — not read in this file).
    blob_trees: Database<Bytes, Unit>,
    // Named tree references (used elsewhere — not read in this file).
    tree_refs: Database<Str, Bytes>,
    // "{pubkey_hex}/{tree_name}" -> msgpack-encoded CachedRoot.
    cached_roots: Database<Str, Bytes>,
    // Blob storage (local, optionally S3-mirrored).
    router: Arc<StorageRouter>,
    // Configured storage cap in bytes (enforced by retention logic, not here).
    max_size_bytes: u64,
}
564
565impl HashtreeStore {
    /// Opens a store at `path` using backend and size limit from the global config.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        let config = hashtree_config::Config::load_or_default();
        // Config expresses the cap in GiB; saturate rather than overflow.
        let max_size_bytes = config
            .storage
            .max_size_gb
            .saturating_mul(1024 * 1024 * 1024);
        Self::with_options_and_backend(path, None, max_size_bytes, &config.storage.backend)
    }

    /// Opens a store with an explicit backend and size limit (no S3).
    pub fn new_with_backend<P: AsRef<Path>>(
        path: P,
        backend: hashtree_config::StorageBackend,
        max_size_bytes: u64,
    ) -> Result<Self> {
        Self::with_options_and_backend(path, None, max_size_bytes, &backend)
    }

    /// Opens a store like [`Self::new`], optionally mirroring blobs to S3.
    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
        let config = hashtree_config::Config::load_or_default();
        let max_size_bytes = config
            .storage
            .max_size_gb
            .saturating_mul(1024 * 1024 * 1024);
        Self::with_options_and_backend(path, s3_config, max_size_bytes, &config.storage.backend)
    }

    /// Opens a store with explicit S3 config and size limit; backend comes
    /// from the global config.
    pub fn with_options<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
    ) -> Result<Self> {
        let config = hashtree_config::Config::load_or_default();
        Self::with_options_and_backend(path, s3_config, max_size_bytes, &config.storage.backend)
    }
608
    /// Shared constructor: opens the metadata LMDB environment, creates all
    /// databases, builds the blob store for `backend`, and wires up the
    /// router (with S3 when configured and the feature is enabled).
    fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY (heed contract): the env must not be opened twice in one
        // process for the same path; callers are expected to hold a single
        // HashtreeStore per path — TODO confirm.
        // Metadata env map size is a fixed 10 GiB, independent of
        // `max_size_bytes` (which caps *blob* storage).
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024)
                .max_dbs(8)
                .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Clean up reader slots left by crashed processes; best-effort.
        let _ = env.clear_stale_readers();

        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Blobs live under "<path>/blobs", in FS or LMDB form.
        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                // The LMDB blob map must be at least the configured cap, with
                // a 10 GiB floor.
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            // NOTE(review): blocks on async S3 setup; assumes this constructor
            // is not called from within an async runtime thread — confirm.
            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            env,
            pins,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
704
    /// Borrows the blob router.
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }

    /// Returns a shared handle to the blob router (for building `HashTree`s).
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }

    /// Loads and decodes a tree node by hash, or `None` if absent.
    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Blocks on the async tree API (callers are synchronous).
        sync_block_on(async {
            tree.get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
        })
    }
727
    /// Stores raw bytes as a content-addressed blob; returns the hex hash.
    pub fn put_blob(&self, data: &[u8]) -> Result<String> {
        let hash = sha256(data);
        self.router
            .put_sync(hash, data)
            .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
        Ok(to_hex(&hash))
    }

    /// Fetches a blob by hash (local, then S3 if configured).
    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        self.router
            .get_sync(hash)
            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
    }

    /// Checks whether a blob exists (local, then S3 if configured).
    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
        self.router
            .exists(hash)
            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
    }
750
751 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
757 let mut key = [0u8; 64];
758 key[..32].copy_from_slice(sha256);
759 key[32..].copy_from_slice(pubkey);
760 key
761 }
762
    /// Records `pubkey` as an owner of blob `sha256` and appends the blob to
    /// that pubkey's metadata list (idempotent per owner/blob pair).
    ///
    /// The size is read back from storage at registration time; the MIME type
    /// defaults to `application/octet-stream`.
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        let sha256_hex = to_hex(sha256);

        // Existing metadata list for this pubkey; a corrupt JSON entry is
        // silently replaced by an empty list.
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            // Clock-before-epoch would panic here; accepted as a non-case.
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Missing blob data yields size 0 rather than an error.
            let size = self
                .get_blob(sha256)?
                .map(|data| data.len() as u64)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }

    /// Checks whether `pubkey` is recorded as an owner of blob `sha256`.
    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let rtxn = self.env.read_txn()?;
        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
    }
816
    /// Lists all pubkeys recorded as owners of blob `sha256`, via a prefix
    /// scan over the `sha256 || pubkey` composite keys.
    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
        let rtxn = self.env.read_txn()?;

        let mut owners = Vec::new();
        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
            let (key, _) = item?;
            // Malformed keys (wrong length) are skipped defensively.
            if key.len() == 64 {
                let mut pubkey = [0u8; 32];
                pubkey.copy_from_slice(&key[32..64]);
                owners.push(pubkey);
            }
        }
        Ok(owners)
    }
833
834 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
836 let rtxn = self.env.read_txn()?;
837
838 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
840 if item.is_ok() {
841 return Ok(true);
842 }
843 }
844 Ok(false)
845 }
846
    /// Returns an arbitrary single owner of blob `sha256` (the first found
    /// in key order), or `None` if the blob has no recorded owners.
    pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
        Ok(self.get_blob_owners(sha256)?.into_iter().next())
    }
851
    /// Removes `pubkey`'s ownership of blob `sha256`; when that was the last
    /// owner, also deletes the blob data. Returns `true` iff the blob data
    /// was deleted.
    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        self.blob_owners.delete(&mut wtxn, &key[..])?;

        let sha256_hex = to_hex(sha256);

        // Drop the blob from this pubkey's metadata list; a corrupt JSON
        // entry is left untouched.
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Any remaining owner keeps the blob alive.
        // NOTE(review): Err items from the iterator are skipped here, which
        // could misread "has owners" as "none" on a read failure.
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &to_hex(pubkey)[..8],
                &sha256_hex[..8]
            );
            return Ok(false);
        }

        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8]
        );

        // Best-effort data delete, performed before the metadata commit.
        // NOTE(review): if the commit below fails, the blob data is gone but
        // the ownership delete is rolled back — confirm this is acceptable.
        let _ = self.router.delete_sync(sha256);

        wtxn.commit()?;
        Ok(true)
    }
905
    /// Lists the Blossom blob descriptors recorded for `pubkey`.
    ///
    /// The `url` is a root-relative path (`"/{sha256}"`); corrupt or missing
    /// metadata yields an empty list rather than an error.
    pub fn list_blobs_by_pubkey(
        &self,
        pubkey: &[u8; 32],
    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
        let rtxn = self.env.read_txn()?;

        let blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&rtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        Ok(blobs
            .into_iter()
            .map(|b| crate::server::blossom::BlobDescriptor {
                url: format!("/{}", b.sha256),
                sha256: b.sha256,
                size: b.size,
                mime_type: b.mime_type,
                uploaded: b.uploaded,
            })
            .collect())
    }
930
    /// Fetches a single raw chunk by hash (same lookup as [`Self::get_blob`]).
    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        self.router
            .get_sync(hash)
            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
    }

    /// Reads and reassembles a complete (public) file by root hash.
    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.read_file(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }

    /// Reads a complete file by CID (which may carry a decryption key).
    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get(cid, None)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }
963
    /// Fails fast with a "CID not found" error when the CID's root blob is
    /// absent from storage (checked before streaming operations).
    fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
        let exists = self
            .router
            .exists(&cid.hash)
            .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
        if !exists {
            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
        }
        Ok(())
    }
974
    /// Streams the file identified by `cid` into `writer`, returning the
    /// number of bytes written. Errors if the CID is unknown or the stream
    /// produces no chunks at all (treated as "not found").
    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
        self.ensure_cid_exists(cid)?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let mut total_bytes = 0u64;
        // Distinguishes "empty stream" (missing data) from a streamed file.
        let mut streamed_any_chunk = false;

        sync_block_on(async {
            let mut stream = tree.get_stream(cid);
            while let Some(chunk) = stream.next().await {
                streamed_any_chunk = true;
                let chunk =
                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
                writer
                    .write_all(&chunk)
                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
                total_bytes += chunk.len() as u64;
            }
            Ok::<(), anyhow::Error>(())
        })?;

        if !streamed_any_chunk {
            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
        }

        writer
            .flush()
            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
        Ok(total_bytes)
    }
1007
    /// Streams the file identified by `cid` into `output_path`, creating
    /// parent directories as needed. Returns the number of bytes written.
    pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
        self.ensure_cid_exists(cid)?;

        let output_path = output_path.as_ref();
        if let Some(parent) = output_path.parent() {
            // `parent()` can be "" for bare filenames; skip creation then.
            if !parent.as_os_str().is_empty() {
                std::fs::create_dir_all(parent).with_context(|| {
                    format!("Failed to create output directory {}", parent.display())
                })?;
            }
        }

        let mut file = std::fs::File::create(output_path)
            .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
        self.write_file_by_cid_to_writer(cid, &mut file)
    }

    /// Convenience wrapper: writes a *public* (unencrypted) file by raw hash.
    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
        self.write_file_by_cid(&Cid::public(*hash), output_path)
    }
1030
    /// Resolves a slash-separated `path` relative to the directory tree at
    /// `cid`, returning the target's CID if found.
    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.resolve_path(cid, path)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
        })
    }
1042
    /// Describes how the file at `hash` is stored: total size, and — when it
    /// is a chunked file — the per-chunk hashes and sizes.
    ///
    /// Returns `Ok(None)` when the hash is unknown, or when it refers to a
    /// directory (directories have no file chunk metadata).
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            let exists = store
                .has(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            let total_size = tree
                .get_size(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            let is_tree_node = tree
                .is_tree(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            // A plain blob: the hash IS the whole file, no chunk list.
            if !is_tree_node {
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            let node = match tree
                .get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
            {
                Some(n) => n,
                None => return Ok(None),
            };

            let is_directory = tree
                .is_directory(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None);
            }

            // Chunked file: links enumerate the chunks in order.
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1114
1115 pub fn get_file_range(
1117 &self,
1118 hash: &[u8; 32],
1119 start: u64,
1120 end: Option<u64>,
1121 ) -> Result<Option<(Vec<u8>, u64)>> {
1122 let metadata = match self.get_file_chunk_metadata(hash)? {
1123 Some(m) => m,
1124 None => return Ok(None),
1125 };
1126
1127 if metadata.total_size == 0 {
1128 return Ok(Some((Vec::new(), 0)));
1129 }
1130
1131 if start >= metadata.total_size {
1132 return Ok(None);
1133 }
1134
1135 let end = end
1136 .unwrap_or(metadata.total_size - 1)
1137 .min(metadata.total_size - 1);
1138
1139 if !metadata.is_chunked {
1141 let content = self.get_file(hash)?.unwrap_or_default();
1142 let range_content = if start < content.len() as u64 {
1143 content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1144 } else {
1145 Vec::new()
1146 };
1147 return Ok(Some((range_content, metadata.total_size)));
1148 }
1149
1150 let mut result = Vec::new();
1152 let mut current_offset = 0u64;
1153
1154 for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1155 let chunk_size = metadata.chunk_sizes[i];
1156 let chunk_end = current_offset + chunk_size - 1;
1157
1158 if chunk_end >= start && current_offset <= end {
1160 let chunk_content = match self.get_chunk(chunk_hash)? {
1161 Some(content) => content,
1162 None => {
1163 return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1164 }
1165 };
1166
1167 let chunk_read_start = if current_offset >= start {
1168 0
1169 } else {
1170 (start - current_offset) as usize
1171 };
1172
1173 let chunk_read_end = if chunk_end <= end {
1174 chunk_size as usize - 1
1175 } else {
1176 (end - current_offset) as usize
1177 };
1178
1179 result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1180 }
1181
1182 current_offset += chunk_size;
1183
1184 if current_offset > end {
1185 break;
1186 }
1187 }
1188
1189 Ok(Some((result, metadata.total_size)))
1190 }
1191
    /// Builds an owned iterator-like cursor over the chunks covering the
    /// inclusive byte range `[start, end]` of the file at `hash`.
    ///
    /// Takes `self` by `Arc` so the cursor can outlive the caller's borrow.
    /// Returns `Ok(None)` for an unknown file, an empty file, or `start`
    /// past EOF; `end` is clamped to the last byte.
    pub fn stream_file_range_chunks_owned(
        self: Arc<Self>,
        hash: &[u8; 32],
        start: u64,
        end: u64,
    ) -> Result<Option<FileRangeChunksOwned>> {
        let metadata = match self.get_file_chunk_metadata(hash)? {
            Some(m) => m,
            None => return Ok(None),
        };

        if metadata.total_size == 0 || start >= metadata.total_size {
            return Ok(None);
        }

        let end = end.min(metadata.total_size - 1);

        // Cursor starts at chunk 0 / offset 0 and advances as it is consumed.
        Ok(Some(FileRangeChunksOwned {
            store: self,
            metadata,
            start,
            end,
            current_chunk_idx: 0,
            current_offset: 0,
        }))
    }
1219
    /// Lists the entries of the *public* directory at `hash`, or `None` when
    /// the hash does not refer to a directory. Entry CIDs are bare hex hashes.
    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            let is_dir = tree
                .is_directory(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if !is_dir {
                return Ok(None);
            }

            let cid = hashtree_core::Cid::public(*hash);
            let tree_entries = tree
                .list_directory(&cid)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;

            let entries: Vec<DirEntry> = tree_entries
                .into_iter()
                .map(|e| DirEntry {
                    name: e.name,
                    cid: to_hex(&e.hash),
                    is_directory: e.link_type.is_tree(),
                    size: e.size,
                })
                .collect();

            // Directory name is not recoverable from the node itself.
            Ok(Some(DirectoryListing {
                dir_name: String::new(),
                entries,
            }))
        })
    }

    /// Like [`Self::get_directory_listing`] but CID-based: entry CIDs carry
    /// each child's key (for encrypted trees) in string form.
    pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let cid = cid.clone();

        sync_block_on(async {
            let is_dir = tree
                .is_dir(&cid)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if !is_dir {
                return Ok(None);
            }

            let tree_entries = tree
                .list_directory(&cid)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;

            let entries: Vec<DirEntry> = tree_entries
                .into_iter()
                .map(|e| DirEntry {
                    name: e.name,
                    cid: Cid {
                        hash: e.hash,
                        key: e.key,
                    }
                    .to_string(),
                    is_directory: e.link_type.is_tree(),
                    size: e.size,
                })
                .collect();

            Ok(Some(DirectoryListing {
                dir_name: String::new(),
                entries,
            }))
        })
    }
1301
    /// Loads the cached root for `"{pubkey_hex}/{tree_name}"`, if present.
    pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
        let key = format!("{}/{}", pubkey_hex, tree_name);
        let rtxn = self.env.read_txn()?;
        if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
            let root: CachedRoot = rmp_serde::from_slice(bytes)
                .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
            Ok(Some(root))
        } else {
            Ok(None)
        }
    }

    /// Stores (or overwrites) the cached root for `"{pubkey_hex}/{tree_name}"`,
    /// msgpack-encoded.
    pub fn set_cached_root(
        &self,
        pubkey_hex: &str,
        tree_name: &str,
        hash: &str,
        key: Option<&str>,
        visibility: &str,
        updated_at: u64,
    ) -> Result<()> {
        let db_key = format!("{}/{}", pubkey_hex, tree_name);
        let root = CachedRoot {
            hash: hash.to_string(),
            key: key.map(|k| k.to_string()),
            updated_at,
            visibility: visibility.to_string(),
        };
        let bytes = rmp_serde::to_vec(&root)
            .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
        let mut wtxn = self.env.write_txn()?;
        self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
        wtxn.commit()?;
        Ok(())
    }

    /// Lists all cached roots for a pubkey as `(tree_name, root)` pairs.
    ///
    /// NOTE(review): scans the whole `cached_roots` database and filters by
    /// prefix; a prefix iterator would avoid the full scan if heed supports
    /// it for `Str`-keyed databases — worth confirming.
    pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
        let prefix = format!("{}/", pubkey_hex);
        let rtxn = self.env.read_txn()?;
        let mut results = Vec::new();

        for item in self.cached_roots.iter(&rtxn)? {
            let (key, bytes) = item?;
            if key.starts_with(&prefix) {
                let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
                let root: CachedRoot = rmp_serde::from_slice(bytes)
                    .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
                results.push((tree_name.to_string(), root));
            }
        }

        Ok(results)
    }

    /// Deletes the cached root for `"{pubkey_hex}/{tree_name}"`; returns
    /// whether an entry was removed.
    pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
        let key = format!("{}/{}", pubkey_hex, tree_name);
        let mut wtxn = self.env.write_txn()?;
        let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
        wtxn.commit()?;
        Ok(deleted)
    }
1369}
1370
/// Chunk layout of a stored file, used to serve byte-range reads.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    // Total file size in bytes.
    pub total_size: u64,
    // Content hash of each chunk, in file order.
    pub chunk_hashes: Vec<Hash>,
    // Size in bytes of each chunk; parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    // Whether the file is chunked at all; when false the range iterator
    // yields nothing (see `FileRangeChunksOwned::next`).
    pub is_chunked: bool,
}
1378
/// Owning iterator over the chunks of a stored file that overlap an
/// inclusive byte range `[start, end]`, trimming boundary chunks to fit.
pub struct FileRangeChunksOwned {
    // Store used to resolve chunk hashes to chunk bytes.
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    // First byte of the requested range (inclusive).
    start: u64,
    // Last byte of the requested range (inclusive).
    end: u64,
    // Index of the next chunk to consider.
    current_chunk_idx: usize,
    // Absolute byte offset (within the file) of the current chunk's start.
    current_offset: u64,
}
1388
1389impl Iterator for FileRangeChunksOwned {
1390 type Item = Result<Vec<u8>>;
1391
1392 fn next(&mut self) -> Option<Self::Item> {
1393 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
1394 return None;
1395 }
1396
1397 if self.current_offset > self.end {
1398 return None;
1399 }
1400
1401 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
1402 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
1403 let chunk_end = self.current_offset + chunk_size - 1;
1404
1405 self.current_chunk_idx += 1;
1406
1407 if chunk_end < self.start || self.current_offset > self.end {
1408 self.current_offset += chunk_size;
1409 return self.next();
1410 }
1411
1412 let chunk_content = match self.store.get_chunk(chunk_hash) {
1413 Ok(Some(content)) => content,
1414 Ok(None) => {
1415 return Some(Err(anyhow::anyhow!(
1416 "Chunk {} not found",
1417 to_hex(chunk_hash)
1418 )));
1419 }
1420 Err(e) => {
1421 return Some(Err(e));
1422 }
1423 };
1424
1425 let chunk_read_start = if self.current_offset >= self.start {
1426 0
1427 } else {
1428 (self.start - self.current_offset) as usize
1429 };
1430
1431 let chunk_read_end = if chunk_end <= self.end {
1432 chunk_size as usize - 1
1433 } else {
1434 (self.end - self.current_offset) as usize
1435 };
1436
1437 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
1438 self.current_offset += chunk_size;
1439
1440 Some(Ok(result))
1441 }
1442}
1443
/// Summary of one garbage-collection pass.
#[derive(Debug)]
pub struct GcStats {
    // Number of DAGs removed by the pass.
    pub deleted_dags: usize,
    // Bytes reclaimed by the pass.
    pub freed_bytes: u64,
}
1449
/// One entry of a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    // Entry name within the directory.
    pub name: String,
    // Child identifier: a bare hex hash for public listings, or a full CID
    // string (hash + optional key) for listings built from a `Cid`.
    pub cid: String,
    // True when the entry is itself a directory (tree link).
    pub is_directory: bool,
    // Entry size in bytes as recorded in the tree.
    pub size: u64,
}
1457
/// Contents of a directory, as returned by the listing methods.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    // Directory name; the listing methods in this file always leave it empty,
    // so it is presumably filled in by callers — confirm.
    pub dir_name: String,
    // The directory's entries.
    pub entries: Vec<DirEntry>,
}
1463
/// Serialized record describing a stored blob.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    // Hex-encoded SHA-256 of the blob content.
    pub sha256: String,
    // Blob size in bytes.
    pub size: u64,
    // MIME type recorded for the blob.
    pub mime_type: String,
    // Upload timestamp (presumably seconds since the Unix epoch — confirm).
    pub uploaded: u64,
}
1472
1473impl crate::webrtc::ContentStore for HashtreeStore {
1475 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1476 let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
1477 self.get_chunk(&hash)
1478 }
1479}
1480
#[cfg(test)]
mod tests {
    // Both tests exercise the LMDB blob backend, so all items here are
    // gated on the `lmdb` feature.
    #[cfg(feature = "lmdb")]
    use super::*;
    #[cfg(feature = "lmdb")]
    use tempfile::TempDir;

    // The blob store's LMDB map must grow beyond the built-in floor
    // (LMDB_BLOB_MIN_MAP_SIZE_BYTES) when the requested storage budget
    // exceeds it.
    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        // Ask for slightly more than the floor to force an expansion.
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        // Explicit drop before TempDir cleanup — presumably to release the
        // LMDB environment before its directory is deleted.
        drop(store);
        Ok(())
    }

    // An explicit per-store map-size override passed to
    // `LocalStore::new_with_lmdb_map_size` must be honored (at least).
    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }
}