1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::io::AllowStdIo;
5use futures::StreamExt;
6use hashtree_config::StorageBackend;
7use hashtree_core::store::{Store, StoreError};
8use hashtree_core::{
9 from_hex, sha256, to_hex, types::Hash, Cid, DirEntry as HashTreeDirEntry, HashTree,
10 HashTreeConfig, TreeNode,
11};
12use hashtree_fs::FsBlobStore;
13#[cfg(feature = "lmdb")]
14use hashtree_lmdb::LmdbBlobStore;
15use heed::types::*;
16use heed::{Database, EnvOpenOptions};
17use serde::{Deserialize, Serialize};
18use std::collections::HashSet;
19use std::io::{Read, Write};
20use std::path::Path;
21use std::sync::Arc;
22use std::time::{SystemTime, UNIX_EPOCH};
23
/// Retention priority for trees owned by unrelated users (lowest tier kept).
pub const PRIORITY_OTHER: u8 = 64;
/// Retention priority for trees owned by followed users.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Highest retention priority: the local user's own trees.
pub const PRIORITY_OWN: u8 = 255;
// Maximum concurrent read transactions allowed on the metadata LMDB env.
const LMDB_MAX_READERS: u32 = 1024;
// Floor for the LMDB blob-store map size (10 GiB); applied even when the
// configured max storage size is smaller (see `with_options_and_backend`).
#[cfg(feature = "lmdb")]
const LMDB_BLOB_MIN_MAP_SIZE_BYTES: u64 = 10 * 1024 * 1024 * 1024;
31
/// Metadata recorded for a synced hash tree.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Identifier of the user who owns this tree (encoding not shown here —
    /// presumably a hex pubkey; confirm against writers of `tree_meta`).
    pub owner: String,
    /// Optional human-readable name for the tree.
    pub name: Option<String>,
    /// Time of the last sync (presumably Unix seconds — TODO confirm).
    pub synced_at: u64,
    /// Total size of the tree's content in bytes.
    pub total_size: u64,
    /// Retention priority; see the `PRIORITY_*` constants above.
    pub priority: u8,
}
46
/// A cached tree root entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash, hex-encoded.
    pub hash: String,
    /// Optional decryption key (string encoding not shown here — confirm).
    pub key: Option<String>,
    /// Time of the last update (presumably Unix seconds — TODO confirm).
    pub updated_at: u64,
    /// Visibility label; exact values are not visible in this file — confirm
    /// against the code that writes `cached_roots`.
    pub visibility: String,
}
59
/// Aggregate statistics for a local blob store.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of stored blobs.
    pub count: usize,
    /// Total stored payload size in bytes.
    pub total_bytes: u64,
}
66
/// Local content-addressed blob storage, backed either by the filesystem or
/// (when the `lmdb` feature is enabled) an LMDB database.
pub enum LocalStore {
    /// One file per blob on disk.
    Fs(FsBlobStore),
    /// All blobs in a single LMDB environment.
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
73
impl LocalStore {
    /// Open a store at `path` for the requested backend with no explicit
    /// LMDB map-size cap (delegates to `new_unbounded`).
    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
        Self::new_unbounded(path, backend)
    }

    /// Open a store at `path`, optionally capping the LMDB map size.
    ///
    /// The parameter is named `_map_size_bytes` because it is only read when
    /// the `lmdb` feature is compiled in; the underscore silences the unused
    /// warning in the fs-only build.
    pub fn new_with_lmdb_map_size<P: AsRef<Path>>(
        path: P,
        backend: &StorageBackend,
        _map_size_bytes: Option<u64>,
    ) -> Result<Self, StoreError> {
        match backend {
            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
            #[cfg(feature = "lmdb")]
            StorageBackend::Lmdb => match _map_size_bytes {
                Some(map_size_bytes) => {
                    // LMDB takes the map size as usize; reject values that
                    // don't fit (e.g. >4 GiB on a 32-bit target).
                    let map_size = usize::try_from(map_size_bytes).map_err(|_| {
                        StoreError::Other("LMDB map size exceeds usize".to_string())
                    })?;
                    Ok(LocalStore::Lmdb(LmdbBlobStore::with_map_size(
                        path, map_size,
                    )?))
                }
                None => Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?)),
            },
            // Graceful degradation: LMDB requested but not compiled in.
            #[cfg(not(feature = "lmdb"))]
            StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
            }
        }
    }

    /// Open a store with the backend's default sizing (no LMDB map-size cap).
    pub fn new_unbounded<P: AsRef<Path>>(
        path: P,
        backend: &StorageBackend,
    ) -> Result<Self, StoreError> {
        Self::new_with_lmdb_map_size(path, backend, None)
    }

    /// Which backend this store was opened with.
    pub fn backend(&self) -> StorageBackend {
        match self {
            LocalStore::Fs(_) => StorageBackend::Fs,
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(_) => StorageBackend::Lmdb,
        }
    }

    /// Store one blob; returns whether it was newly inserted (per the
    /// backend's `put_sync` contract).
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.put_sync(hash, data),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put_sync(hash, data),
        }
    }

    /// Store many blobs; returns the number newly inserted.
    ///
    /// The fs backend has no batch API, so it loops; LMDB can batch natively.
    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
        match self {
            LocalStore::Fs(store) => {
                let mut inserted = 0usize;
                for (hash, data) in items {
                    if store.put_sync(*hash, data.as_slice())? {
                        inserted += 1;
                    }
                }
                Ok(inserted)
            }
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put_many_sync(items),
        }
    }

    /// Fetch a blob by hash, `None` if absent.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.get_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.get_sync(hash),
        }
    }

    /// Whether a blob is present. Note the fs backend's `exists` is
    /// infallible while LMDB's returns a Result; both are normalized here.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => Ok(store.exists(hash)),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.exists(hash),
        }
    }

    /// Delete a blob; returns whether something was removed.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.delete_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.delete_sync(hash),
        }
    }

    /// Blob count and total byte size, normalized into `LocalStoreStats`.
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        match self {
            LocalStore::Fs(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
        }
    }

    /// List all stored blob hashes.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.list(),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.list(),
        }
    }
}
213
// Async `Store` facade over the synchronous backends. Each method forwards
// directly to the `_sync` implementation, so these calls block the executor
// thread for the duration of the disk/LMDB operation — acceptable for fast
// local I/O, but worth keeping in mind on async hot paths.
#[async_trait]
impl Store for LocalStore {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
        self.put_many_sync(&items)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
236
237#[cfg(feature = "s3")]
238use tokio::sync::mpsc;
239
240use crate::config::S3Config;
241
/// Work items sent to the background S3 mirroring task.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Mirror a blob's bytes to S3.
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove a blob's object from S3.
    Delete { hash: Hash },
}
248
/// Routes blob operations to the local store and, when the `s3` feature is
/// enabled and configured, mirrors writes/deletes to S3 via a background
/// task and falls back to S3 on local read misses.
pub struct StorageRouter {
    // Always-present local store; the source of truth for reads.
    local: Arc<LocalStore>,
    // S3 client/bucket/prefix are `None`/empty when S3 is not configured.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    #[cfg(feature = "s3")]
    s3_prefix: String,
    // Channel to the background sync task spawned in `with_s3`.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
267
impl StorageRouter {
    /// Local-only router: no S3 mirroring.
    pub fn new(local: Arc<LocalStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Build a router that mirrors to S3. Spawns a long-lived background
    /// task that drains an unbounded channel of upload/delete jobs, running
    /// at most 32 S3 requests concurrently (semaphore-bounded).
    ///
    /// NOTE(review): the channel is unbounded, so a burst of large uploads
    /// can buffer unbounded blob data in memory until S3 catches up.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Credentials come from the environment; only region/endpoint are
        // taken from our config.
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader =
            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        // Path-style addressing for S3-compatible endpoints (e.g. MinIO).
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        // Background sync task: lives for the life of the process (or until
        // every sender is dropped, which closes the channel).
        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Cap concurrent in-flight S3 requests at 32.
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                // One task per job; the permit is acquired inside so recv()
                // keeps draining while jobs wait their turn.
                tokio::spawn(async move {
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            // Object key: <prefix><hex-hash>.bin
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            match client
                                .put_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .body(ByteStream::from(data))
                                .send()
                                .await
                            {
                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
                                // Failures are logged, not retried.
                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            if let Err(e) = client
                                .delete_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .send()
                                .await
                            {
                                tracing::error!("S3 delete failed {}: {}", &key, e);
                            }
                        }
                    }
                });
            }
        });

        tracing::info!(
            "S3 storage initialized: bucket={}, prefix={}",
            bucket,
            prefix
        );

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Store locally, then queue an async S3 upload (fire-and-forget).
    ///
    /// Note: the upload is queued even when the blob already existed locally
    /// (`is_new == false`) — this re-pushes existing blobs to S3.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        let is_new = self.local.put_sync(hash, data)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            tracing::info!(
                "Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(),
                data.len(),
                is_new
            );
            // Send only fails if the background task is gone; log and move on.
            if let Err(e) = tx.send(S3SyncMessage::Upload {
                hash,
                data: data.to_vec(),
            }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Batch store locally, then queue an S3 upload per item.
    pub fn put_many_sync(&self, items: &[(Hash, Vec<u8>)]) -> Result<usize, StoreError> {
        let inserted = self.local.put_many_sync(items)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            for (hash, data) in items {
                if let Err(e) = tx.send(S3SyncMessage::Upload {
                    hash: *hash,
                    data: data.clone(),
                }) {
                    tracing::error!("Failed to queue S3 upload: {}", e);
                }
            }
        }

        Ok(inserted)
    }

    /// Read locally; on miss, fetch from S3 and backfill the local store.
    ///
    /// NOTE(review): `sync_block_on` inside this sync method will deadlock
    /// or panic if called from within an async runtime worker thread —
    /// confirm callers invoke this from blocking/sync contexts only.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async { client.get_object().bucket(bucket).key(&key).send().await })
            {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Best-effort local backfill; a failure here still
                        // returns the data to the caller.
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // "No such key" is the expected miss; only log other errors.
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Local existence check, falling back to an S3 HEAD on miss.
    /// Same `sync_block_on` caveat as `get_sync`.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        if self.local.exists(hash)? {
            return Ok(true);
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object().bucket(bucket).key(&key).send().await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Delete locally and queue the S3 delete; returns whether the local
    /// store actually removed something.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Delete only the local copy, leaving any S3 object untouched
    /// (used to evict local cache without losing the remote mirror).
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Local-store statistics only (S3 contents are not counted).
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        self.local.stats()
    }

    /// Hashes present in the local store only.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Shared handle to the underlying local store.
    pub fn local_store(&self) -> Arc<LocalStore> {
        Arc::clone(&self.local)
    }
}
527
// Async `Store` facade over the router. These forward to the synchronous
// methods, which may themselves call `sync_block_on` for S3 fallbacks —
// see the NOTE(review) on `StorageRouter::get_sync` about calling these
// from inside an async runtime.
#[async_trait]
impl Store for StorageRouter {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn put_many(&self, items: Vec<(Hash, Vec<u8>)>) -> Result<usize, StoreError> {
        self.put_many_sync(&items)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
552
/// Top-level store: an LMDB environment of metadata indexes plus a
/// content-addressed blob store (optionally mirrored to S3) behind a
/// `StorageRouter`.
pub struct HashtreeStore {
    // LMDB environment holding all databases below.
    env: heed::Env,
    // Pinned root hashes (key = 32-byte hash; no value).
    pins: Database<Bytes, Unit>,
    // Ownership index; key = sha256 (32 bytes) || pubkey (32 bytes).
    blob_owners: Database<Bytes, Unit>,
    // pubkey -> JSON-encoded Vec<BlobMetadata> of that user's blobs.
    pubkey_blobs: Database<Bytes, Bytes>,
    // Tree root -> serialized TreeMeta (encoding not shown here — confirm).
    tree_meta: Database<Bytes, Bytes>,
    // Blob-to-tree membership index (key layout not shown here — confirm).
    blob_trees: Database<Bytes, Unit>,
    // Named tree references (semantics defined elsewhere in this file).
    tree_refs: Database<Str, Bytes>,
    // Cached roots keyed by string id; values are serialized CachedRoot
    // (presumably — confirm against readers/writers).
    cached_roots: Database<Str, Bytes>,
    // Blob read/write path (local + optional S3).
    router: Arc<StorageRouter>,
    // Configured storage cap in bytes (0 = effectively unlimited).
    max_size_bytes: u64,
}
574
575impl HashtreeStore {
576 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
578 let config = hashtree_config::Config::load_or_default();
579 let max_size_bytes = config
580 .storage
581 .max_size_gb
582 .saturating_mul(1024 * 1024 * 1024);
583 Self::with_options_and_backend(path, None, max_size_bytes, &config.storage.backend)
584 }
585
586 pub fn new_with_backend<P: AsRef<Path>>(
588 path: P,
589 backend: hashtree_config::StorageBackend,
590 max_size_bytes: u64,
591 ) -> Result<Self> {
592 Self::with_options_and_backend(path, None, max_size_bytes, &backend)
593 }
594
595 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
597 let config = hashtree_config::Config::load_or_default();
598 let max_size_bytes = config
599 .storage
600 .max_size_gb
601 .saturating_mul(1024 * 1024 * 1024);
602 Self::with_options_and_backend(path, s3_config, max_size_bytes, &config.storage.backend)
603 }
604
605 pub fn with_options<P: AsRef<Path>>(
611 path: P,
612 s3_config: Option<&S3Config>,
613 max_size_bytes: u64,
614 ) -> Result<Self> {
615 let config = hashtree_config::Config::load_or_default();
616 Self::with_options_and_backend(path, s3_config, max_size_bytes, &config.storage.backend)
617 }
618
    /// Shared constructor: opens the metadata LMDB env, creates all index
    /// databases, builds the blob store for `backend`, and wraps it in a
    /// `StorageRouter` (with S3 if configured and compiled in).
    fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // Metadata env: fixed 10 GiB map (sparse on disk), 8 named DBs.
        // SAFETY: heed marks env opening unsafe because the caller must
        // guarantee the path is not opened twice in-process with
        // conflicting options.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) .max_dbs(8) .max_readers(LMDB_MAX_READERS)
                .open(path)?
        };
        // Best effort: reclaim reader slots left behind by crashed processes.
        let _ = env.clear_stale_readers();

        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Blob store lives under <path>/blobs regardless of backend.
        let local_store = Arc::new(match backend {
            hashtree_config::StorageBackend::Fs => LocalStore::Fs(
                FsBlobStore::new(path.join("blobs"))
                    .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
            ),
            #[cfg(feature = "lmdb")]
            hashtree_config::StorageBackend::Lmdb => {
                // Map size is the configured cap, floored at 10 GiB so small
                // configs don't cripple the env.
                let requested_map_size = max_size_bytes.max(LMDB_BLOB_MIN_MAP_SIZE_BYTES);
                let map_size = usize::try_from(requested_map_size)
                    .context("LMDB blob map size exceeds usize")?;
                LocalStore::Lmdb(
                    LmdbBlobStore::with_map_size(path.join("blobs"), map_size)
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
            // Graceful degradation when LMDB is not compiled in.
            #[cfg(not(feature = "lmdb"))]
            hashtree_config::StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                LocalStore::Fs(
                    FsBlobStore::new(path.join("blobs"))
                        .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
                )
            }
        });

        // S3 setup requires an async context; block on it here. NOTE(review):
        // this assumes the constructor is not called from inside an async
        // runtime worker — confirm call sites.
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            env,
            pins,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
714
715 pub fn router(&self) -> &StorageRouter {
717 &self.router
718 }
719
720 pub fn store_arc(&self) -> Arc<StorageRouter> {
723 Arc::clone(&self.router)
724 }
725
726 pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
728 self.upload_file_internal(file_path, true)
729 }
730
731 pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
733 self.upload_file_internal(file_path, false)
734 }
735
    /// Chunk and store `file_path` as a public hash tree; optionally pin the
    /// resulting root. Returns the root hash as hex.
    fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
        let file_path = file_path.as_ref();
        let file = std::fs::File::open(file_path)
            .with_context(|| format!("Failed to open file {}", file_path.display()))?;

        let store = self.store_arc();
        // `.public()` => unencrypted tree.
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Bridge the sync std::fs::File into the async streaming API.
        let (cid, _size) = sync_block_on(async { tree.put_stream(AllowStdIo::new(file)).await })
            .context("Failed to store file")?;

        if pin {
            let mut wtxn = self.env.write_txn()?;
            self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
            wtxn.commit()?;
        }

        Ok(to_hex(&cid.hash))
    }
757
    /// Upload from an arbitrary reader as a public hash tree, pin the root,
    /// and invoke `callback` with the root hash hex before the pin is
    /// committed. `_file_name` is currently unused.
    pub fn upload_file_stream<R: Read, F>(
        &self,
        reader: R,
        _file_name: impl Into<String>,
        mut callback: F,
    ) -> Result<String>
    where
        F: FnMut(&str),
    {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (cid, _size) = sync_block_on(async { tree.put_stream(AllowStdIo::new(reader)).await })
            .context("Failed to store file")?;

        let root_hex = to_hex(&cid.hash);
        // Notify the caller as soon as the root is known.
        callback(&root_hex);

        // Unlike upload_file_internal, this always pins.
        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(root_hex)
    }
785
786 pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
789 self.upload_dir_with_options(dir_path, true)
790 }
791
    /// Upload a directory tree as a public hash tree, optionally honoring
    /// gitignore rules, pin the root, and return the root hash as hex.
    pub fn upload_dir_with_options<P: AsRef<Path>>(
        &self,
        dir_path: P,
        respect_gitignore: bool,
    ) -> Result<String> {
        let dir_path = dir_path.as_ref();

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore)
                .await
        })
        .context("Failed to upload directory")?;

        let root_hex = to_hex(&root_cid.hash);

        // Pin only the root; children are reachable through it.
        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(root_hex)
    }
817
    /// Walk `current_path`, upload every file, and assemble the results into
    /// a directory tree. Despite the name, the walk itself is flat (the
    /// `ignore` walker recurses); only the final tree build is hierarchical.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Map of relative-dir-path -> entries ("" is the root directory).
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new()); let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false)
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // The walker yields the root itself first; skip it.
            if path == current_path {
                continue;
            }

            let relative = path.strip_prefix(current_path).unwrap_or(path);

            if path.is_file() {
                let file = std::fs::File::open(path)
                    .with_context(|| format!("Failed to open file {}", path.display()))?;
                let (cid, _size) = tree.put_stream(AllowStdIo::new(file)).await.map_err(|e| {
                    anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e)
                })?;

                // Register the file under its parent directory's entry list.
                let parent = relative
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative
                    .file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Ensure empty directories still get a node.
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        self.build_directory_tree(tree, &mut dir_contents).await
    }
878
    /// Turn the flat dir-path -> entries map into nested directory nodes,
    /// deepest directories first, so each parent can reference its already-
    /// built children. Returns the CID of the root ("" key).
    async fn build_directory_tree<S: Store>(
        &self,
        tree: &HashTree<S>,
        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
    ) -> Result<Cid> {
        // Sort by depth descending ("" sorts last). Depth = number of path
        // separators, plus one for any non-root path.
        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
        dirs.sort_by(|a, b| {
            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
            depth_b.cmp(&depth_a) });

        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();

        for dir_path in dirs {
            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();

            let mut entries: Vec<HashTreeDirEntry> = files
                .into_iter()
                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
                .collect();

            // Add already-built subdirectories whose parent is this dir.
            // (Linear scan over dir_cids per directory — fine for typical
            // tree sizes.)
            for (subdir_path, cid) in &dir_cids {
                let parent = std::path::Path::new(subdir_path)
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();

                if parent == dir_path {
                    let name = std::path::Path::new(subdir_path)
                        .file_name()
                        .map(|n| n.to_string_lossy().to_string())
                        .unwrap_or_default();
                    entries.push(HashTreeDirEntry::from_cid(name, cid));
                }
            }

            let cid = tree
                .put_directory(entries)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;

            dir_cids.insert(dir_path, cid);
        }

        // The root directory is keyed by the empty string.
        dir_cids
            .get("")
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("No root directory"))
    }
932
    /// Upload `file_path` as an ENCRYPTED hash tree (default HashTreeConfig,
    /// i.e. not `.public()`), pin the root, and return the full CID string
    /// (which carries the key material, unlike the bare hex used for public
    /// uploads).
    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        let file_path = file_path.as_ref();
        let file = std::fs::File::open(file_path)
            .with_context(|| format!("Failed to open file {}", file_path.display()))?;

        let store = self.store_arc();
        // No `.public()` => encrypted mode.
        let tree = HashTree::new(HashTreeConfig::new(store));

        let (cid, _size) = sync_block_on(async { tree.put_stream(AllowStdIo::new(file)).await })
            .map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;

        let cid_str = cid.to_string();

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
954
955 pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
958 self.upload_dir_encrypted_with_options(dir_path, true)
959 }
960
    /// Upload a directory as an ENCRYPTED hash tree, optionally honoring
    /// gitignore rules; pins the root and returns the full CID string.
    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(
        &self,
        dir_path: P,
        respect_gitignore: bool,
    ) -> Result<String> {
        let dir_path = dir_path.as_ref();
        let store = self.store_arc();

        // No `.public()` => encrypted mode.
        let tree = HashTree::new(HashTreeConfig::new(store));

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore)
                .await
        })
        .context("Failed to upload encrypted directory")?;

        let cid_str = root_cid.to_string(); let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
989
990 pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
992 let store = self.store_arc();
993 let tree = HashTree::new(HashTreeConfig::new(store).public());
994
995 sync_block_on(async {
996 tree.get_tree_node(hash)
997 .await
998 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
999 })
1000 }
1001
1002 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
1004 let hash = sha256(data);
1005 self.router
1006 .put_sync(hash, data)
1007 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
1008 Ok(to_hex(&hash))
1009 }
1010
1011 pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1013 self.router
1014 .get_sync(hash)
1015 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
1016 }
1017
1018 pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
1020 self.router
1021 .exists(hash)
1022 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
1023 }
1024
1025 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
1031 let mut key = [0u8; 64];
1032 key[..32].copy_from_slice(sha256);
1033 key[32..].copy_from_slice(pubkey);
1034 key
1035 }
1036
    /// Record `pubkey` as an owner of blob `sha256`, and append the blob to
    /// that user's JSON blob list (if not already present) in one txn.
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Idempotent: put into the owners index (key-only entry).
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        let sha256_hex = to_hex(sha256);

        // Load the user's blob list; a missing entry or JSON decode failure
        // both fall back to an empty list (decode failure silently resets —
        // NOTE(review): confirm this is intended).
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            // System clock is before the epoch only on a badly broken host.
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Size comes from reading the blob back; 0 if it isn't stored yet.
            let size = self
                .get_blob(sha256)?
                .map(|data| data.len() as u64)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
1083
1084 pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
1086 let key = Self::blob_owner_key(sha256, pubkey);
1087 let rtxn = self.env.read_txn()?;
1088 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
1089 }
1090
1091 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
1093 let rtxn = self.env.read_txn()?;
1094
1095 let mut owners = Vec::new();
1096 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1097 let (key, _) = item?;
1098 if key.len() == 64 {
1099 let mut pubkey = [0u8; 32];
1101 pubkey.copy_from_slice(&key[32..64]);
1102 owners.push(pubkey);
1103 }
1104 }
1105 Ok(owners)
1106 }
1107
1108 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
1110 let rtxn = self.env.read_txn()?;
1111
1112 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
1114 if item.is_ok() {
1115 return Ok(true);
1116 }
1117 }
1118 Ok(false)
1119 }
1120
1121 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1123 Ok(self.get_blob_owners(sha256)?.into_iter().next())
1124 }
1125
    /// Remove `pubkey`'s ownership of blob `sha256`; if that was the last
    /// owner, delete the blob data too. Returns `true` when the blob data
    /// was deleted, `false` when other owners remain.
    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        self.blob_owners.delete(&mut wtxn, &key[..])?;

        let sha256_hex = to_hex(sha256);

        // Drop the blob from the user's JSON blob list, if decodable.
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Scan within the same write txn, so our own delete is visible.
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &to_hex(pubkey)[..8],
                &sha256_hex[..8]
            );
            return Ok(false);
        }

        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8]
        );

        // Best-effort blob deletion. NOTE(review): this runs before the
        // metadata commit below — if the commit then fails, the blob data is
        // gone while the ownership rows remain. Confirm whether deleting
        // after commit would be safer here.
        let _ = self.router.delete_sync(sha256);

        wtxn.commit()?;
        Ok(true)
    }
1179
1180 pub fn list_blobs_by_pubkey(
1182 &self,
1183 pubkey: &[u8; 32],
1184 ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1185 let rtxn = self.env.read_txn()?;
1186
1187 let blobs: Vec<BlobMetadata> = self
1188 .pubkey_blobs
1189 .get(&rtxn, pubkey)?
1190 .and_then(|b| serde_json::from_slice(b).ok())
1191 .unwrap_or_default();
1192
1193 Ok(blobs
1194 .into_iter()
1195 .map(|b| crate::server::blossom::BlobDescriptor {
1196 url: format!("/{}", b.sha256),
1197 sha256: b.sha256,
1198 size: b.size,
1199 mime_type: b.mime_type,
1200 uploaded: b.uploaded,
1201 })
1202 .collect())
1203 }
1204
1205 pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1207 self.router
1208 .get_sync(hash)
1209 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1210 }
1211
1212 pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1215 let store = self.store_arc();
1216 let tree = HashTree::new(HashTreeConfig::new(store).public());
1217
1218 sync_block_on(async {
1219 tree.read_file(hash)
1220 .await
1221 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1222 })
1223 }
1224
1225 pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
1228 let store = self.store_arc();
1229 let tree = HashTree::new(HashTreeConfig::new(store).public());
1230
1231 sync_block_on(async {
1232 tree.get(cid, None)
1233 .await
1234 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
1235 })
1236 }
1237
1238 fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1239 let exists = self
1240 .router
1241 .exists(&cid.hash)
1242 .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1243 if !exists {
1244 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1245 }
1246 Ok(())
1247 }
1248
    /// Stream the file behind `cid` into `writer`, returning the byte count.
    /// Fails with "CID not found" if the root is missing or the stream
    /// yields no chunks at all.
    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
        self.ensure_cid_exists(cid)?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let mut total_bytes = 0u64;
        // Distinguishes "empty stream" (treated as not-found) from a real file.
        let mut streamed_any_chunk = false;

        sync_block_on(async {
            let mut stream = tree.get_stream(cid);
            while let Some(chunk) = stream.next().await {
                streamed_any_chunk = true;
                let chunk =
                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
                writer
                    .write_all(&chunk)
                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
                total_bytes += chunk.len() as u64;
            }
            Ok::<(), anyhow::Error>(())
        })?;

        if !streamed_any_chunk {
            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
        }

        // Flush explicitly so buffered-writer errors surface here, not on drop.
        writer
            .flush()
            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
        Ok(total_bytes)
    }
1281
1282 pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
1284 self.ensure_cid_exists(cid)?;
1285
1286 let output_path = output_path.as_ref();
1287 if let Some(parent) = output_path.parent() {
1288 if !parent.as_os_str().is_empty() {
1289 std::fs::create_dir_all(parent).with_context(|| {
1290 format!("Failed to create output directory {}", parent.display())
1291 })?;
1292 }
1293 }
1294
1295 let mut file = std::fs::File::create(output_path)
1296 .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
1297 self.write_file_by_cid_to_writer(cid, &mut file)
1298 }
1299
1300 pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
1302 self.write_file_by_cid(&Cid::public(*hash), output_path)
1303 }
1304
1305 pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
1307 let store = self.store_arc();
1308 let tree = HashTree::new(HashTreeConfig::new(store).public());
1309
1310 sync_block_on(async {
1311 tree.resolve_path(cid, path)
1312 .await
1313 .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
1314 })
1315 }
1316
    /// Describe how the file at `hash` is stored: total size plus, when the
    /// file is chunked, the per-chunk hashes and sizes in file order.
    ///
    /// Returns `Ok(None)` when the hash is unknown locally, when its tree
    /// node cannot be loaded, or when the hash refers to a directory.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            let exists = store
                .has(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            let total_size = tree
                .get_size(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            let is_tree_node = tree
                .is_tree(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            // A raw (non-tree) blob is a single, unchunked file.
            if !is_tree_node {
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            let node = match tree
                .get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
            {
                Some(n) => n,
                None => return Ok(None),
            };

            let is_directory = tree
                .is_directory(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            // Directories have no chunk layout; callers get None.
            if is_directory {
                return Ok(None);
            }

            // Chunk layout comes straight from the tree node's links.
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1388
    /// Read bytes `[start, end]` (both inclusive; `end = None` means
    /// end-of-file) of the file at `hash`.
    ///
    /// Returns `(bytes, total_file_size)`; `Ok(None)` when the file is
    /// unknown or `start` is past the end; `Ok(Some((vec![], 0)))` for an
    /// empty file.
    pub fn get_file_range(
        &self,
        hash: &[u8; 32],
        start: u64,
        end: Option<u64>,
    ) -> Result<Option<(Vec<u8>, u64)>> {
        let metadata = match self.get_file_chunk_metadata(hash)? {
            Some(m) => m,
            None => return Ok(None),
        };

        if metadata.total_size == 0 {
            return Ok(Some((Vec::new(), 0)));
        }

        if start >= metadata.total_size {
            return Ok(None);
        }

        // Clamp the inclusive end to the last byte of the file.
        let end = end
            .unwrap_or(metadata.total_size - 1)
            .min(metadata.total_size - 1);

        // Unchunked file: read it whole and slice the requested span.
        if !metadata.is_chunked {
            let content = self.get_file(hash)?.unwrap_or_default();
            let range_content = if start < content.len() as u64 {
                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
            } else {
                Vec::new()
            };
            return Ok(Some((range_content, metadata.total_size)));
        }

        // Chunked file: walk chunks in order, copying only the part of each
        // chunk that overlaps [start, end].
        let mut result = Vec::new();
        let mut current_offset = 0u64;

        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
            let chunk_size = metadata.chunk_sizes[i];
            // NOTE(review): a zero-sized chunk would underflow here —
            // assumes chunk sizes are always >= 1; confirm with the chunker.
            let chunk_end = current_offset + chunk_size - 1;

            if chunk_end >= start && current_offset <= end {
                let chunk_content = match self.get_chunk(chunk_hash)? {
                    Some(content) => content,
                    None => {
                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
                    }
                };

                // Inclusive offsets of the overlap within this chunk.
                let chunk_read_start = if current_offset >= start {
                    0
                } else {
                    (start - current_offset) as usize
                };

                let chunk_read_end = if chunk_end <= end {
                    chunk_size as usize - 1
                } else {
                    (end - current_offset) as usize
                };

                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
            }

            current_offset += chunk_size;

            // Everything after this point lies past the requested range.
            if current_offset > end {
                break;
            }
        }

        Ok(Some((result, metadata.total_size)))
    }
1465
1466 pub fn stream_file_range_chunks_owned(
1468 self: Arc<Self>,
1469 hash: &[u8; 32],
1470 start: u64,
1471 end: u64,
1472 ) -> Result<Option<FileRangeChunksOwned>> {
1473 let metadata = match self.get_file_chunk_metadata(hash)? {
1474 Some(m) => m,
1475 None => return Ok(None),
1476 };
1477
1478 if metadata.total_size == 0 || start >= metadata.total_size {
1479 return Ok(None);
1480 }
1481
1482 let end = end.min(metadata.total_size - 1);
1483
1484 Ok(Some(FileRangeChunksOwned {
1485 store: self,
1486 metadata,
1487 start,
1488 end,
1489 current_chunk_idx: 0,
1490 current_offset: 0,
1491 }))
1492 }
1493
1494 pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1496 let store = self.store_arc();
1497 let tree = HashTree::new(HashTreeConfig::new(store).public());
1498
1499 sync_block_on(async {
1500 let is_dir = tree
1502 .is_directory(hash)
1503 .await
1504 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1505
1506 if !is_dir {
1507 return Ok(None);
1508 }
1509
1510 let cid = hashtree_core::Cid::public(*hash);
1512 let tree_entries = tree
1513 .list_directory(&cid)
1514 .await
1515 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1516
1517 let entries: Vec<DirEntry> = tree_entries
1518 .into_iter()
1519 .map(|e| DirEntry {
1520 name: e.name,
1521 cid: to_hex(&e.hash),
1522 is_directory: e.link_type.is_tree(),
1523 size: e.size,
1524 })
1525 .collect();
1526
1527 Ok(Some(DirectoryListing {
1528 dir_name: String::new(),
1529 entries,
1530 }))
1531 })
1532 }
1533
1534 pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1536 let store = self.store_arc();
1537 let tree = HashTree::new(HashTreeConfig::new(store).public());
1538 let cid = cid.clone();
1539
1540 sync_block_on(async {
1541 let is_dir = tree
1542 .is_dir(&cid)
1543 .await
1544 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1545
1546 if !is_dir {
1547 return Ok(None);
1548 }
1549
1550 let tree_entries = tree
1551 .list_directory(&cid)
1552 .await
1553 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1554
1555 let entries: Vec<DirEntry> = tree_entries
1556 .into_iter()
1557 .map(|e| DirEntry {
1558 name: e.name,
1559 cid: Cid {
1560 hash: e.hash,
1561 key: e.key,
1562 }
1563 .to_string(),
1564 is_directory: e.link_type.is_tree(),
1565 size: e.size,
1566 })
1567 .collect();
1568
1569 Ok(Some(DirectoryListing {
1570 dir_name: String::new(),
1571 entries,
1572 }))
1573 })
1574 }
1575
1576 pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
1578 let mut wtxn = self.env.write_txn()?;
1579 self.pins.put(&mut wtxn, hash.as_slice(), &())?;
1580 wtxn.commit()?;
1581 Ok(())
1582 }
1583
1584 pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
1586 let mut wtxn = self.env.write_txn()?;
1587 self.pins.delete(&mut wtxn, hash.as_slice())?;
1588 wtxn.commit()?;
1589 Ok(())
1590 }
1591
1592 pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
1594 let rtxn = self.env.read_txn()?;
1595 Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
1596 }
1597
1598 pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
1600 let rtxn = self.env.read_txn()?;
1601 let mut pins = Vec::new();
1602
1603 for item in self.pins.iter(&rtxn)? {
1604 let (hash_bytes, _) = item?;
1605 if hash_bytes.len() == 32 {
1606 let mut hash = [0u8; 32];
1607 hash.copy_from_slice(hash_bytes);
1608 pins.push(hash);
1609 }
1610 }
1611
1612 Ok(pins)
1613 }
1614
1615 pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
1617 let rtxn = self.env.read_txn()?;
1618 let store = self.store_arc();
1619 let tree = HashTree::new(HashTreeConfig::new(store).public());
1620 let mut pins = Vec::new();
1621
1622 for item in self.pins.iter(&rtxn)? {
1623 let (hash_bytes, _) = item?;
1624 if hash_bytes.len() != 32 {
1625 continue;
1626 }
1627 let mut hash = [0u8; 32];
1628 hash.copy_from_slice(hash_bytes);
1629
1630 let is_directory =
1632 sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
1633
1634 pins.push(PinnedItem {
1635 cid: to_hex(&hash),
1636 name: "Unknown".to_string(),
1637 is_directory,
1638 });
1639 }
1640
1641 Ok(pins)
1642 }
1643
    /// Index a fully-synced tree so its blobs are tracked for eviction.
    ///
    /// Records a (blob -> root) association for every blob in the tree,
    /// stores a `TreeMeta` entry under the root hash, and, when `ref_key`
    /// is given, points that named ref at the new root — unindexing
    /// whatever tree the ref previously pointed to.
    ///
    /// # Arguments
    /// * `root_hash` - root of the synced tree
    /// * `owner` - identity string stored in the tree's metadata
    /// * `name` - optional human-readable tree name
    /// * `priority` - eviction priority (see the `PRIORITY_*` constants)
    /// * `ref_key` - optional stable key naming this tree across updates
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // If this ref already points at a different root, drop the old tree
        // first (best effort — a failed unindex is ignored).
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes
                        .try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    drop(rtxn);
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (blob_hashes, total_size) =
            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;

        let mut wtxn = self.env.write_txn()?;

        // blob_trees key layout: blob hash (32 bytes) || root hash (32 bytes).
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            // unwrap: only fails if the system clock is before the Unix epoch.
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta
            .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1726
1727 async fn collect_tree_blobs<S: Store>(
1729 &self,
1730 tree: &HashTree<S>,
1731 root: &Hash,
1732 ) -> Result<(Vec<Hash>, u64)> {
1733 let mut blobs = Vec::new();
1734 let mut total_size = 0u64;
1735 let mut stack = vec![*root];
1736
1737 while let Some(hash) = stack.pop() {
1738 let is_tree = tree
1740 .is_tree(&hash)
1741 .await
1742 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1743
1744 if is_tree {
1745 if let Some(node) = tree
1747 .get_tree_node(&hash)
1748 .await
1749 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1750 {
1751 for link in &node.links {
1752 stack.push(link.hash);
1753 }
1754 }
1755 } else {
1756 if let Some(data) = self
1758 .router
1759 .get_sync(&hash)
1760 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1761 {
1762 total_size += data.len() as u64;
1763 blobs.push(hash);
1764 }
1765 }
1766 }
1767
1768 Ok((blobs, total_size))
1769 }
1770
1771 pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1774 let root_hex = to_hex(root_hash);
1775
1776 let store = self.store_arc();
1777 let tree = HashTree::new(HashTreeConfig::new(store).public());
1778
1779 let (blob_hashes, _) =
1781 sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
1782
1783 let mut wtxn = self.env.write_txn()?;
1784 let mut freed = 0u64;
1785
1786 for blob_hash in &blob_hashes {
1788 let mut key = [0u8; 64];
1790 key[..32].copy_from_slice(blob_hash);
1791 key[32..].copy_from_slice(root_hash);
1792 self.blob_trees.delete(&mut wtxn, &key[..])?;
1793
1794 let rtxn = self.env.read_txn()?;
1796 let mut has_other_tree = false;
1797
1798 for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1799 if item.is_ok() {
1800 has_other_tree = true;
1801 break;
1802 }
1803 }
1804 drop(rtxn);
1805
1806 if !has_other_tree {
1808 if let Some(data) = self
1809 .router
1810 .get_sync(blob_hash)
1811 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1812 {
1813 freed += data.len() as u64;
1814 self.router
1816 .delete_local_only(blob_hash)
1817 .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1818 }
1819 }
1820 }
1821
1822 if let Some(data) = self
1824 .router
1825 .get_sync(root_hash)
1826 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1827 {
1828 freed += data.len() as u64;
1829 self.router
1831 .delete_local_only(root_hash)
1832 .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1833 }
1834
1835 self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1837
1838 wtxn.commit()?;
1839
1840 tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
1841
1842 Ok(freed)
1843 }
1844
1845 pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1847 let rtxn = self.env.read_txn()?;
1848 if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1849 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1850 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1851 Ok(Some(meta))
1852 } else {
1853 Ok(None)
1854 }
1855 }
1856
1857 pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1859 let rtxn = self.env.read_txn()?;
1860 let mut trees = Vec::new();
1861
1862 for item in self.tree_meta.iter(&rtxn)? {
1863 let (hash_bytes, meta_bytes) = item?;
1864 let hash: Hash = hash_bytes
1865 .try_into()
1866 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1867 let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1868 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1869 trees.push((hash, meta));
1870 }
1871
1872 Ok(trees)
1873 }
1874
1875 pub fn tracked_size(&self) -> Result<u64> {
1877 let rtxn = self.env.read_txn()?;
1878 let mut total = 0u64;
1879
1880 for item in self.tree_meta.iter(&rtxn)? {
1881 let (_, bytes) = item?;
1882 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1883 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1884 total += meta.total_size;
1885 }
1886
1887 Ok(total)
1888 }
1889
1890 fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1892 let mut trees = self.list_indexed_trees()?;
1893
1894 trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
1896 std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1897 other => other,
1898 });
1899
1900 Ok(trees)
1901 }
1902
    /// Enforce the storage budget.
    ///
    /// When blob usage exceeds `max_size_bytes`, free space down to a 90%
    /// target: first delete orphaned blobs, then evict whole trees in
    /// ascending priority order (ties: oldest first), skipping trees whose
    /// root hash is pinned. Returns the total number of bytes freed.
    pub fn evict_if_needed(&self) -> Result<u64> {
        let stats = self
            .router
            .stats()
            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
        let current = stats.total_bytes;

        if current <= self.max_size_bytes {
            return Ok(0);
        }

        // Free down to 90% of the budget so eviction doesn't re-trigger
        // immediately on the next write.
        let target = self.max_size_bytes * 90 / 100;
        let mut freed = 0u64;
        let mut current_size = current;

        // Phase 1: blobs referenced by no indexed tree and no pin.
        let orphan_freed = self.evict_orphaned_blobs()?;
        freed += orphan_freed;
        current_size = current_size.saturating_sub(orphan_freed);

        if orphan_freed > 0 {
            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
        }

        if current_size <= target {
            if freed > 0 {
                tracing::info!("Eviction complete: {} bytes freed", freed);
            }
            return Ok(freed);
        }

        // Phase 2: evict whole trees, lowest priority / oldest first.
        let evictable = self.get_evictable_trees()?;

        for (root_hash, meta) in evictable {
            if current_size <= target {
                break;
            }

            let root_hex = to_hex(&root_hash);

            // Pinned roots are never evicted.
            if self.is_pinned(&root_hash)? {
                continue;
            }

            let tree_freed = self.unindex_tree(&root_hash)?;
            freed += tree_freed;
            current_size = current_size.saturating_sub(tree_freed);

            tracing::info!(
                "Evicted tree {} (owner={}, priority={}, {} bytes)",
                &root_hex[..8],
                &meta.owner[..8.min(meta.owner.len())],
                meta.priority,
                tree_freed
            );
        }

        if freed > 0 {
            tracing::info!("Eviction complete: {} bytes freed", freed);
        }

        Ok(freed)
    }
1978
    /// Delete blobs that are neither pinned nor associated with any indexed
    /// tree (via `blob_trees`). Returns the number of bytes freed.
    fn evict_orphaned_blobs(&self) -> Result<u64> {
        let mut freed = 0u64;

        let all_hashes = self
            .router
            .list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        // Snapshot the pin set (malformed keys are skipped).
        let rtxn = self.env.read_txn()?;
        let pinned: HashSet<Hash> = self
            .pins
            .iter(&rtxn)?
            .filter_map(|item| item.ok())
            .filter_map(|(hash_bytes, _)| {
                if hash_bytes.len() == 32 {
                    let mut hash = [0u8; 32];
                    hash.copy_from_slice(hash_bytes);
                    Some(hash)
                } else {
                    None
                }
            })
            .collect();

        // Snapshot every blob referenced by at least one indexed tree; the
        // blob hash is the first 32 bytes of each 64-byte blob_trees key.
        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
        for (key_bytes, _) in self.blob_trees.iter(&rtxn)?.flatten() {
            if key_bytes.len() >= 32 {
                let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
                blobs_in_trees.insert(blob_hash);
            }
        }
        drop(rtxn);

        for hash in all_hashes {
            if pinned.contains(&hash) {
                continue;
            }

            if blobs_in_trees.contains(&hash) {
                continue;
            }

            // Best effort: a failed read or delete just skips this blob.
            if let Ok(Some(data)) = self.router.get_sync(&hash) {
                freed += data.len() as u64;
                let _ = self.router.delete_local_only(&hash);
                tracing::debug!(
                    "Deleted orphaned blob {} ({} bytes)",
                    &to_hex(&hash)[..8],
                    data.len()
                );
            }
        }

        Ok(freed)
    }
2043
    /// Configured storage budget in bytes (eviction targets 90% of this).
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
2048
2049 pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
2051 let rtxn = self.env.read_txn()?;
2052 let mut own = 0u64;
2053 let mut followed = 0u64;
2054 let mut other = 0u64;
2055
2056 for item in self.tree_meta.iter(&rtxn)? {
2057 let (_, bytes) = item?;
2058 let meta: TreeMeta = rmp_serde::from_slice(bytes)
2059 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
2060
2061 if meta.priority == PRIORITY_OWN {
2062 own += meta.total_size;
2063 } else if meta.priority >= PRIORITY_FOLLOWED {
2064 followed += meta.total_size;
2065 } else {
2066 other += meta.total_size;
2067 }
2068 }
2069
2070 Ok(StorageByPriority {
2071 own,
2072 followed,
2073 other,
2074 })
2075 }
2076
2077 pub fn get_storage_stats(&self) -> Result<StorageStats> {
2079 let rtxn = self.env.read_txn()?;
2080 let total_pins = self.pins.len(&rtxn)? as usize;
2081
2082 let stats = self
2083 .router
2084 .stats()
2085 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
2086
2087 Ok(StorageStats {
2088 total_dags: stats.count,
2089 pinned_dags: total_pins,
2090 total_bytes: stats.total_bytes,
2091 })
2092 }
2093
2094 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
2098 let key = format!("{}/{}", pubkey_hex, tree_name);
2099 let rtxn = self.env.read_txn()?;
2100 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
2101 let root: CachedRoot = rmp_serde::from_slice(bytes)
2102 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2103 Ok(Some(root))
2104 } else {
2105 Ok(None)
2106 }
2107 }
2108
2109 pub fn set_cached_root(
2111 &self,
2112 pubkey_hex: &str,
2113 tree_name: &str,
2114 hash: &str,
2115 key: Option<&str>,
2116 visibility: &str,
2117 updated_at: u64,
2118 ) -> Result<()> {
2119 let db_key = format!("{}/{}", pubkey_hex, tree_name);
2120 let root = CachedRoot {
2121 hash: hash.to_string(),
2122 key: key.map(|k| k.to_string()),
2123 updated_at,
2124 visibility: visibility.to_string(),
2125 };
2126 let bytes = rmp_serde::to_vec(&root)
2127 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
2128 let mut wtxn = self.env.write_txn()?;
2129 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
2130 wtxn.commit()?;
2131 Ok(())
2132 }
2133
2134 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
2136 let prefix = format!("{}/", pubkey_hex);
2137 let rtxn = self.env.read_txn()?;
2138 let mut results = Vec::new();
2139
2140 for item in self.cached_roots.iter(&rtxn)? {
2141 let (key, bytes) = item?;
2142 if key.starts_with(&prefix) {
2143 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
2144 let root: CachedRoot = rmp_serde::from_slice(bytes)
2145 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2146 results.push((tree_name.to_string(), root));
2147 }
2148 }
2149
2150 Ok(results)
2151 }
2152
2153 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
2155 let key = format!("{}/{}", pubkey_hex, tree_name);
2156 let mut wtxn = self.env.write_txn()?;
2157 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
2158 wtxn.commit()?;
2159 Ok(deleted)
2160 }
2161
    /// Delete every locally stored blob whose exact hash is not pinned.
    ///
    /// NOTE(review): only root hashes appear in `pins`; chunks and tree
    /// nodes referenced by a pinned root are not themselves pinned and are
    /// deleted here (unlike `evict_orphaned_blobs`, no `blob_trees` check
    /// is made) — confirm this aggressive behavior is intended.
    pub fn gc(&self) -> Result<GcStats> {
        let rtxn = self.env.read_txn()?;

        // Snapshot the pin set before listing blobs (malformed keys skipped).
        let pinned: HashSet<Hash> = self
            .pins
            .iter(&rtxn)?
            .filter_map(|item| item.ok())
            .filter_map(|(hash_bytes, _)| {
                if hash_bytes.len() == 32 {
                    let mut hash = [0u8; 32];
                    hash.copy_from_slice(hash_bytes);
                    Some(hash)
                } else {
                    None
                }
            })
            .collect();

        drop(rtxn);

        let all_hashes = self
            .router
            .list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        let mut deleted = 0;
        let mut freed_bytes = 0u64;

        for hash in all_hashes {
            if !pinned.contains(&hash) {
                // Read first so freed_bytes can be reported; a failed read
                // or delete just skips the blob.
                if let Ok(Some(data)) = self.router.get_sync(&hash) {
                    freed_bytes += data.len() as u64;
                    let _ = self.router.delete_local_only(&hash);
                    deleted += 1;
                }
            }
        }

        Ok(GcStats {
            deleted_dags: deleted,
            freed_bytes,
        })
    }
2210
    /// Verify every locally stored blob by re-hashing its content and
    /// comparing against the key it is stored under (content addressing).
    ///
    /// Prints a diagnostic line per corrupted, missing, or unreadable entry;
    /// when `delete` is true, the offending entries are removed afterwards.
    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
        let all_hashes = self
            .router
            .list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        let total = all_hashes.len();
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_hashes = Vec::new();

        for hash in &all_hashes {
            let hash_hex = to_hex(hash);

            match self.router.get_sync(hash) {
                Ok(Some(data)) => {
                    // Content-addressed: the stored bytes must hash to the key.
                    let actual_hash = sha256(&data);

                    if actual_hash == *hash {
                        valid += 1;
                    } else {
                        corrupted += 1;
                        let actual_hex = to_hex(&actual_hash);
                        println!(
                            " CORRUPTED: key={} actual={} size={}",
                            &hash_hex[..16],
                            &actual_hex[..16],
                            data.len()
                        );
                        corrupted_hashes.push(*hash);
                    }
                }
                Ok(None) => {
                    // Listed but unreadable: count as corrupted.
                    corrupted += 1;
                    println!(" MISSING: key={}", &hash_hex[..16]);
                    corrupted_hashes.push(*hash);
                }
                Err(e) => {
                    corrupted += 1;
                    println!(" ERROR: key={} err={}", &hash_hex[..16], e);
                    corrupted_hashes.push(*hash);
                }
            }
        }

        if delete {
            for hash in &corrupted_hashes {
                match self.router.delete_sync(hash) {
                    Ok(true) => deleted += 1,
                    Ok(false) => {}
                    Err(e) => {
                        let hash_hex = to_hex(hash);
                        println!(" Failed to delete {}: {}", &hash_hex[..16], e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2282
    /// Verify the integrity of every `.bin` object in the configured S3/R2
    /// bucket by re-hashing its content against the hash encoded in its key.
    ///
    /// Prints a line per invalid/corrupted object and a progress line every
    /// 100 objects; when `delete` is true, corrupted objects are removed.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        // Build an S3 client from the application config.
        let config = crate::config::Config::load()?;
        let s3_config = config
            .storage
            .s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                .force_path_style(true)
                .build(),
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // Paginate through the full bucket listing.
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2().bucket(bucket).prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Only blob objects ("<hex-hash>.bin") are verified.
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // The expected hash is the hex filename without ".bin".
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!(" INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!(" INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download the object and re-hash its bytes.
                match s3_client.get_object().bucket(bucket).key(key).send().await {
                    Ok(resp) => match resp.body.collect().await {
                        Ok(bytes) => {
                            let data = bytes.into_bytes();
                            let actual_hash = sha256(&data);

                            if actual_hash == expected_hash {
                                valid += 1;
                            } else {
                                corrupted += 1;
                                let actual_hex = to_hex(&actual_hash);
                                println!(
                                    " CORRUPTED: key={} actual={} size={}",
                                    &expected_hash_hex[..16],
                                    &actual_hex[..16],
                                    data.len()
                                );
                                corrupted_keys.push(key.to_string());
                            }
                        }
                        Err(e) => {
                            corrupted += 1;
                            println!(" READ ERROR: {} - {}", key, e);
                            corrupted_keys.push(key.to_string());
                        }
                    },
                    Err(e) => {
                        corrupted += 1;
                        println!(" FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                if total % 100 == 0 {
                    println!(
                        " Progress: {} objects checked, {} corrupted so far",
                        total, corrupted
                    );
                }
            }

            // Continue paginating while the listing is truncated.
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        if delete {
            for key in &corrupted_keys {
                match s3_client
                    .delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        println!(" Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2442
2443 #[cfg(not(feature = "s3"))]
2445 pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
2446 Err(anyhow::anyhow!("S3 feature not enabled"))
2447 }
2448}
2449
/// Outcome of an integrity verification pass.
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of objects examined.
    pub total: usize,
    /// Objects whose content hash matched their key.
    pub valid: usize,
    /// Objects that were corrupted, missing, or unreadable.
    pub corrupted: usize,
    /// Objects removed (non-zero only when deletion was requested).
    pub deleted: usize,
}
2458
/// Aggregate local storage statistics (see `get_storage_stats`).
#[derive(Debug)]
pub struct StorageStats {
    /// Number of stored blobs as reported by the router.
    pub total_dags: usize,
    /// Number of entries in the pins table.
    pub pinned_dags: usize,
    /// Total stored bytes.
    pub total_bytes: u64,
}
2465
/// Tracked tree bytes broken down by priority class
/// (see `storage_by_priority`).
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Bytes in trees with priority `PRIORITY_OWN`.
    pub own: u64,
    /// Bytes in trees with priority >= `PRIORITY_FOLLOWED` (excluding own).
    pub followed: u64,
    /// Bytes in all remaining trees.
    pub other: u64,
}
2476
/// Layout of a stored file: its total size and, when chunked, the per-chunk
/// hashes and sizes in file order.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes.
    pub total_size: u64,
    /// Hashes of each chunk in order (empty when not chunked).
    pub chunk_hashes: Vec<Hash>,
    /// Size of each chunk, parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is split into chunks.
    pub is_chunked: bool,
}
2484
/// Owning iterator over the chunk slices of a file that overlap a byte
/// range. Produced by `HashtreeStore::stream_file_range_chunks_owned`.
pub struct FileRangeChunksOwned {
    store: Arc<HashtreeStore>,   // shared handle used to fetch chunk bytes
    metadata: FileChunkMetadata, // chunk layout of the file
    start: u64,                  // inclusive start of the requested range
    end: u64,                    // inclusive end (clamped to the file size)
    current_chunk_idx: usize,    // next chunk index to inspect
    current_offset: u64,         // byte offset of that chunk within the file
}
2494
2495impl Iterator for FileRangeChunksOwned {
2496 type Item = Result<Vec<u8>>;
2497
2498 fn next(&mut self) -> Option<Self::Item> {
2499 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2500 return None;
2501 }
2502
2503 if self.current_offset > self.end {
2504 return None;
2505 }
2506
2507 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2508 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2509 let chunk_end = self.current_offset + chunk_size - 1;
2510
2511 self.current_chunk_idx += 1;
2512
2513 if chunk_end < self.start || self.current_offset > self.end {
2514 self.current_offset += chunk_size;
2515 return self.next();
2516 }
2517
2518 let chunk_content = match self.store.get_chunk(chunk_hash) {
2519 Ok(Some(content)) => content,
2520 Ok(None) => {
2521 return Some(Err(anyhow::anyhow!(
2522 "Chunk {} not found",
2523 to_hex(chunk_hash)
2524 )));
2525 }
2526 Err(e) => {
2527 return Some(Err(e));
2528 }
2529 };
2530
2531 let chunk_read_start = if self.current_offset >= self.start {
2532 0
2533 } else {
2534 (self.start - self.current_offset) as usize
2535 };
2536
2537 let chunk_read_end = if chunk_end <= self.end {
2538 chunk_size as usize - 1
2539 } else {
2540 (self.end - self.current_offset) as usize
2541 };
2542
2543 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2544 self.current_offset += chunk_size;
2545
2546 Some(Ok(result))
2547 }
2548}
2549
/// Result of a `gc` run.
#[derive(Debug)]
pub struct GcStats {
    /// Number of unpinned blobs deleted.
    pub deleted_dags: usize,
    /// Total bytes reclaimed.
    pub freed_bytes: u64,
}
2555
/// One entry in a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within its directory.
    pub name: String,
    /// Entry CID rendered as a string (hex hash, plus key when present).
    pub cid: String,
    /// Whether the entry is itself a directory.
    pub is_directory: bool,
    /// Entry size in bytes.
    pub size: u64,
}
2563
/// The contents of one directory.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Directory display name (currently always empty — set by callers).
    pub dir_name: String,
    /// The directory's entries.
    pub entries: Vec<DirEntry>,
}
2569
/// A pinned hash with display information (see `list_pins_with_names`).
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Hex-encoded pinned hash.
    pub cid: String,
    /// Display name (currently always "Unknown").
    pub name: String,
    /// Whether the pinned hash refers to a directory.
    pub is_directory: bool,
}
2576
/// Serializable metadata describing an uploaded blob.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 of the blob content.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type recorded at upload time.
    pub mime_type: String,
    /// Upload timestamp — presumably unix seconds; TODO confirm units.
    pub uploaded: u64,
}
2585
2586impl crate::webrtc::ContentStore for HashtreeStore {
2588 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2589 let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2590 self.get_chunk(&hash)
2591 }
2592}
2593
#[cfg(test)]
mod tests {
    #[cfg(feature = "lmdb")]
    use super::*;
    #[cfg(feature = "lmdb")]
    use tempfile::TempDir;

    // A storage budget above the built-in LMDB minimum map size should
    // grow the blob store's map to at least that budget.
    #[cfg(feature = "lmdb")]
    #[test]
    fn hashtree_store_expands_blob_lmdb_map_size_to_storage_budget() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = LMDB_BLOB_MIN_MAP_SIZE_BYTES + 64 * 1024 * 1024;
        let store = HashtreeStore::with_options_and_backend(
            temp.path(),
            None,
            requested,
            &StorageBackend::Lmdb,
        )?;

        let map_size = match store.router.local.as_ref() {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected blob LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        drop(store);
        Ok(())
    }

    // An explicit map-size override passed to LocalStore should be honored.
    #[cfg(feature = "lmdb")]
    #[test]
    fn local_store_can_override_lmdb_map_size() -> Result<()> {
        let temp = TempDir::new()?;
        let requested = 512 * 1024 * 1024u64;
        let store = LocalStore::new_with_lmdb_map_size(
            temp.path().join("lmdb-blobs"),
            &StorageBackend::Lmdb,
            Some(requested),
        )?;

        let map_size = match store {
            LocalStore::Lmdb(local) => local.map_size_bytes() as u64,
            LocalStore::Fs(_) => panic!("expected LMDB local store"),
        };

        assert!(
            map_size >= requested,
            "expected LMDB map to grow to at least {requested} bytes, got {map_size}"
        );

        Ok(())
    }
}