1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::StreamExt;
5use hashtree_config::StorageBackend;
6use hashtree_core::store::{Store, StoreError};
7use hashtree_core::{
8 from_hex, sha256, to_hex, types::Hash, Cid, HashTree, HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16#[cfg(feature = "s3")]
17use std::future::Future;
18use std::io::Write;
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::time::{SystemTime, UNIX_EPOCH};
22
23mod upload;
24
25mod maintenance;
26mod retention;
27
28pub use maintenance::{compact_lmdb_environments_under, CompactResult, VerifyResult};
29pub use retention::{PinnedItem, StorageByPriority, StorageStats, TreeMeta};
30
/// Priority tier for "other" content (presumably anything neither owned nor followed).
pub const PRIORITY_OTHER: u8 = 0x40;
/// Priority tier for followed content (presumably content from followed authors).
pub const PRIORITY_FOLLOWED: u8 = 0x80;
/// Highest priority tier (presumably our own content).
pub const PRIORITY_OWN: u8 = u8::MAX;
/// Reader-slot limit for the metadata LMDB environment opened by `HashtreeStore`.
const LMDB_MAX_READERS: u32 = 1 << 10;
/// Lower bound (10 GiB) for the LMDB blob store map size; a larger size budget raises it.
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 10 << 30;
38
/// Serializable record describing a cached tree root.
///
/// NOTE(review): values of this type are presumably what the `cached_roots` database stores;
/// the reader/writer is not visible in this part of the file — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash as text. NOTE(review): presumably hex-encoded — confirm against writers.
    pub hash: String,
    /// Optional key associated with the root. NOTE(review): presumably a decryption key for
    /// non-public trees — confirm.
    pub key: Option<String>,
    /// Last-update timestamp. NOTE(review): unit not shown here (likely UNIX seconds) — confirm.
    pub updated_at: u64,
    /// Visibility label stored as free-form text. NOTE(review): allowed values not shown here.
    pub visibility: String,
}
51
/// Summary statistics reported by the backing local blob store.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of blobs, as reported by the backend's `stats()`.
    pub count: usize,
    /// Total size of the stored blobs in bytes, as reported by the backend's `stats()`.
    pub total_bytes: u64,
}
58
/// Local blob store: the filesystem store or, when built with the `lmdb` feature, an LMDB store.
pub enum LocalStore {
    /// Filesystem-backed blob store.
    Fs(FsBlobStore),
    /// LMDB-backed blob store (only with the `lmdb` feature).
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
65
#[cfg(feature = "lmdb")]
/// Returns `true` when the last component of `path` looks like a filesystem blob-store shard
/// directory name: exactly two ASCII hex digits (for example `a3` or `0F`).
///
/// Paths without a final component, or whose final component is not valid UTF-8, yield `false`.
fn is_fs_blob_shard_dir(path: &Path) -> bool {
    match path.file_name().and_then(|component| component.to_str()) {
        Some(component) => {
            component.len() == 2 && component.bytes().all(|byte| byte.is_ascii_hexdigit())
        }
        None => false,
    }
}
73
#[cfg(feature = "lmdb")]
/// Deletes every immediate subdirectory of `path` whose name is two ASCII hex digits, i.e.
/// shard directories left over from a filesystem blob store that previously used this location.
///
/// Each removal is logged at `info` level. Any I/O error (listing the directory, reading an
/// entry, or removing a shard) aborts the sweep and is returned as `StoreError::Io`.
fn remove_stale_fs_blob_shards(path: &Path) -> Result<(), StoreError> {
    for dir_entry in std::fs::read_dir(path).map_err(StoreError::Io)? {
        let shard_path = dir_entry.map_err(StoreError::Io)?.path();
        if !shard_path.is_dir() || !is_fs_blob_shard_dir(&shard_path) {
            continue;
        }
        std::fs::remove_dir_all(&shard_path).map_err(StoreError::Io)?;
        tracing::info!(
            "Removed stale filesystem blob shard directory after LMDB cutover: {}",
            shard_path.display()
        );
    }
    Ok(())
}
90
91impl LocalStore {
92 pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
98 Self::new_unbounded(path, backend)
99 }
100
101 pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
106 path: P,
107 backend: &StorageBackend,
108 _map_size_bytes: Option<u64>,
109 ) -> Result<Self, StoreError> {
110 match backend {
111 StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
112 #[cfg(feature = "lmdb")]
113 StorageBackend::Lmdb => match _map_size_bytes {
114 Some(map_size_bytes) => {
115 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
116 remove_stale_fs_blob_shards(path.as_ref())?;
117 Ok(LocalStore::Lmdb(LmdbBlobStore::with_max_bytes(
118 path,
119 map_size_bytes,
120 )?))
121 }
122 None => {
123 std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
124 remove_stale_fs_blob_shards(path.as_ref())?;
125 Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
126 }
127 },
128 #[cfg(not(feature = "lmdb"))]
129 StorageBackend::Lmdb => {
130 tracing::warn!(
131 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
132 );
133 Ok(LocalStore::Fs(FsBlobStore::new(path)?))
134 }
135 }
136 }
137
138 pub fn new_unbounded<P: AsRef<Path>>(
140 path: P,
141 backend: &StorageBackend,
142 ) -> Result<Self, StoreError> {
143 Self::new_with_lmdb_map_size(path, backend, None)
144 }
145
146 pub fn backend(&self) -> StorageBackend {
147 match self {
148 LocalStore::Fs(_) => StorageBackend::Fs,
149 #[cfg(feature = "lmdb")]
150 LocalStore::Lmdb(_) => StorageBackend::Lmdb,
151 }
152 }
153
154 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
156 match self {
157 LocalStore::Fs(store) => store.put_sync(hash, data),
158 #[cfg(feature = "lmdb")]
159 LocalStore::Lmdb(store) => store.put_sync(hash, data),
160 }
161 }
162
163 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
165 match self {
166 LocalStore::Fs(store) => {
167 let mut inserted = 0usize;
168 for (hash, data) in items {
169 if store.put_sync(*hash, data.as_slice())? {
170 inserted += 1;
171 }
172 }
173 Ok(inserted)
174 }
175 #[cfg(feature = "lmdb")]
176 LocalStore::Lmdb(store) => store.put_many_sync(items),
177 }
178 }
179
180 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
182 match self {
183 LocalStore::Fs(store) => store.get_sync(hash),
184 #[cfg(feature = "lmdb")]
185 LocalStore::Lmdb(store) => store.get_sync(hash),
186 }
187 }
188
189 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
191 match self {
192 LocalStore::Fs(store) => Ok(store.exists(hash)),
193 #[cfg(feature = "lmdb")]
194 LocalStore::Lmdb(store) => store.exists(hash),
195 }
196 }
197
198 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
200 match self {
201 LocalStore::Fs(store) => store.delete_sync(hash),
202 #[cfg(feature = "lmdb")]
203 LocalStore::Lmdb(store) => store.delete_sync(hash),
204 }
205 }
206
207 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
209 match self {
210 LocalStore::Fs(store) => {
211 let stats = store.stats()?;
212 Ok(LocalStoreStats {
213 count: stats.count,
214 total_bytes: stats.total_bytes,
215 })
216 }
217 #[cfg(feature = "lmdb")]
218 LocalStore::Lmdb(store) => {
219 let stats = store.stats()?;
220 Ok(LocalStoreStats {
221 count: stats.count,
222 total_bytes: stats.total_bytes,
223 })
224 }
225 }
226 }
227
228 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
230 match self {
231 LocalStore::Fs(store) => store.list(),
232 #[cfg(feature = "lmdb")]
233 LocalStore::Lmdb(store) => store.list(),
234 }
235 }
236}
237
238#[async_trait]
239impl Store for LocalStore {
240 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
241 self.put_sync(hash, &data)
242 }
243
244 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
245 self.put_many_sync(&items)
246 }
247
248 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
249 self.get_sync(hash)
250 }
251
252 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
253 self.exists(hash)
254 }
255
256 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
257 self.delete_sync(hash)
258 }
259}
260
261#[cfg(feature = "s3")]
262use tokio::sync::mpsc;
263
264use crate::config::S3Config;
265
/// Work item consumed by the background S3 sync task spawned in `StorageRouter::with_s3`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` to the object key `{prefix}{hex(hash)}.bin`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object key `{prefix}{hex(hash)}.bin`.
    Delete { hash: Hash },
}
272
/// Blob storage front-end. All reads and writes go to a [`LocalStore`]. With the `s3` feature
/// and an S3 configuration, new blobs and deletions are also queued for mirroring to S3, and
/// local misses fall back to synchronous S3 reads and existence checks.
pub struct StorageRouter {
    /// Primary local blob store.
    local: Arc<LocalStore>,
    /// Client used for synchronous fallback reads/HEAD checks (`None` when S3 is not configured).
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target bucket (`None` when S3 is not configured).
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Prefix prepended to `{hex hash}.bin` object keys; may be empty.
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Sender feeding the background upload/delete task (`None` when S3 is not configured).
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
291
292impl StorageRouter {
293 #[cfg(feature = "s3")]
294 fn run_s3_future_sync<F, T>(future: F) -> Result<T, StoreError>
295 where
296 F: Future<Output = T> + Send + 'static,
297 T: Send + 'static,
298 {
299 if tokio::runtime::Handle::try_current().is_ok() {
300 return std::thread::Builder::new()
301 .name("storage-s3-sync".to_string())
302 .spawn(move || {
303 tokio::runtime::Builder::new_current_thread()
304 .enable_all()
305 .build()
306 .expect("build storage s3 sync runtime")
307 .block_on(future)
308 })
309 .map_err(|err| StoreError::Other(format!("spawn S3 sync helper thread: {err}")))?
310 .join()
311 .map_err(|_| StoreError::Other("S3 sync helper thread panicked".to_string()));
312 }
313
314 let runtime = tokio::runtime::Builder::new_current_thread()
315 .enable_all()
316 .build()
317 .map_err(|err| StoreError::Other(format!("build storage s3 sync runtime: {err}")))?;
318 Ok(runtime.block_on(future))
319 }
320
321 pub fn new(local: Arc<LocalStore>) -> Self {
323 Self {
324 local,
325 #[cfg(feature = "s3")]
326 s3_client: None,
327 #[cfg(feature = "s3")]
328 s3_bucket: None,
329 #[cfg(feature = "s3")]
330 s3_prefix: String::new(),
331 #[cfg(feature = "s3")]
332 sync_tx: None,
333 }
334 }
335
336 #[cfg(feature = "s3")]
338 pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
339 use aws_sdk_s3::Client as S3Client;
340
341 let mut aws_config_loader = aws_config::from_env();
343 aws_config_loader =
344 aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
345 let aws_config = aws_config_loader.load().await;
346
347 let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
349 s3_config_builder = s3_config_builder
350 .endpoint_url(&config.endpoint)
351 .force_path_style(true);
352
353 let s3_client = S3Client::from_conf(s3_config_builder.build());
354 let bucket = config.bucket.clone();
355 let prefix = config.prefix.clone().unwrap_or_default();
356
357 let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();
359
360 let sync_client = s3_client.clone();
362 let sync_bucket = bucket.clone();
363 let sync_prefix = prefix.clone();
364
365 tokio::spawn(async move {
366 use aws_sdk_s3::primitives::ByteStream;
367
368 tracing::info!("S3 background sync task started");
369
370 let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
372 let client = std::sync::Arc::new(sync_client);
373 let bucket = std::sync::Arc::new(sync_bucket);
374 let prefix = std::sync::Arc::new(sync_prefix);
375
376 while let Some(msg) = sync_rx.recv().await {
377 let client = client.clone();
378 let bucket = bucket.clone();
379 let prefix = prefix.clone();
380 let semaphore = semaphore.clone();
381
382 tokio::spawn(async move {
384 let _permit = semaphore.acquire().await;
386
387 match msg {
388 S3SyncMessage::Upload { hash, data } => {
389 let key = format!("{}{}.bin", prefix, to_hex(&hash));
390 tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());
391
392 match client
393 .put_object()
394 .bucket(bucket.as_str())
395 .key(&key)
396 .body(ByteStream::from(data))
397 .send()
398 .await
399 {
400 Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
401 Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
402 }
403 }
404 S3SyncMessage::Delete { hash } => {
405 let key = format!("{}{}.bin", prefix, to_hex(&hash));
406 tracing::debug!("S3 deleting {}", &key);
407
408 if let Err(e) = client
409 .delete_object()
410 .bucket(bucket.as_str())
411 .key(&key)
412 .send()
413 .await
414 {
415 tracing::error!("S3 delete failed {}: {}", &key, e);
416 }
417 }
418 }
419 });
420 }
421 });
422
423 tracing::info!(
424 "S3 storage initialized: bucket={}, prefix={}",
425 bucket,
426 prefix
427 );
428
429 Ok(Self {
430 local,
431 s3_client: Some(s3_client),
432 s3_bucket: Some(bucket),
433 s3_prefix: prefix,
434 sync_tx: Some(sync_tx),
435 })
436 }
437
438 pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
440 let is_new = self.local.put_sync(hash, data)?;
442
443 #[cfg(feature = "s3")]
446 if is_new {
447 if let Some(ref tx) = self.sync_tx {
448 tracing::debug!(
449 "Queueing S3 upload for {} ({} bytes)",
450 crate::storage::to_hex(&hash)[..16].to_string(),
451 data.len(),
452 );
453 if let Err(e) = tx.send(S3SyncMessage::Upload {
454 hash,
455 data: data.to_vec(),
456 }) {
457 tracing::error!("Failed to queue S3 upload: {}", e);
458 }
459 }
460 }
461
462 Ok(is_new)
463 }
464
465 pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
467 #[cfg(feature = "s3")]
468 let pending_uploads = if self.sync_tx.is_some() {
469 let mut pending = Vec::new();
470 for (hash, data) in items {
471 if !self.local.exists(hash)? {
472 pending.push((*hash, data.clone()));
473 }
474 }
475 pending
476 } else {
477 Vec::new()
478 };
479
480 let inserted = self.local.put_many_sync(items)?;
481
482 #[cfg(feature = "s3")]
483 if let Some(ref tx) = self.sync_tx {
484 for (hash, data) in pending_uploads {
485 if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data }) {
486 tracing::error!("Failed to queue S3 upload: {}", e);
487 }
488 }
489 }
490
491 Ok(inserted)
492 }
493
494 pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
496 if let Some(data) = self.local.get_sync(hash)? {
498 return Ok(Some(data));
499 }
500
501 #[cfg(feature = "s3")]
503 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
504 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
505 let client = client.clone();
506 let bucket = bucket.clone();
507
508 match Self::run_s3_future_sync(async move {
509 client.get_object().bucket(bucket).key(key).send().await
510 }) {
511 Ok(Ok(output)) => {
512 match Self::run_s3_future_sync(async move { output.body.collect().await }) {
513 Ok(Ok(body)) => {
514 let data = body.into_bytes().to_vec();
515 let _ = self.local.put_sync(*hash, &data);
517 return Ok(Some(data));
518 }
519 Ok(Err(err)) => {
520 tracing::warn!("S3 body collect failed: {}", err);
521 }
522 Err(err) => {
523 tracing::warn!("S3 body collect runtime failed: {}", err);
524 }
525 }
526 }
527 Ok(Err(err)) => {
528 let service_err = err.into_service_error();
529 if !service_err.is_no_such_key() {
530 tracing::warn!("S3 get failed: {}", service_err);
531 }
532 }
533 Err(err) => {
534 tracing::warn!("S3 get runtime failed: {}", err);
535 }
536 }
537 }
538
539 Ok(None)
540 }
541
542 pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
544 if self.local.exists(hash)? {
546 return Ok(true);
547 }
548
549 #[cfg(feature = "s3")]
551 if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
552 let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));
553 let client = client.clone();
554 let bucket = bucket.clone();
555
556 match Self::run_s3_future_sync(async move {
557 client.head_object().bucket(bucket).key(&key).send().await
558 }) {
559 Ok(Ok(_)) => return Ok(true),
560 Ok(Err(err)) => {
561 let service_err = err.into_service_error();
562 if !service_err.is_not_found() {
563 tracing::warn!("S3 head failed: {}", service_err);
564 }
565 }
566 Err(err) => {
567 tracing::warn!("S3 head runtime failed: {}", err);
568 }
569 }
570 }
571
572 Ok(false)
573 }
574
575 pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
577 let deleted = self.local.delete_sync(hash)?;
578
579 #[cfg(feature = "s3")]
581 if let Some(ref tx) = self.sync_tx {
582 let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
583 }
584
585 Ok(deleted)
586 }
587
588 pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
591 self.local.delete_sync(hash)
592 }
593
594 pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
596 self.local.stats()
597 }
598
599 pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
601 self.local.list()
602 }
603
604 pub fn local_store(&self) -> Arc<LocalStore> {
606 Arc::clone(&self.local)
607 }
608}
609
610#[async_trait]
613impl Store for StorageRouter {
614 async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
615 self.put_sync(hash, &data)
616 }
617
618 async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
619 self.put_many_sync(&items)
620 }
621
622 async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
623 self.get_sync(hash)
624 }
625
626 async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
627 self.exists(hash)
628 }
629
630 async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
631 self.delete_sync(hash)
632 }
633}
634
/// Persistent hashtree store: an LMDB environment of metadata databases (ownership, pins,
/// tree metadata and refs, cached roots) plus a [`StorageRouter`] holding the blob bytes.
pub struct HashtreeStore {
    /// Directory the store was opened at; the metadata LMDB environment lives here.
    base_path: PathBuf,
    /// Metadata LMDB environment opened at `base_path`.
    env: heed::Env,
    /// `pins` database (byte keys, no value). NOTE(review): key layout not visible here.
    pins: Database<Bytes, Unit>,
    /// `pinned_refs` database (string keys, no value). NOTE(review): key format not visible here.
    pinned_refs: Database<Str, Unit>,
    /// `tracked_authors` database (string keys, no value). NOTE(review): presumably author ids.
    tracked_authors: Database<Str, Unit>,
    /// Ownership records keyed by `sha256 || pubkey` (64 bytes, see `blob_owner_key`).
    blob_owners: Database<Bytes, Unit>,
    /// Per-owner listing: 32-byte pubkey -> JSON-encoded `Vec<BlobMetadata>`.
    pubkey_blobs: Database<Bytes, Bytes>,
    /// `tree_meta` database. NOTE(review): key/value encoding not visible here.
    tree_meta: Database<Bytes, Bytes>,
    /// `blob_trees` database (byte keys, no value). NOTE(review): key layout not visible here.
    blob_trees: Database<Bytes, Unit>,
    /// `tree_refs` database (string keys, byte values). NOTE(review): encoding not visible here.
    tree_refs: Database<Str, Bytes>,
    /// `cached_roots` database (string keys, byte values). NOTE(review): values are presumably
    /// serialized `CachedRoot` records — confirm.
    cached_roots: Database<Str, Bytes>,
    /// Blob storage (local store, optionally mirrored to S3).
    router: Arc<StorageRouter>,
    /// Size budget in bytes supplied at construction.
    max_size_bytes: u64,
    /// Orphan-eviction flag supplied at construction. NOTE(review): consumer not visible here.
    evict_orphans: bool,
}
663
664impl HashtreeStore {
665 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
667 let config = hashtree_config::Config::load_or_default();
668 let max_size_bytes = config
669 .storage
670 .max_size_gb
671 .saturating_mul(1024 * 1024 * 1024);
672 Self::with_options_and_backend(
673 path,
674 None,
675 max_size_bytes,
676 config.storage.evict_orphans,
677 &config.storage.backend,
678 )
679 }
680
681 pub fn new_with_backend<P: AsRef<Path>>(
683 path: P,
684 backend: hashtree_config::StorageBackend,
685 max_size_bytes: u64,
686 ) -> Result<Self> {
687 Self::with_options_and_backend(path, None, max_size_bytes, true, &backend)
688 }
689
690 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
692 let config = hashtree_config::Config::load_or_default();
693 let max_size_bytes = config
694 .storage
695 .max_size_gb
696 .saturating_mul(1024 * 1024 * 1024);
697 Self::with_options_and_backend(
698 path,
699 s3_config,
700 max_size_bytes,
701 config.storage.evict_orphans,
702 &config.storage.backend,
703 )
704 }
705
706 pub fn with_options<P: AsRef<Path>>(
712 path: P,
713 s3_config: Option<&S3Config>,
714 max_size_bytes: u64,
715 ) -> Result<Self> {
716 let config = hashtree_config::Config::load_or_default();
717 Self::with_options_and_backend(
718 path,
719 s3_config,
720 max_size_bytes,
721 config.storage.evict_orphans,
722 &config.storage.backend,
723 )
724 }
725
726 fn with_options_and_backend<P: AsRef<Path>>(
727 path: P,
728 s3_config: Option<&S3Config>,
729 max_size_bytes: u64,
730 evict_orphans: bool,
731 backend: &hashtree_config::StorageBackend,
732 ) -> Result<Self> {
733 let path = path.as_ref();
734 std::fs::create_dir_all(path)?;
735
736 let env = unsafe {
737 EnvOpenOptions::new()
738 .map_size(10 * 1024 * 1024 * 1024) .max_dbs(10) .max_readers(LMDB_MAX_READERS)
741 .open(path)?
742 };
743 let _ = env.clear_stale_readers();
744
745 let mut wtxn = env.write_txn()?;
746 let pins = env.create_database(&mut wtxn, Some("pins"))?;
747 let pinned_refs = env.create_database(&mut wtxn, Some("pinned_refs"))?;
748 let tracked_authors = env.create_database(&mut wtxn, Some("tracked_authors"))?;
749 let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
750 let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
751 let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
752 let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
753 let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
754 let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
755 wtxn.commit()?;
756
757 let local_store = Arc::new(match backend {
761 hashtree_config::StorageBackend::Fs => LocalStore::Fs(
762 FsBlobStore::new(path.join("blobs"))
763 .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
764 ),
765 #[cfg(feature = "lmdb")]
766 hashtree_config::StorageBackend::Lmdb => {
767 std::fs::create_dir_all(path.join("blobs"))?;
768 remove_stale_fs_blob_shards(&path.join("blobs"))
769 .map_err(|e| anyhow::anyhow!("Failed to clean LMDB blob store path: {}", e))?;
770 let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
771 let map_size = usize::try_from(requested_map_size)
772 .context("LMDB blob map size exceeds usize")?;
773 LocalStore::Lmdb(
774 LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
775 .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
776 )
777 }
778 #[cfg(not(feature = "lmdb"))]
779 hashtree_config::StorageBackend::Lmdb => {
780 tracing::warn!(
781 "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
782 );
783 LocalStore::Fs(
784 FsBlobStore::new(path.join("blobs"))
785 .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
786 )
787 }
788 });
789
790 #[cfg(feature = "s3")]
792 let router = Arc::new(if let Some(s3_cfg) = s3_config {
793 tracing::info!(
794 "Initializing S3 storage backend: bucket={}, endpoint={}",
795 s3_cfg.bucket,
796 s3_cfg.endpoint
797 );
798
799 sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
800 } else {
801 StorageRouter::new(local_store)
802 });
803
804 #[cfg(not(feature = "s3"))]
805 let router = Arc::new({
806 if s3_config.is_some() {
807 tracing::warn!(
808 "S3 config provided but S3 feature not enabled. Using local storage only."
809 );
810 }
811 StorageRouter::new(local_store)
812 });
813
814 Ok(Self {
815 base_path: path.to_path_buf(),
816 env,
817 pins,
818 pinned_refs,
819 tracked_authors,
820 blob_owners,
821 pubkey_blobs,
822 tree_meta,
823 blob_trees,
824 tree_refs,
825 cached_roots,
826 router,
827 max_size_bytes,
828 evict_orphans,
829 })
830 }
831
832 pub fn base_path(&self) -> &Path {
833 &self.base_path
834 }
835
836 pub fn router(&self) -> &StorageRouter {
838 &self.router
839 }
840
841 pub fn store_arc(&self) -> Arc<StorageRouter> {
844 Arc::clone(&self.router)
845 }
846
847 pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
849 let store = self.store_arc();
850 let tree = HashTree::new(HashTreeConfig::new(store).public());
851
852 sync_block_on(async {
853 tree.get_tree_node(hash)
854 .await
855 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
856 })
857 }
858
859 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
861 let hash = sha256(data);
862 self.router
863 .put_sync(hash, data)
864 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
865 Ok(to_hex(&hash))
866 }
867
868 pub fn put_cached_blob(&self, data: &[u8]) -> Result<String> {
874 let hash = sha256(data);
875 if self
876 .router
877 .exists(&hash)
878 .map_err(|e| anyhow::anyhow!("Failed to check cached blob: {}", e))?
879 {
880 return Ok(to_hex(&hash));
881 }
882
883 let incoming_bytes = data.len() as u64;
884 let _ = self.make_room_for_cached_blob(incoming_bytes);
885
886 let mut retried_after_cleanup = false;
887 loop {
888 match self.router.put_sync(hash, data) {
889 Ok(_) => return Ok(to_hex(&hash)),
890 Err(err) if !retried_after_cleanup && is_map_full_store_error(&err) => {
891 let freed = self.relieve_cached_blob_write_pressure(incoming_bytes)?;
892 if freed == 0 {
893 return Err(anyhow::anyhow!("Failed to store cached blob: {}", err));
894 }
895 retried_after_cleanup = true;
896 }
897 Err(err) => return Err(anyhow::anyhow!("Failed to store cached blob: {}", err)),
898 }
899 }
900 }
901
902 pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
904 self.router
905 .get_sync(hash)
906 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
907 }
908
909 pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
911 self.router
912 .exists(hash)
913 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
914 }
915
916 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
922 let mut key = [0u8; 64];
923 key[..32].copy_from_slice(sha256);
924 key[32..].copy_from_slice(pubkey);
925 key
926 }
927
928 pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
931 let key = Self::blob_owner_key(sha256, pubkey);
932 let mut wtxn = self.env.write_txn()?;
933
934 self.blob_owners.put(&mut wtxn, &key[..], &())?;
936
937 let sha256_hex = to_hex(sha256);
939
940 let mut blobs: Vec<BlobMetadata> = self
942 .pubkey_blobs
943 .get(&wtxn, pubkey)?
944 .and_then(|b| serde_json::from_slice(b).ok())
945 .unwrap_or_default();
946
947 if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
949 let now = SystemTime::now()
950 .duration_since(UNIX_EPOCH)
951 .unwrap()
952 .as_secs();
953
954 let size = self
956 .get_blob(sha256)?
957 .map(|data| data.len() as u64)
958 .unwrap_or(0);
959
960 blobs.push(BlobMetadata {
961 sha256: sha256_hex,
962 size,
963 mime_type: "application/octet-stream".to_string(),
964 uploaded: now,
965 });
966
967 let blobs_json = serde_json::to_vec(&blobs)?;
968 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
969 }
970
971 wtxn.commit()?;
972 Ok(())
973 }
974
975 pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
977 let key = Self::blob_owner_key(sha256, pubkey);
978 let rtxn = self.env.read_txn()?;
979 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
980 }
981
982 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
984 let rtxn = self.env.read_txn()?;
985
986 let mut owners = Vec::new();
987 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
988 let (key, _) = item?;
989 if key.len() == 64 {
990 let mut pubkey = [0u8; 32];
992 pubkey.copy_from_slice(&key[32..64]);
993 owners.push(pubkey);
994 }
995 }
996 Ok(owners)
997 }
998
999 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1001 let rtxn = self.env.read_txn()?;
1002
1003 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1005 if item.is_ok() {
1006 return Ok(true);
1007 }
1008 }
1009 Ok(false)
1010 }
1011
1012 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1014 Ok(self.get_blob_owners(sha256)?.into_iter().next())
1015 }
1016
1017 pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1021 let key = Self::blob_owner_key(sha256, pubkey);
1022 let mut wtxn = self.env.write_txn()?;
1023
1024 self.blob_owners.delete(&mut wtxn, &key[..])?;
1026
1027 let sha256_hex = to_hex(sha256);
1029
1030 if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
1032 if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
1033 blobs.retain(|b| b.sha256 != sha256_hex);
1034 let blobs_json = serde_json::to_vec(&blobs)?;
1035 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
1036 }
1037 }
1038
1039 let mut has_other_owners = false;
1041 for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
1042 if item.is_ok() {
1043 has_other_owners = true;
1044 break;
1045 }
1046 }
1047
1048 if has_other_owners {
1049 wtxn.commit()?;
1050 tracing::debug!(
1051 "Removed {} from blob {} owners, other owners remain",
1052 &to_hex(pubkey)[..8],
1053 &sha256_hex[..8]
1054 );
1055 return Ok(false);
1056 }
1057
1058 tracing::info!(
1060 "All owners removed from blob {}, deleting",
1061 &sha256_hex[..8]
1062 );
1063
1064 let _ = self.router.delete_sync(sha256);
1066
1067 wtxn.commit()?;
1068 Ok(true)
1069 }
1070
1071 pub fn list_blobs_by_pubkey(
1073 &self,
1074 pubkey: &[u8; 32],
1075 ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1076 let rtxn = self.env.read_txn()?;
1077
1078 let blobs: Vec<BlobMetadata> = self
1079 .pubkey_blobs
1080 .get(&rtxn, pubkey)?
1081 .and_then(|b| serde_json::from_slice(b).ok())
1082 .unwrap_or_default();
1083
1084 Ok(blobs
1085 .into_iter()
1086 .map(|b| crate::server::blossom::BlobDescriptor {
1087 url: format!("/{}", b.sha256),
1088 sha256: b.sha256,
1089 size: b.size,
1090 mime_type: b.mime_type,
1091 uploaded: b.uploaded,
1092 })
1093 .collect())
1094 }
1095
1096 pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1098 self.router
1099 .get_sync(hash)
1100 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1101 }
1102
1103 pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1106 let store = self.store_arc();
1107 let tree = HashTree::new(HashTreeConfig::new(store).public());
1108
1109 sync_block_on(async {
1110 tree.read_file(hash)
1111 .await
1112 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1113 })
1114 }
1115
1116 pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1119 let store = self.store_arc();
1120 let tree = HashTree::new(HashTreeConfig::new(store).public());
1121
1122 sync_block_on(async {
1123 tree.get(cid, None)
1124 .await
1125 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1126 })
1127 }
1128
1129 fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1130 let exists = self
1131 .router
1132 .exists(&cid.hash)
1133 .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1134 if !exists {
1135 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1136 }
1137 Ok(())
1138 }
1139
1140 pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
1142 self.ensure_cid_exists(cid)?;
1143
1144 let store = self.store_arc();
1145 let tree = HashTree::new(HashTreeConfig::new(store).public());
1146 let mut total_bytes = 0u64;
1147 let mut streamed_any_chunk = false;
1148
1149 sync_block_on(async {
1150 let mut stream = tree.get_stream(cid);
1151 while let Some(chunk) = stream.next().await {
1152 streamed_any_chunk = true;
1153 let chunk =
1154 chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
1155 writer
1156 .write_all(&chunk)
1157 .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
1158 total_bytes += chunk.len() as u64;
1159 }
1160 Ok::<(), anyhow::Error>(())
1161 })?;
1162
1163 if !streamed_any_chunk {
1164 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1165 }
1166
1167 writer
1168 .flush()
1169 .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
1170 Ok(total_bytes)
1171 }
1172
1173 pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1175 self.ensure_cid_exists(cid)?;
1176
1177 let output_path = output_path.as_ref();
1178 if let Some(parent) = output_path.parent() {
1179 if !parent.as_os_str().is_empty() {
1180 std::fs::create_dir_all(parent).with_context(|| {
1181 format!("Failed to create output directory {}", parent.display())
1182 })?;
1183 }
1184 }
1185
1186 let mut file = std::fs::File::create(output_path)
1187 .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1188 self.write_file_by_cid_to_writer(cid, &mut file)
1189 }
1190
1191 pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1193 self.write_file_by_cid(&Cid::public(*hash), output_path)
1194 }
1195
1196 pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1198 let store = self.store_arc();
1199 let tree = HashTree::new(HashTreeConfig::new(store).public());
1200
1201 sync_block_on(async {
1202 tree.resolve_path(cid, path)
1203 .await
1204 .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1205 })
1206 }
1207
1208 pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
1210 let store = self.store_arc();
1211 let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
1212
1213 sync_block_on(async {
1214 let exists = store
1217 .has(hash)
1218 .await
1219 .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;
1220
1221 if !exists {
1222 return Ok(None);
1223 }
1224
1225 let total_size = tree
1227 .get_size(hash)
1228 .await
1229 .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;
1230
1231 let is_tree_node = tree
1233 .is_tree(hash)
1234 .await
1235 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1236
1237 if !is_tree_node {
1238 return Ok(Some(FileChunkMetadata {
1240 total_size,
1241 chunk_hashes: vec![],
1242 chunk_sizes: vec![],
1243 is_chunked: false,
1244 }));
1245 }
1246
1247 let node = match tree
1249 .get_tree_node(hash)
1250 .await
1251 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1252 {
1253 Some(n) => n,
1254 None => return Ok(None),
1255 };
1256
1257 let is_directory = tree
1259 .is_directory(hash)
1260 .await
1261 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1262
1263 if is_directory {
1264 return Ok(None); }
1266
1267 let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
1269 let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();
1270
1271 Ok(Some(FileChunkMetadata {
1272 total_size,
1273 chunk_hashes,
1274 chunk_sizes,
1275 is_chunked: !node.links.is_empty(),
1276 }))
1277 })
1278 }
1279
1280 pub fn get_file_range(
1282 &self,
1283 hash: &[u8; 32],
1284 start: u64,
1285 end: Option<u64>,
1286 ) -> Result<Option<(Vec<u8>, u64)>> {
1287 let metadata = match self.get_file_chunk_metadata(hash)? {
1288 Some(m) => m,
1289 None => return Ok(None),
1290 };
1291
1292 if metadata.total_size == 0 {
1293 return Ok(Some((Vec::new(), 0)));
1294 }
1295
1296 if start >= metadata.total_size {
1297 return Ok(None);
1298 }
1299
1300 let end = end
1301 .unwrap_or(metadata.total_size - 1)
1302 .min(metadata.total_size - 1);
1303
1304 if !metadata.is_chunked {
1306 let content = self.get_file(hash)?.unwrap_or_default();
1307 let range_content = if start < content.len() as u64 {
1308 content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1309 } else {
1310 Vec::new()
1311 };
1312 return Ok(Some((range_content, metadata.total_size)));
1313 }
1314
1315 let mut result = Vec::new();
1317 let mut current_offset = 0u64;
1318
1319 for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1320 let chunk_size = metadata.chunk_sizes[i];
1321 let chunk_end = current_offset + chunk_size - 1;
1322
1323 if chunk_end >= start && current_offset <= end {
1325 let chunk_content = match self.get_chunk(chunk_hash)? {
1326 Some(content) => content,
1327 None => {
1328 return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1329 }
1330 };
1331
1332 let chunk_read_start = if current_offset >= start {
1333 0
1334 } else {
1335 (start - current_offset) as usize
1336 };
1337
1338 let chunk_read_end = if chunk_end <= end {
1339 chunk_size as usize - 1
1340 } else {
1341 (end - current_offset) as usize
1342 };
1343
1344 result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1345 }
1346
1347 current_offset += chunk_size;
1348
1349 if current_offset > end {
1350 break;
1351 }
1352 }
1353
1354 Ok(Some((result, metadata.total_size)))
1355 }
1356
1357 pub fn stream_file_range_chunks_owned(
1359 self: Arc<Self>,
1360 hash: &[u8; 32],
1361 start: u64,
1362 end: u64,
1363 ) -> Result<Option<FileRangeChunksOwned>> {
1364 let metadata = match self.get_file_chunk_metadata(hash)? {
1365 Some(m) => m,
1366 None => return Ok(None),
1367 };
1368
1369 if metadata.total_size == 0 || start >= metadata.total_size {
1370 return Ok(None);
1371 }
1372
1373 let end = end.min(metadata.total_size - 1);
1374
1375 Ok(Some(FileRangeChunksOwned {
1376 store: self,
1377 metadata,
1378 start,
1379 end,
1380 current_chunk_idx: 0,
1381 current_offset: 0,
1382 }))
1383 }
1384
1385 pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1387 let store = self.store_arc();
1388 let tree = HashTree::new(HashTreeConfig::new(store).public());
1389
1390 sync_block_on(async {
1391 let is_dir = tree
1393 .is_directory(hash)
1394 .await
1395 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1396
1397 if !is_dir {
1398 return Ok(None);
1399 }
1400
1401 let cid = hashtree_core::Cid::public(*hash);
1403 let tree_entries = tree
1404 .list_directory(&cid)
1405 .await
1406 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1407
1408 let entries: Vec<DirEntry> = tree_entries
1409 .into_iter()
1410 .map(|e| DirEntry {
1411 name: e.name,
1412 cid: to_hex(&e.hash),
1413 is_directory: e.link_type.is_tree(),
1414 size: e.size,
1415 })
1416 .collect();
1417
1418 Ok(Some(DirectoryListing {
1419 dir_name: String::new(),
1420 entries,
1421 }))
1422 })
1423 }
1424
1425 pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1427 let store = self.store_arc();
1428 let tree = HashTree::new(HashTreeConfig::new(store).public());
1429 let cid = cid.clone();
1430
1431 sync_block_on(async {
1432 let is_dir = tree
1433 .is_dir(&cid)
1434 .await
1435 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1436
1437 if !is_dir {
1438 return Ok(None);
1439 }
1440
1441 let tree_entries = tree
1442 .list_directory(&cid)
1443 .await
1444 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1445
1446 let entries: Vec<DirEntry> = tree_entries
1447 .into_iter()
1448 .map(|e| DirEntry {
1449 name: e.name,
1450 cid: Cid {
1451 hash: e.hash,
1452 key: e.key,
1453 }
1454 .to_string(),
1455 is_directory: e.link_type.is_tree(),
1456 size: e.size,
1457 })
1458 .collect();
1459
1460 Ok(Some(DirectoryListing {
1461 dir_name: String::new(),
1462 entries,
1463 }))
1464 })
1465 }
1466
1467 pub fn add_pinned_ref(&self, key: &str) -> Result<()> {
1471 let mut wtxn = self.env.write_txn()?;
1472 self.pinned_refs.put(&mut wtxn, key, &())?;
1473 wtxn.commit()?;
1474 Ok(())
1475 }
1476
1477 pub fn remove_pinned_ref(&self, key: &str) -> Result<bool> {
1479 let mut wtxn = self.env.write_txn()?;
1480 let removed = self.pinned_refs.delete(&mut wtxn, key)?;
1481 wtxn.commit()?;
1482 Ok(removed)
1483 }
1484
1485 pub fn list_pinned_refs(&self) -> Result<Vec<String>> {
1487 let rtxn = self.env.read_txn()?;
1488 let mut refs = Vec::new();
1489
1490 for item in self.pinned_refs.iter(&rtxn)? {
1491 let (key, _) = item?;
1492 refs.push(key.to_string());
1493 }
1494
1495 refs.sort();
1496 Ok(refs)
1497 }
1498
1499 pub fn add_tracked_author(&self, npub: &str) -> Result<bool> {
1501 let mut wtxn = self.env.write_txn()?;
1502 let inserted = self.tracked_authors.get(&wtxn, npub)?.is_none();
1503 self.tracked_authors.put(&mut wtxn, npub, &())?;
1504 wtxn.commit()?;
1505 Ok(inserted)
1506 }
1507
1508 pub fn remove_tracked_author(&self, npub: &str) -> Result<bool> {
1510 let mut wtxn = self.env.write_txn()?;
1511 let removed = self.tracked_authors.delete(&mut wtxn, npub)?;
1512 wtxn.commit()?;
1513 Ok(removed)
1514 }
1515
1516 pub fn list_tracked_authors(&self) -> Result<Vec<String>> {
1518 let rtxn = self.env.read_txn()?;
1519 let mut authors = Vec::new();
1520
1521 for item in self.tracked_authors.iter(&rtxn)? {
1522 let (npub, _) = item?;
1523 authors.push(npub.to_string());
1524 }
1525
1526 authors.sort();
1527 Ok(authors)
1528 }
1529
1530 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1532 let key = format!("{}/{}", pubkey_hex, tree_name);
1533 let rtxn = self.env.read_txn()?;
1534 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1535 let root: CachedRoot = rmp_serde::from_slice(bytes)
1536 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1537 Ok(Some(root))
1538 } else {
1539 Ok(None)
1540 }
1541 }
1542
1543 pub fn set_cached_root(
1545 &self,
1546 pubkey_hex: &str,
1547 tree_name: &str,
1548 hash: &str,
1549 key: Option<&str>,
1550 visibility: &str,
1551 updated_at: u64,
1552 ) -> Result<()> {
1553 let db_key = format!("{}/{}", pubkey_hex, tree_name);
1554 let root = CachedRoot {
1555 hash: hash.to_string(),
1556 key: key.map(|k| k.to_string()),
1557 updated_at,
1558 visibility: visibility.to_string(),
1559 };
1560 let bytes = rmp_serde::to_vec(&root)
1561 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1562 let mut wtxn = self.env.write_txn()?;
1563 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1564 wtxn.commit()?;
1565 Ok(())
1566 }
1567
1568 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1570 let prefix = format!("{}/", pubkey_hex);
1571 let rtxn = self.env.read_txn()?;
1572 let mut results = Vec::new();
1573
1574 for item in self.cached_roots.iter(&rtxn)? {
1575 let (key, bytes) = item?;
1576 if key.starts_with(&prefix) {
1577 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1578 let root: CachedRoot = rmp_serde::from_slice(bytes)
1579 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1580 results.push((tree_name.to_string(), root));
1581 }
1582 }
1583
1584 Ok(results)
1585 }
1586
1587 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1589 let key = format!("{}/{}", pubkey_hex, tree_name);
1590 let mut wtxn = self.env.write_txn()?;
1591 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1592 wtxn.commit()?;
1593 Ok(deleted)
1594 }
1595}
1596
1597fn is_map_full_store_error(err: &StoreError) -> bool {
1598 let message = err.to_string();
1599 message.contains("MDB_MAP_FULL") || message.contains("MapFull")
1600}
1601
/// Chunk layout of a file, as produced by `HashtreeStore::get_file_chunk_metadata`.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes, as reported by the hash tree.
    pub total_size: u64,
    /// Hashes of the root node's direct links, in link order; empty for an unchunked blob.
    pub chunk_hashes: Vec<Hash>,
    /// Sizes of the same links, index-aligned with `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// `true` when the file is a tree node with at least one link.
    pub is_chunked: bool,
}
1609
/// Owning iterator over the inclusive byte range `start..=end` of a file.
///
/// Each step yields the overlapping part of one chunk. It holds an
/// `Arc<HashtreeStore>` rather than a reference, so it is not tied to a borrow of
/// the store.
pub struct FileRangeChunksOwned {
    // Store used to fetch chunk bytes lazily as the iterator advances.
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    // First byte offset to yield (inclusive).
    start: u64,
    // Last byte offset to yield (inclusive); the constructor clamps it to the file size.
    end: u64,
    // Index into `metadata.chunk_hashes` of the next chunk to examine.
    current_chunk_idx: usize,
    // Running byte offset into the file, advanced past each chunk as it is consumed.
    current_offset: u64,
}
1619
1620impl Iterator for FileRangeChunksOwned {
1621 type Item = Result<Vec<u8>>;
1622
1623 fn next(&mut self) -> Option<Self::Item> {
1624 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
1625 return None;
1626 }
1627
1628 if self.current_offset > self.end {
1629 return None;
1630 }
1631
1632 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
1633 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
1634 let chunk_end = self.current_offset + chunk_size - 1;
1635
1636 self.current_chunk_idx += 1;
1637
1638 if chunk_end < self.start || self.current_offset > self.end {
1639 self.current_offset += chunk_size;
1640 return self.next();
1641 }
1642
1643 let chunk_content = match self.store.get_chunk(chunk_hash) {
1644 Ok(Some(content)) => content,
1645 Ok(None) => {
1646 return Some(Err(anyhow::anyhow!(
1647 "Chunk {} not found",
1648 to_hex(chunk_hash)
1649 )));
1650 }
1651 Err(e) => {
1652 return Some(Err(e));
1653 }
1654 };
1655
1656 let chunk_read_start = if self.current_offset >= self.start {
1657 0
1658 } else {
1659 (self.start - self.current_offset) as usize
1660 };
1661
1662 let chunk_read_end = if chunk_end <= self.end {
1663 chunk_size as usize - 1
1664 } else {
1665 (self.end - self.current_offset) as usize
1666 };
1667
1668 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
1669 self.current_offset += chunk_size;
1670
1671 Some(Ok(result))
1672 }
1673}
1674
/// Counters summarising a garbage-collection run.
///
/// The code that produces these values is not in this section.
#[derive(Debug)]
pub struct GcStats {
    /// Number of DAGs deleted. Presumably whole unpinned trees; confirm against the GC code.
    pub deleted_dags: usize,
    /// Bytes reclaimed by the run.
    pub freed_bytes: u64,
}
1680
/// One entry of a `DirectoryListing`.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within its directory.
    pub name: String,
    /// Entry identifier as text.
    ///
    /// `get_directory_listing` fills in the hex-encoded hash.
    /// `get_directory_listing_by_cid` fills in the string form of a `Cid` built from
    /// the entry's hash and key.
    pub cid: String,
    /// Whether the entry's link type is a tree (`link_type.is_tree()`).
    pub is_directory: bool,
    /// Entry size as reported by the directory listing.
    pub size: u64,
}
1688
/// Contents of a directory returned by the `get_directory_listing*` methods.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Directory name. Both listing methods in this file leave it empty.
    pub dir_name: String,
    /// Entries in the order returned by the underlying tree listing.
    pub entries: Vec<DirEntry>,
}
1694
/// Serializable descriptor for a stored blob. It is not used within this section.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// SHA-256 digest of the blob as a string (presumably lowercase hex; confirm).
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type recorded for the blob.
    pub mime_type: String,
    /// Upload time. Presumably Unix seconds; verify against the writer.
    pub uploaded: u64,
}
1703
1704impl crate::webrtc::ContentStore for HashtreeStore {
1706 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
1707 let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
1708 self.get_chunk(&hash)
1709 }
1710}
1711
#[cfg(test)]
mod tests {
    //! Unit tests for `HashtreeStore` and `LocalStore` construction, pinning, tracked
    //! authors, and the S3 fallback path.

    use super::*;
    #[cfg(feature = "lmdb")]
    use tempfile::TempDir;

    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            true,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn lmdb_local_store_removes_stale_fs_blob_shard_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("lmdb-blobs");
        std::fs::create_dir_all(path.join("aa"))?;
        std::fs::create_dir_all(path.join("b2"))?;
        std::fs::create_dir_all(path.join("keep-me"))?;
        std::fs::write(path.join("aa").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("b2").join("blob.bin"), b"old fs shard")?;
        std::fs::write(path.join("keep-me").join("note.txt"), b"keep")?;

        let _store = LocalStore::new_with_lmdb_map_size(
            &path,
            &StorageBackend::Lmdb,
            Some(128 * 1024 * 1024),
        )?;

        assert!(!path.join("aa").exists());
        assert!(!path.join("b2").exists());
        assert!(path.join("keep-me").exists());
        assert!(path.join("data.mdb").exists());
        assert!(path.join("lock.mdb").exists());

        Ok(())
    }

    #[cfg(feature = "lmdb")]
    #[test]
    fn replacing_tree_ref_unpins_and_unindexes_superseded_root() -> Result<()> {
        let temp = TempDir::new()?;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            LMDB_BLOB_MIN_MAP_SIZE_BYTES,
            true,
            &StorageBackend::Lmdb,
        )?;

        let old_bytes = b"old published root";
        let new_bytes = b"new published root";
        let old_root = sha256(old_bytes);
        let new_root = sha256(new_bytes);

        store.put_blob(old_bytes)?;
        store.pin(&old_root)?;
        store.index_tree(
            &old_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(store.is_pinned(&old_root)?);
        assert!(store.get_tree_meta(&old_root)?.is_some());

        store.put_blob(new_bytes)?;
        store.pin(&new_root)?;
        store.index_tree(
            &new_root,
            "owner",
            Some("playlist"),
            PRIORITY_OWN,
            Some("npub1owner/playlist"),
        )?;

        assert!(
            !store.is_pinned(&old_root)?,
            "superseded root should be unpinned when ref is replaced"
        );
        assert!(
            store.get_tree_meta(&old_root)?.is_none(),
            "superseded root metadata should be removed when ref is replaced"
        );
        assert!(store.is_pinned(&new_root)?);
        assert!(store.get_tree_meta(&new_root)?.is_some());

        Ok(())
    }

    #[test]
    fn tracked_authors_round_trip_sorted_and_deduplicated() -> Result<()> {
        // Fully qualified: the `TempDir` import above only exists with the `lmdb`
        // feature, but this test runs in every build.
        let temp = tempfile::TempDir::new()?;
        let store = HashtreeStore::with_options(temp.path(), None, 1024 * 1024)?;

        store
            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;
        store
            .add_tracked_author("npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm")?;
        store
            .add_tracked_author("npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk")?;

        assert_eq!(
            store.list_tracked_authors()?,
            vec![
                "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm".to_string(),
                "npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string(),
            ]
        );
        assert!(store.remove_tracked_author(
            "npub1aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaqf5slm"
        )?);
        assert!(!store.remove_tracked_author(
            "npub1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbpqqqqq"
        )?);
        assert_eq!(
            store.list_tracked_authors()?,
            vec!["npub1zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzs9d3kk".to_string()]
        );

        Ok(())
    }

    #[cfg(feature = "s3")]
    #[test]
    fn async_store_s3_fallback_does_not_reenter_futures_executor() -> Result<()> {
        let temp = tempfile::TempDir::new()?;
        let local = Arc::new(LocalStore::new(
            temp.path().join("blobs"),
            &StorageBackend::Fs,
        )?);

        let outcome = std::panic::catch_unwind(|| {
            sync_block_on(async {
                let aws_config = aws_config::from_env()
                    .region(aws_sdk_s3::config::Region::new("auto"))
                    .load()
                    .await;
                let s3_client = aws_sdk_s3::Client::from_conf(
                    aws_sdk_s3::config::Builder::from(&aws_config)
                        .endpoint_url("http://127.0.0.1:9")
                        .force_path_style(true)
                        .build(),
                );

                let router = StorageRouter {
                    local,
                    s3_client: Some(s3_client),
                    s3_bucket: Some("test-bucket".to_string()),
                    s3_prefix: String::new(),
                    sync_tx: None,
                };
                let hash = [0u8; 32];

                let _ = Store::has(&router, &hash).await;
                let _ = Store::get(&router, &hash).await;
            });
        });

        assert!(
            outcome.is_ok(),
            "S3-backed async store methods should not panic inside futures::block_on"
        );

        Ok(())
    }
}