1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use futures::io::AllowStdIo;
5use futures::StreamExt;
6use hashtree_config::StorageBackend;
7use hashtree_core::store::{Store, StoreError};
8use hashtree_core::{
9 from_hex, sha256, to_hex, types::Hash, Cid, DirEntry as HashTreeDirEntry, HashTree,
10 HashTreeConfig, TreeNode,
11};
12use hashtree_fs::FsBlobStore;
13#[cfg(feature = "lmdb")]
14use hashtree_lmdb::LmdbBlobStore;
15use heed::types::*;
16use heed::{Database, EnvOpenOptions};
17use serde::{Deserialize, Serialize};
18use std::collections::HashSet;
19use std::io::{Read, Write};
20use std::path::Path;
21use std::sync::Arc;
22use std::time::{SystemTime, UNIX_EPOCH};
23
// Retention-priority tiers for synced trees. Higher value = keep longer
// (presumably consumed by an eviction/GC pass elsewhere — confirm with callers).
/// Trees synced from arbitrary, unfollowed users.
pub const PRIORITY_OTHER: u8 = 64;
/// Trees from users the local user follows.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// The local user's own trees — highest priority.
pub const PRIORITY_OWN: u8 = 255;
28
/// Metadata recorded for a synced hash tree.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Identifier of the tree's owner (presumably a hex pubkey — confirm with callers).
    pub owner: String,
    /// Optional human-readable name for the tree.
    pub name: Option<String>,
    /// Unix timestamp (seconds) of the last sync.
    pub synced_at: u64,
    /// Total content size in bytes.
    pub total_size: u64,
    /// Retention priority; see the `PRIORITY_*` constants.
    pub priority: u8,
}
43
/// A cached tree root, as stored in the `cached_roots` database.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Hex-encoded root hash.
    pub hash: String,
    /// Optional decryption key (present for encrypted trees, presumably — confirm).
    pub key: Option<String>,
    /// Unix timestamp (seconds) when this entry was last updated.
    pub updated_at: u64,
    /// Visibility label (stringly-typed; values defined by callers).
    pub visibility: String,
}
56
/// Aggregate statistics for a local blob store.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of stored blobs.
    pub count: usize,
    /// Total bytes across all blobs.
    pub total_bytes: u64,
}
63
/// Local content-addressed blob storage, dispatching to the configured backend.
pub enum LocalStore {
    /// Filesystem-backed store (always available).
    Fs(FsBlobStore),
    /// LMDB-backed store (only when built with the `lmdb` feature).
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
70
impl LocalStore {
    /// Open a blob store at `path` using the requested backend.
    ///
    /// When LMDB is requested but this binary was built without the `lmdb`
    /// feature, falls back to the filesystem backend with a warning rather
    /// than failing.
    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
        match backend {
            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
            #[cfg(feature = "lmdb")]
            StorageBackend::Lmdb => Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?)),
            #[cfg(not(feature = "lmdb"))]
            StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
            }
        }
    }

    /// Store `data` under `hash`. Returns the backend's bool (presumably
    /// `true` when the blob was newly written — confirm against backend docs).
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.put_sync(hash, data),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put_sync(hash, data),
        }
    }

    /// Fetch the blob stored under `hash`, or `None` if absent.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.get_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.get_sync(hash),
        }
    }

    /// Check whether a blob exists locally.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            // Fs `exists` is infallible, so wrap in Ok; LMDB's is fallible.
            LocalStore::Fs(store) => Ok(store.exists(hash)),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.exists(hash),
        }
    }

    /// Delete the blob under `hash`; the returned bool is forwarded from the
    /// backend (presumably `true` when something was actually removed).
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.delete_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.delete_sync(hash),
        }
    }

    /// Blob count and total bytes, converted to the backend-agnostic
    /// [`LocalStoreStats`] shape.
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        match self {
            LocalStore::Fs(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
        }
    }

    /// Enumerate every stored blob hash.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.list(),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.list(),
        }
    }
}
154
// Async `Store` trait facade over the synchronous backend calls. Note these
// block the executor thread for the duration of the underlying I/O.
#[async_trait]
impl Store for LocalStore {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
173
174#[cfg(feature = "s3")]
175use tokio::sync::mpsc;
176
177use crate::config::S3Config;
178
// Work items sent to the background S3 replication task.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Mirror a blob's bytes to S3.
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove a blob's object from S3.
    Delete { hash: Hash },
}
185
/// Routes blob operations: writes land in the local store first, then are
/// mirrored to S3 asynchronously (when the `s3` feature is enabled and
/// configured); reads fall back to S3 when the blob is missing locally.
pub struct StorageRouter {
    local: Arc<LocalStore>,
    /// Configured S3 client, if S3 mirroring is active.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target bucket; `None` when S3 is not configured.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to every object key (may be empty).
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel into the background replication task; `None` when S3 is off.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
204
impl StorageRouter {
    /// Router backed only by the local store (no S3 mirroring).
    pub fn new(local: Arc<LocalStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Router with S3 mirroring: builds the AWS client and spawns a background
    /// task that drains [`S3SyncMessage`]s, running up to 32 S3 operations
    /// concurrently (bounded by a semaphore).
    ///
    /// Must be called from within a tokio runtime (uses `tokio::spawn`).
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader =
            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        // Path-style addressing so MinIO-style custom endpoints work.
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Unbounded: uploads are queued without backpressure. NOTE(review):
        // a burst of large blobs buffers all bytes in memory — consider bounding.
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Caps in-flight S3 requests at 32.
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                // One task per message; the permit is held for its duration.
                tokio::spawn(async move {
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            // Object key: <prefix><hex-hash>.bin
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            match client
                                .put_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .body(ByteStream::from(data))
                                .send()
                                .await
                            {
                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
                                // Failures are logged only — no retry queue.
                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            if let Err(e) = client
                                .delete_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .send()
                                .await
                            {
                                tracing::error!("S3 delete failed {}: {}", &key, e);
                            }
                        }
                    }
                });
            }
        });

        tracing::info!(
            "S3 storage initialized: bucket={}, prefix={}",
            bucket,
            prefix
        );

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Write locally, then queue a fire-and-forget S3 upload of a copy of the
    /// bytes. The upload is queued even when the blob already existed locally.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        let is_new = self.local.put_sync(hash, data)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            tracing::info!(
                "Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(),
                data.len(),
                is_new
            );
            if let Err(e) = tx.send(S3SyncMessage::Upload {
                hash,
                data: data.to_vec(),
            }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Read locally first; on a local miss, fetch from S3 synchronously and
    /// re-warm the local cache with the bytes.
    ///
    /// NOTE(review): the S3 fallback uses `futures::executor::block_on` on an
    /// AWS SDK future — calling this from inside a tokio runtime worker thread
    /// risks a panic/deadlock; verify all call sites are sync contexts.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async { client.get_object().bucket(bucket).key(&key).send().await })
            {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Best-effort cache warm; ignore local write failures.
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // A missing key is expected; only log other failures.
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Existence check: local first, then a blocking S3 HEAD on miss.
    /// Same `block_on` caveat as [`Self::get_sync`].
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        if self.local.exists(hash)? {
            return Ok(true);
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object().bucket(bucket).key(&key).send().await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Delete locally and queue an async S3 delete. Returns whether the
    /// *local* delete removed anything; S3 errors are only logged.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Delete only the local copy, leaving any S3 replica intact
    /// (used for cache eviction, presumably — confirm with callers).
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Local-store statistics only (S3 contents are not counted).
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        self.local.stats()
    }

    /// Hashes present in the local store only.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Shared handle to the underlying local store.
    pub fn local_store(&self) -> Arc<LocalStore> {
        Arc::clone(&self.local)
    }
}
445
// Async `Store` facade over the router's synchronous methods; inherits the
// blocking behavior (and `block_on` caveats) of the sync implementations.
#[async_trait]
impl Store for StorageRouter {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
466
/// Top-level storage facade: a heed/LMDB environment holding metadata and
/// indexes, plus a [`StorageRouter`] holding content-addressed blob bytes.
pub struct HashtreeStore {
    env: heed::Env,
    // Root hashes protected from garbage collection.
    pins: Database<Bytes, Unit>,
    // Presence set keyed by `sha256 || pubkey` (64 bytes; see `blob_owner_key`).
    blob_owners: Database<Bytes, Unit>,
    // pubkey (32 bytes) -> JSON `Vec<BlobMetadata>` of that user's blobs.
    pubkey_blobs: Database<Bytes, Bytes>,
    // Tree root hash -> serialized `TreeMeta` (assumed; confirm with callers).
    tree_meta: Database<Bytes, Bytes>,
    // Membership index linking blobs to trees (key layout defined by callers).
    blob_trees: Database<Bytes, Unit>,
    // Named tree references -> payload bytes (semantics defined by callers).
    tree_refs: Database<Str, Bytes>,
    // Cache key -> serialized `CachedRoot`.
    cached_roots: Database<Str, Bytes>,
    router: Arc<StorageRouter>,
    // Storage budget in bytes (0 = presumably unlimited — confirm with GC code).
    max_size_bytes: u64,
}
488
489impl HashtreeStore {
490 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
492 let config = hashtree_config::Config::load_or_default();
493 let max_size_bytes = config
494 .storage
495 .max_size_gb
496 .saturating_mul(1024 * 1024 * 1024);
497 Self::with_options_and_backend(path, None, max_size_bytes, &config.storage.backend)
498 }
499
    /// Open a store with an explicit backend and size cap, bypassing the
    /// on-disk config (no S3).
    pub fn new_with_backend<P: AsRef<Path>>(
        path: P,
        backend: hashtree_config::StorageBackend,
        max_size_bytes: u64,
    ) -> Result<Self> {
        Self::with_options_and_backend(path, None, max_size_bytes, &backend)
    }
508
509 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
511 let config = hashtree_config::Config::load_or_default();
512 let max_size_bytes = config
513 .storage
514 .max_size_gb
515 .saturating_mul(1024 * 1024 * 1024);
516 Self::with_options_and_backend(path, s3_config, max_size_bytes, &config.storage.backend)
517 }
518
    /// Open a store with explicit S3 config and size cap, taking the storage
    /// backend from the on-disk config.
    pub fn with_options<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
    ) -> Result<Self> {
        let config = hashtree_config::Config::load_or_default();
        Self::with_options_and_backend(path, s3_config, max_size_bytes, &config.storage.backend)
    }
528
    /// Shared constructor: creates the directory, opens the LMDB environment
    /// and its seven named databases, builds the blob store under
    /// `<path>/blobs`, and wires up the (optional) S3 router.
    fn with_options_and_backend<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
        backend: &hashtree_config::StorageBackend,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        let env = unsafe {
            // SAFETY: heed marks env opening unsafe; presumably this requires
            // the same path not be opened twice in-process — confirm against
            // heed's EnvOpenOptions docs.
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) // 10 GiB LMDB map ceiling
                .max_dbs(8)
                .open(path)?
        };

        // Create (or open) all named databases in a single write transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        let local_store = Arc::new(
            LocalStore::new(path.join("blobs"), backend)
                .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
        );

        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            // NOTE(review): block_on here assumes no ambient tokio runtime on
            // this thread, yet with_s3 calls tokio::spawn which needs one —
            // verify the runtime context at call sites.
            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            env,
            pins,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
598
    /// Borrow the storage router.
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }
603
    /// Shared handle to the router, suitable for `HashTreeConfig::new`.
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }
609
    /// Upload a file into the public hash tree and pin its root.
    /// Returns the hex-encoded root hash.
    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, true)
    }
614
    /// Upload a file into the public hash tree without pinning the root
    /// (the content remains eligible for garbage collection).
    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, false)
    }
619
620 fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
621 let file_path = file_path.as_ref();
622 let file = std::fs::File::open(file_path)
623 .with_context(|| format!("Failed to open file {}", file_path.display()))?;
624
625 let store = self.store_arc();
627 let tree = HashTree::new(HashTreeConfig::new(store).public());
628
629 let (cid, _size) = sync_block_on(async { tree.put_stream(AllowStdIo::new(file)).await })
630 .context("Failed to store file")?;
631
632 if pin {
634 let mut wtxn = self.env.write_txn()?;
635 self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
636 wtxn.commit()?;
637 }
638
639 Ok(to_hex(&cid.hash))
640 }
641
    /// Upload content from an arbitrary reader into the public hash tree,
    /// invoking `callback` with the hex root hash, then pinning the root.
    ///
    /// Note: `callback` fires *before* the pin is committed; if the commit
    /// fails the callback will already have observed the hash.
    pub fn upload_file_stream<R: Read, F>(
        &self,
        reader: R,
        _file_name: impl Into<String>,
        mut callback: F,
    ) -> Result<String>
    where
        F: FnMut(&str),
    {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (cid, _size) = sync_block_on(async { tree.put_stream(AllowStdIo::new(reader)).await })
            .context("Failed to store file")?;

        let root_hex = to_hex(&cid.hash);
        callback(&root_hex);

        // Pin the root so GC keeps the tree.
        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(root_hex)
    }
669
    /// Upload a directory tree (honoring .gitignore) and pin its root.
    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_with_options(dir_path, true)
    }
675
    /// Upload a directory into the public hash tree, optionally honoring
    /// gitignore rules, then pin the root. Returns the hex root hash.
    pub fn upload_dir_with_options<P: AsRef<Path>>(
        &self,
        dir_path: P,
        respect_gitignore: bool,
    ) -> Result<String> {
        let dir_path = dir_path.as_ref();

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore)
                .await
        })
        .context("Failed to upload directory")?;

        let root_hex = to_hex(&root_cid.hash);

        // Pin the directory root so GC keeps the whole tree.
        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(root_hex)
    }
701
    /// Walk `current_path` (optionally honoring gitignore rules), upload every
    /// file into `tree`, and assemble directory nodes bottom-up via
    /// `build_directory_tree`. Returns the root directory's CID.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Map: relative directory path ("" = root) -> (file name, CID) pairs.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        // Ensure the root entry exists even for an empty directory.
        dir_contents.insert(String::new(), Vec::new());
        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false)
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // The walker yields the root itself first; skip it.
            if path == current_path {
                continue;
            }

            let relative = path.strip_prefix(current_path).unwrap_or(path);

            if path.is_file() {
                let file = std::fs::File::open(path)
                    .with_context(|| format!("Failed to open file {}", path.display()))?;
                let (cid, _size) = tree.put_stream(AllowStdIo::new(file)).await.map_err(|e| {
                    anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e)
                })?;

                // File contents are recorded under their parent directory key.
                let parent = relative
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative
                    .file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Register empty directories so they still get a node.
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        self.build_directory_tree(tree, &mut dir_contents).await
    }
762
    /// Create directory nodes deepest-first so each parent can embed its
    /// children's CIDs, and return the root ("" key) CID.
    async fn build_directory_tree<S: Store>(
        &self,
        tree: &HashTree<S>,
        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
    ) -> Result<Cid> {
        // Sort directory keys by depth, deepest first; "" (root) sorts last.
        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
        dirs.sort_by(|a, b| {
            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
            depth_b.cmp(&depth_a)
        });

        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();

        for dir_path in dirs {
            // Start from this directory's files...
            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();

            let mut entries: Vec<HashTreeDirEntry> = files
                .into_iter()
                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
                .collect();

            // ...then add any already-built immediate subdirectories.
            for (subdir_path, cid) in &dir_cids {
                let parent = std::path::Path::new(subdir_path)
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();

                if parent == dir_path {
                    let name = std::path::Path::new(subdir_path)
                        .file_name()
                        .map(|n| n.to_string_lossy().to_string())
                        .unwrap_or_default();
                    entries.push(HashTreeDirEntry::from_cid(name, cid));
                }
            }

            let cid = tree
                .put_directory(entries)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;

            dir_cids.insert(dir_path, cid);
        }

        // The root is keyed by the empty string; it is built last.
        dir_cids
            .get("")
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("No root directory"))
    }
816
    /// Upload a file into an *encrypted* hash tree (default, non-`.public()`
    /// config) and pin the root. Returns the full CID string, which —
    /// presumably — carries the key material needed for decryption (confirm
    /// against hashtree_core's Cid docs).
    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        let file_path = file_path.as_ref();
        let file = std::fs::File::open(file_path)
            .with_context(|| format!("Failed to open file {}", file_path.display()))?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store));

        let (cid, _size) = sync_block_on(async { tree.put_stream(AllowStdIo::new(file)).await })
            .map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;

        let cid_str = cid.to_string();

        // Pin the root so GC keeps the tree.
        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
838
    /// Upload a directory into an encrypted hash tree (honoring .gitignore)
    /// and pin its root.
    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_encrypted_with_options(dir_path, true)
    }
844
    /// Upload a directory into an encrypted hash tree, optionally honoring
    /// gitignore rules, then pin the root. Returns the full CID string
    /// (presumably carrying key material — see `upload_file_encrypted`).
    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(
        &self,
        dir_path: P,
        respect_gitignore: bool,
    ) -> Result<String> {
        let dir_path = dir_path.as_ref();
        let store = self.store_arc();

        // Default (non-public) config => encrypted tree.
        let tree = HashTree::new(HashTreeConfig::new(store));

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore)
                .await
        })
        .context("Failed to upload encrypted directory")?;

        let cid_str = root_cid.to_string();
        // Pin the root so GC keeps the tree.
        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
873
    /// Fetch and decode the tree node stored under `hash`, or `None` if absent.
    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
        })
    }
885
886 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
888 let hash = sha256(data);
889 self.router
890 .put_sync(hash, data)
891 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
892 Ok(to_hex(&hash))
893 }
894
    /// Fetch a raw blob's bytes by sha256, or `None` if absent everywhere.
    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        self.router
            .get_sync(hash)
            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
    }
901
    /// Check whether a blob exists (locally or, when configured, in S3).
    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
        self.router
            .exists(hash)
            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
    }
908
909 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
915 let mut key = [0u8; 64];
916 key[..32].copy_from_slice(sha256);
917 key[32..].copy_from_slice(pubkey);
918 key
919 }
920
    /// Record `pubkey` as an owner of blob `sha256`, updating both the
    /// owner presence set and the per-pubkey blob list (JSON metadata).
    /// Idempotent: re-adding an existing owner leaves the list unchanged.
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Presence record in the sha256||pubkey set.
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        let sha256_hex = to_hex(sha256);

        // Current metadata list for this pubkey; corrupt JSON is treated as empty.
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            // System clock before the epoch is a broken host; unwrap is a bug-trap.
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Size comes from re-reading the blob; 0 if it is (unexpectedly) missing.
            let size = self
                .get_blob(sha256)?
                .map(|data| data.len() as u64)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
967
    /// Whether `pubkey` is recorded as an owner of blob `sha256`.
    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let rtxn = self.env.read_txn()?;
        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
    }
974
    /// All pubkeys recorded as owners of blob `sha256`, via a prefix scan over
    /// the 64-byte `sha256||pubkey` keys.
    pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
        let rtxn = self.env.read_txn()?;

        let mut owners = Vec::new();
        for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
            let (key, _) = item?;
            // Keys of any other length would be malformed; skip them silently.
            if key.len() == 64 {
                let mut pubkey = [0u8; 32];
                pubkey.copy_from_slice(&key[32..64]);
                owners.push(pubkey);
            }
        }
        Ok(owners)
    }
991
992 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
994 let rtxn = self.env.read_txn()?;
995
996 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
998 if item.is_ok() {
999 return Ok(true);
1000 }
1001 }
1002 Ok(false)
1003 }
1004
1005 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
1007 Ok(self.get_blob_owners(sha256)?.into_iter().next())
1008 }
1009
    /// Remove `pubkey`'s ownership of blob `sha256`; when no owners remain,
    /// also delete the blob bytes. Returns `true` iff the bytes were deleted.
    ///
    /// NOTE(review): the blob bytes are deleted *before* the metadata
    /// transaction commits — if the commit fails, the data is gone while the
    /// ownership records survive. Verify this ordering is intentional.
    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Drop the presence record for this (blob, owner) pair.
        self.blob_owners.delete(&mut wtxn, &key[..])?;

        let sha256_hex = to_hex(sha256);

        // Remove the blob from this pubkey's JSON metadata list, if present.
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Check (inside the same txn, so the delete above is visible) whether
        // any other owner records remain for this blob.
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &to_hex(pubkey)[..8],
                &sha256_hex[..8]
            );
            return Ok(false);
        }

        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8]
        );

        // Best-effort delete of the actual bytes (local + queued S3 delete).
        let _ = self.router.delete_sync(sha256);

        wtxn.commit()?;
        Ok(true)
    }
1063
    /// All blobs recorded for `pubkey`, converted to Blossom descriptors.
    /// Corrupt/missing JSON metadata is treated as an empty list.
    pub fn list_blobs_by_pubkey(
        &self,
        pubkey: &[u8; 32],
    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
        let rtxn = self.env.read_txn()?;

        let blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&rtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        Ok(blobs
            .into_iter()
            .map(|b| crate::server::blossom::BlobDescriptor {
                // Relative URL: just the hex hash at the server root.
                url: format!("/{}", b.sha256),
                sha256: b.sha256,
                size: b.size,
                mime_type: b.mime_type,
                uploaded: b.uploaded,
            })
            .collect())
    }
1088
1089 pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1091 self.router
1092 .get_sync(hash)
1093 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1094 }
1095
    /// Reassemble a complete (public) file's bytes from its root hash.
    /// Buffers the whole file in memory; prefer the streaming variants for
    /// large content.
    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.read_file(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }
1108
    /// Reassemble a complete file's bytes from a CID (which may carry
    /// decryption key material). Buffers the whole file in memory.
    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get(cid, None)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }
1121
1122 fn ensure_cid_exists(&self, cid: &Cid) -> Result<()> {
1123 let exists = self
1124 .router
1125 .exists(&cid.hash)
1126 .map_err(|e| anyhow::anyhow!("Failed to check cid existence: {}", e))?;
1127 if !exists {
1128 anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
1129 }
1130 Ok(())
1131 }
1132
    /// Stream the file identified by `cid` into `writer` chunk by chunk,
    /// returning the number of bytes written. Flushes the writer on success.
    ///
    /// NOTE(review): a stream yielding zero chunks is treated as "CID not
    /// found" even though the existence check passed — if a legitimately
    /// empty file produces zero chunks, this would mis-report it; confirm
    /// against `get_stream`'s behavior for empty files.
    pub fn write_file_by_cid_to_writer<W: Write>(&self, cid: &Cid, writer: &mut W) -> Result<u64> {
        self.ensure_cid_exists(cid)?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let mut total_bytes = 0u64;
        let mut streamed_any_chunk = false;

        sync_block_on(async {
            let mut stream = tree.get_stream(cid);
            while let Some(chunk) = stream.next().await {
                streamed_any_chunk = true;
                let chunk =
                    chunk.map_err(|e| anyhow::anyhow!("Failed to stream file chunk: {}", e))?;
                writer
                    .write_all(&chunk)
                    .map_err(|e| anyhow::anyhow!("Failed to write file chunk: {}", e))?;
                total_bytes += chunk.len() as u64;
            }
            Ok::<(), anyhow::Error>(())
        })?;

        if !streamed_any_chunk {
            anyhow::bail!("CID not found: {}", to_hex(&cid.hash));
        }

        writer
            .flush()
            .map_err(|e| anyhow::anyhow!("Failed to flush output: {}", e))?;
        Ok(total_bytes)
    }
1165
    /// Stream the file identified by `cid` to `output_path`, creating parent
    /// directories as needed. Returns the number of bytes written.
    pub fn write_file_by_cid<P: AsRef<Path>>(&self, cid: &Cid, output_path: P) -> Result<u64> {
        self.ensure_cid_exists(cid)?;

        let output_path = output_path.as_ref();
        if let Some(parent) = output_path.parent() {
            // A bare filename yields an empty parent; skip mkdir for that case.
            if !parent.as_os_str().is_empty() {
                std::fs::create_dir_all(parent).with_context(|| {
                    format!("Failed to create output directory {}", parent.display())
                })?;
            }
        }

        let mut file = std::fs::File::create(output_path)
            .with_context(|| format!("Failed to create output file {}", output_path.display()))?;
        self.write_file_by_cid_to_writer(cid, &mut file)
    }
1183
    /// Convenience wrapper: write a *public* (unencrypted) file by bare hash.
    pub fn write_file<P: AsRef<Path>>(&self, hash: &[u8; 32], output_path: P) -> Result<u64> {
        self.write_file_by_cid(&Cid::public(*hash), output_path)
    }
1188
    /// Resolve a slash-separated `path` relative to a directory `cid`,
    /// returning the target entry's CID or `None` when the path is absent.
    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.resolve_path(cid, path)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
        })
    }
1200
    /// Describe how the file under `hash` is laid out: total size plus, for
    /// chunked files, the ordered chunk hashes and sizes.
    ///
    /// Returns `None` when the hash is unknown, the tree node is missing, or
    /// the node is a *directory* (directories have no byte-range semantics).
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            let exists = store
                .has(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            let total_size = tree
                .get_size(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            let is_tree_node = tree
                .is_tree(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            // Not a tree node => a single inline blob: no chunk list.
            if !is_tree_node {
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            let node = match tree
                .get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
            {
                Some(n) => n,
                None => return Ok(None),
            };

            let is_directory = tree
                .is_directory(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                // Directories are not range-readable files.
                return Ok(None);
            }

            // Chunk order in `links` defines the file's byte order.
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1272
    /// Read the inclusive byte range `[start, end]` of the file under `hash`,
    /// returning `(bytes, total_file_size)`.
    ///
    /// `end = None` means "to end of file"; an `end` past EOF is clamped.
    /// Returns `None` for an unknown hash or a `start` at/past EOF; an empty
    /// file yields `Some((vec![], 0))` regardless of the requested range.
    pub fn get_file_range(
        &self,
        hash: &[u8; 32],
        start: u64,
        end: Option<u64>,
    ) -> Result<Option<(Vec<u8>, u64)>> {
        let metadata = match self.get_file_chunk_metadata(hash)? {
            Some(m) => m,
            None => return Ok(None),
        };

        if metadata.total_size == 0 {
            return Ok(Some((Vec::new(), 0)));
        }

        if start >= metadata.total_size {
            return Ok(None);
        }

        // Clamp end to the last valid byte offset.
        let end = end
            .unwrap_or(metadata.total_size - 1)
            .min(metadata.total_size - 1);

        // Unchunked file: load it whole and slice.
        if !metadata.is_chunked {
            let content = self.get_file(hash)?.unwrap_or_default();
            let range_content = if start < content.len() as u64 {
                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
            } else {
                Vec::new()
            };
            return Ok(Some((range_content, metadata.total_size)));
        }

        // Chunked file: walk chunks in order, copying only overlapping parts.
        let mut result = Vec::new();
        let mut current_offset = 0u64;

        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
            let chunk_size = metadata.chunk_sizes[i];
            // NOTE(review): a zero-size chunk would underflow here (debug
            // panic); presumably chunk sizes are always > 0 — confirm.
            let chunk_end = current_offset + chunk_size - 1;

            // Skip chunks wholly outside [start, end].
            if chunk_end >= start && current_offset <= end {
                let chunk_content = match self.get_chunk(chunk_hash)? {
                    Some(content) => content,
                    None => {
                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
                    }
                };

                // Intersect the request with this chunk's span.
                let chunk_read_start = if current_offset >= start {
                    0
                } else {
                    (start - current_offset) as usize
                };

                let chunk_read_end = if chunk_end <= end {
                    chunk_size as usize - 1
                } else {
                    (end - current_offset) as usize
                };

                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
            }

            current_offset += chunk_size;

            // Past the requested range: nothing further to read.
            if current_offset > end {
                break;
            }
        }

        Ok(Some((result, metadata.total_size)))
    }
1349
1350 pub fn stream_file_range_chunks_owned(
1352 self: Arc<Self>,
1353 hash: &[u8; 32],
1354 start: u64,
1355 end: u64,
1356 ) -> Result<Option<FileRangeChunksOwned>> {
1357 let metadata = match self.get_file_chunk_metadata(hash)? {
1358 Some(m) => m,
1359 None => return Ok(None),
1360 };
1361
1362 if metadata.total_size == 0 || start >= metadata.total_size {
1363 return Ok(None);
1364 }
1365
1366 let end = end.min(metadata.total_size - 1);
1367
1368 Ok(Some(FileRangeChunksOwned {
1369 store: self,
1370 metadata,
1371 start,
1372 end,
1373 current_chunk_idx: 0,
1374 current_offset: 0,
1375 }))
1376 }
1377
1378 pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
1380 let store = self.store_arc();
1381 let tree = HashTree::new(HashTreeConfig::new(store).public());
1382
1383 sync_block_on(async {
1384 let is_dir = tree
1386 .is_directory(hash)
1387 .await
1388 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1389
1390 if !is_dir {
1391 return Ok(None);
1392 }
1393
1394 let cid = hashtree_core::Cid::public(*hash);
1396 let tree_entries = tree
1397 .list_directory(&cid)
1398 .await
1399 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1400
1401 let entries: Vec<DirEntry> = tree_entries
1402 .into_iter()
1403 .map(|e| DirEntry {
1404 name: e.name,
1405 cid: to_hex(&e.hash),
1406 is_directory: e.link_type.is_tree(),
1407 size: e.size,
1408 })
1409 .collect();
1410
1411 Ok(Some(DirectoryListing {
1412 dir_name: String::new(),
1413 entries,
1414 }))
1415 })
1416 }
1417
1418 pub fn get_directory_listing_by_cid(&self, cid: &Cid) -> Result<Option<DirectoryListing>> {
1420 let store = self.store_arc();
1421 let tree = HashTree::new(HashTreeConfig::new(store).public());
1422 let cid = cid.clone();
1423
1424 sync_block_on(async {
1425 let is_dir = tree
1426 .is_dir(&cid)
1427 .await
1428 .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;
1429
1430 if !is_dir {
1431 return Ok(None);
1432 }
1433
1434 let tree_entries = tree
1435 .list_directory(&cid)
1436 .await
1437 .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;
1438
1439 let entries: Vec<DirEntry> = tree_entries
1440 .into_iter()
1441 .map(|e| DirEntry {
1442 name: e.name,
1443 cid: Cid {
1444 hash: e.hash,
1445 key: e.key,
1446 }
1447 .to_string(),
1448 is_directory: e.link_type.is_tree(),
1449 size: e.size,
1450 })
1451 .collect();
1452
1453 Ok(Some(DirectoryListing {
1454 dir_name: String::new(),
1455 entries,
1456 }))
1457 })
1458 }
1459
1460 pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
1462 let mut wtxn = self.env.write_txn()?;
1463 self.pins.put(&mut wtxn, hash.as_slice(), &())?;
1464 wtxn.commit()?;
1465 Ok(())
1466 }
1467
1468 pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
1470 let mut wtxn = self.env.write_txn()?;
1471 self.pins.delete(&mut wtxn, hash.as_slice())?;
1472 wtxn.commit()?;
1473 Ok(())
1474 }
1475
1476 pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
1478 let rtxn = self.env.read_txn()?;
1479 Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
1480 }
1481
1482 pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
1484 let rtxn = self.env.read_txn()?;
1485 let mut pins = Vec::new();
1486
1487 for item in self.pins.iter(&rtxn)? {
1488 let (hash_bytes, _) = item?;
1489 if hash_bytes.len() == 32 {
1490 let mut hash = [0u8; 32];
1491 hash.copy_from_slice(hash_bytes);
1492 pins.push(hash);
1493 }
1494 }
1495
1496 Ok(pins)
1497 }
1498
1499 pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
1501 let rtxn = self.env.read_txn()?;
1502 let store = self.store_arc();
1503 let tree = HashTree::new(HashTreeConfig::new(store).public());
1504 let mut pins = Vec::new();
1505
1506 for item in self.pins.iter(&rtxn)? {
1507 let (hash_bytes, _) = item?;
1508 if hash_bytes.len() != 32 {
1509 continue;
1510 }
1511 let mut hash = [0u8; 32];
1512 hash.copy_from_slice(hash_bytes);
1513
1514 let is_directory =
1516 sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
1517
1518 pins.push(PinnedItem {
1519 cid: to_hex(&hash),
1520 name: "Unknown".to_string(),
1521 is_directory,
1522 });
1523 }
1524
1525 Ok(pins)
1526 }
1527
    /// Record a fully-synced tree in the local index.
    ///
    /// Walks the tree rooted at `root_hash`, maps every locally-present blob
    /// to this root in `blob_trees` (used as a reference count at eviction
    /// time), and stores a `TreeMeta` entry. When `ref_key` is given, any
    /// previously indexed tree under the same ref is unindexed first so a ref
    /// always points at a single root.
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // Replace semantics: drop the previous tree behind this ref, if any.
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes
                        .try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // The read txn must be closed before unindex_tree opens
                    // its own transactions.
                    drop(rtxn);
                    // Best-effort: failing to unindex the old tree is not fatal.
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Enumerate locally-present blobs and their combined size.
        let (blob_hashes, total_size) =
            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;

        let mut wtxn = self.env.write_txn()?;

        // blob_trees key layout: blob hash (32 bytes) || root hash (32 bytes).
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta
            .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1610
1611 async fn collect_tree_blobs<S: Store>(
1613 &self,
1614 tree: &HashTree<S>,
1615 root: &Hash,
1616 ) -> Result<(Vec<Hash>, u64)> {
1617 let mut blobs = Vec::new();
1618 let mut total_size = 0u64;
1619 let mut stack = vec![*root];
1620
1621 while let Some(hash) = stack.pop() {
1622 let is_tree = tree
1624 .is_tree(&hash)
1625 .await
1626 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1627
1628 if is_tree {
1629 if let Some(node) = tree
1631 .get_tree_node(&hash)
1632 .await
1633 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1634 {
1635 for link in &node.links {
1636 stack.push(link.hash);
1637 }
1638 }
1639 } else {
1640 if let Some(data) = self
1642 .router
1643 .get_sync(&hash)
1644 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1645 {
1646 total_size += data.len() as u64;
1647 blobs.push(hash);
1648 }
1649 }
1650 }
1651
1652 Ok((blobs, total_size))
1653 }
1654
1655 pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1658 let root_hex = to_hex(root_hash);
1659
1660 let store = self.store_arc();
1661 let tree = HashTree::new(HashTreeConfig::new(store).public());
1662
1663 let (blob_hashes, _) =
1665 sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
1666
1667 let mut wtxn = self.env.write_txn()?;
1668 let mut freed = 0u64;
1669
1670 for blob_hash in &blob_hashes {
1672 let mut key = [0u8; 64];
1674 key[..32].copy_from_slice(blob_hash);
1675 key[32..].copy_from_slice(root_hash);
1676 self.blob_trees.delete(&mut wtxn, &key[..])?;
1677
1678 let rtxn = self.env.read_txn()?;
1680 let mut has_other_tree = false;
1681
1682 for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1683 if item.is_ok() {
1684 has_other_tree = true;
1685 break;
1686 }
1687 }
1688 drop(rtxn);
1689
1690 if !has_other_tree {
1692 if let Some(data) = self
1693 .router
1694 .get_sync(blob_hash)
1695 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1696 {
1697 freed += data.len() as u64;
1698 self.router
1700 .delete_local_only(blob_hash)
1701 .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1702 }
1703 }
1704 }
1705
1706 if let Some(data) = self
1708 .router
1709 .get_sync(root_hash)
1710 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1711 {
1712 freed += data.len() as u64;
1713 self.router
1715 .delete_local_only(root_hash)
1716 .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1717 }
1718
1719 self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1721
1722 wtxn.commit()?;
1723
1724 tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
1725
1726 Ok(freed)
1727 }
1728
1729 pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1731 let rtxn = self.env.read_txn()?;
1732 if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1733 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1734 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1735 Ok(Some(meta))
1736 } else {
1737 Ok(None)
1738 }
1739 }
1740
1741 pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1743 let rtxn = self.env.read_txn()?;
1744 let mut trees = Vec::new();
1745
1746 for item in self.tree_meta.iter(&rtxn)? {
1747 let (hash_bytes, meta_bytes) = item?;
1748 let hash: Hash = hash_bytes
1749 .try_into()
1750 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1751 let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1752 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1753 trees.push((hash, meta));
1754 }
1755
1756 Ok(trees)
1757 }
1758
1759 pub fn tracked_size(&self) -> Result<u64> {
1761 let rtxn = self.env.read_txn()?;
1762 let mut total = 0u64;
1763
1764 for item in self.tree_meta.iter(&rtxn)? {
1765 let (_, bytes) = item?;
1766 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1767 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1768 total += meta.total_size;
1769 }
1770
1771 Ok(total)
1772 }
1773
1774 fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1776 let mut trees = self.list_indexed_trees()?;
1777
1778 trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
1780 std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1781 other => other,
1782 });
1783
1784 Ok(trees)
1785 }
1786
    /// Bring local storage back under the configured cap.
    ///
    /// No-op while usage is at or below `max_size_bytes`. Otherwise frees
    /// space down to a 90% target: first by deleting orphaned blobs (neither
    /// pinned nor referenced by an indexed tree), then by unindexing whole
    /// trees in `get_evictable_trees` order, skipping pinned roots. Returns
    /// the number of bytes freed.
    pub fn evict_if_needed(&self) -> Result<u64> {
        let stats = self
            .router
            .stats()
            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
        let current = stats.total_bytes;

        if current <= self.max_size_bytes {
            return Ok(0);
        }

        // Aim 10% below the cap to avoid re-evicting on every small write.
        let target = self.max_size_bytes * 90 / 100;
        let mut freed = 0u64;
        let mut current_size = current;

        // Cheapest wins first: blobs nothing references any more.
        let orphan_freed = self.evict_orphaned_blobs()?;
        freed += orphan_freed;
        current_size = current_size.saturating_sub(orphan_freed);

        if orphan_freed > 0 {
            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
        }

        if current_size <= target {
            if freed > 0 {
                tracing::info!("Eviction complete: {} bytes freed", freed);
            }
            return Ok(freed);
        }

        let evictable = self.get_evictable_trees()?;

        for (root_hash, meta) in evictable {
            if current_size <= target {
                break;
            }

            let root_hex = to_hex(&root_hash);

            // Pinned roots are never evicted.
            if self.is_pinned(&root_hash)? {
                continue;
            }

            let tree_freed = self.unindex_tree(&root_hash)?;
            freed += tree_freed;
            current_size = current_size.saturating_sub(tree_freed);

            tracing::info!(
                "Evicted tree {} (owner={}, priority={}, {} bytes)",
                &root_hex[..8],
                &meta.owner[..8.min(meta.owner.len())],
                meta.priority,
                tree_freed
            );
        }

        if freed > 0 {
            tracing::info!("Eviction complete: {} bytes freed", freed);
        }

        Ok(freed)
    }
1862
    /// Delete every stored blob that is neither pinned nor referenced by any
    /// indexed tree, returning the number of bytes freed. Individual delete
    /// failures are ignored (best-effort cleanup).
    fn evict_orphaned_blobs(&self) -> Result<u64> {
        let mut freed = 0u64;

        let all_hashes = self
            .router
            .list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        // Snapshot the pin set; malformed keys are silently skipped.
        let rtxn = self.env.read_txn()?;
        let pinned: HashSet<Hash> = self
            .pins
            .iter(&rtxn)?
            .filter_map(|item| item.ok())
            .filter_map(|(hash_bytes, _)| {
                if hash_bytes.len() == 32 {
                    let mut hash = [0u8; 32];
                    hash.copy_from_slice(hash_bytes);
                    Some(hash)
                } else {
                    None
                }
            })
            .collect();

        // Any blob appearing as a blob_trees key prefix belongs to some tree.
        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
        for (key_bytes, _) in self.blob_trees.iter(&rtxn)?.flatten() {
            if key_bytes.len() >= 32 {
                let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
                blobs_in_trees.insert(blob_hash);
            }
        }
        drop(rtxn);

        for hash in all_hashes {
            if pinned.contains(&hash) {
                continue;
            }

            if blobs_in_trees.contains(&hash) {
                continue;
            }

            // Fetch first to learn the size; deletion is local-only.
            if let Ok(Some(data)) = self.router.get_sync(&hash) {
                freed += data.len() as u64;
                let _ = self.router.delete_local_only(&hash);
                tracing::debug!(
                    "Deleted orphaned blob {} ({} bytes)",
                    &to_hex(&hash)[..8],
                    data.len()
                );
            }
        }

        Ok(freed)
    }
1927
    /// Configured storage cap in bytes that `evict_if_needed` enforces.
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1932
1933 pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1935 let rtxn = self.env.read_txn()?;
1936 let mut own = 0u64;
1937 let mut followed = 0u64;
1938 let mut other = 0u64;
1939
1940 for item in self.tree_meta.iter(&rtxn)? {
1941 let (_, bytes) = item?;
1942 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1943 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1944
1945 if meta.priority == PRIORITY_OWN {
1946 own += meta.total_size;
1947 } else if meta.priority >= PRIORITY_FOLLOWED {
1948 followed += meta.total_size;
1949 } else {
1950 other += meta.total_size;
1951 }
1952 }
1953
1954 Ok(StorageByPriority {
1955 own,
1956 followed,
1957 other,
1958 })
1959 }
1960
1961 pub fn get_storage_stats(&self) -> Result<StorageStats> {
1963 let rtxn = self.env.read_txn()?;
1964 let total_pins = self.pins.len(&rtxn)? as usize;
1965
1966 let stats = self
1967 .router
1968 .stats()
1969 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1970
1971 Ok(StorageStats {
1972 total_dags: stats.count,
1973 pinned_dags: total_pins,
1974 total_bytes: stats.total_bytes,
1975 })
1976 }
1977
1978 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1982 let key = format!("{}/{}", pubkey_hex, tree_name);
1983 let rtxn = self.env.read_txn()?;
1984 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1985 let root: CachedRoot = rmp_serde::from_slice(bytes)
1986 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1987 Ok(Some(root))
1988 } else {
1989 Ok(None)
1990 }
1991 }
1992
1993 pub fn set_cached_root(
1995 &self,
1996 pubkey_hex: &str,
1997 tree_name: &str,
1998 hash: &str,
1999 key: Option<&str>,
2000 visibility: &str,
2001 updated_at: u64,
2002 ) -> Result<()> {
2003 let db_key = format!("{}/{}", pubkey_hex, tree_name);
2004 let root = CachedRoot {
2005 hash: hash.to_string(),
2006 key: key.map(|k| k.to_string()),
2007 updated_at,
2008 visibility: visibility.to_string(),
2009 };
2010 let bytes = rmp_serde::to_vec(&root)
2011 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
2012 let mut wtxn = self.env.write_txn()?;
2013 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
2014 wtxn.commit()?;
2015 Ok(())
2016 }
2017
2018 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
2020 let prefix = format!("{}/", pubkey_hex);
2021 let rtxn = self.env.read_txn()?;
2022 let mut results = Vec::new();
2023
2024 for item in self.cached_roots.iter(&rtxn)? {
2025 let (key, bytes) = item?;
2026 if key.starts_with(&prefix) {
2027 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
2028 let root: CachedRoot = rmp_serde::from_slice(bytes)
2029 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
2030 results.push((tree_name.to_string(), root));
2031 }
2032 }
2033
2034 Ok(results)
2035 }
2036
2037 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
2039 let key = format!("{}/{}", pubkey_hex, tree_name);
2040 let mut wtxn = self.env.write_txn()?;
2041 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
2042 wtxn.commit()?;
2043 Ok(deleted)
2044 }
2045
    /// Garbage-collect: delete every blob that is not pinned, regardless of
    /// tree membership. More aggressive than `evict_orphaned_blobs`, which
    /// also spares blobs referenced by indexed trees. Returns the number of
    /// objects removed and bytes freed.
    pub fn gc(&self) -> Result<GcStats> {
        let rtxn = self.env.read_txn()?;

        // Snapshot the pin set; malformed keys are silently skipped.
        let pinned: HashSet<Hash> = self
            .pins
            .iter(&rtxn)?
            .filter_map(|item| item.ok())
            .filter_map(|(hash_bytes, _)| {
                if hash_bytes.len() == 32 {
                    let mut hash = [0u8; 32];
                    hash.copy_from_slice(hash_bytes);
                    Some(hash)
                } else {
                    None
                }
            })
            .collect();

        drop(rtxn);

        let all_hashes = self
            .router
            .list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        let mut deleted = 0;
        let mut freed_bytes = 0u64;

        for hash in all_hashes {
            if !pinned.contains(&hash) {
                // Fetch first to account for size; delete is local-only and
                // best-effort.
                if let Ok(Some(data)) = self.router.get_sync(&hash) {
                    freed_bytes += data.len() as u64;
                    let _ = self.router.delete_local_only(&hash);
                    deleted += 1;
                }
            }
        }

        Ok(GcStats {
            deleted_dags: deleted,
            freed_bytes,
        })
    }
2094
    /// Verify that every locally stored object's content hashes back to its
    /// key. Prints one line per problem; when `delete` is true, corrupted or
    /// missing entries are removed afterwards.
    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
        let all_hashes = self
            .router
            .list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        let total = all_hashes.len();
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_hashes = Vec::new();

        for hash in &all_hashes {
            let hash_hex = to_hex(hash);

            match self.router.get_sync(hash) {
                Ok(Some(data)) => {
                    // Content-addressed store: data must hash to its own key.
                    let actual_hash = sha256(&data);

                    if actual_hash == *hash {
                        valid += 1;
                    } else {
                        corrupted += 1;
                        let actual_hex = to_hex(&actual_hash);
                        println!(
                            " CORRUPTED: key={} actual={} size={}",
                            &hash_hex[..16],
                            &actual_hex[..16],
                            data.len()
                        );
                        corrupted_hashes.push(*hash);
                    }
                }
                Ok(None) => {
                    // Listed but unreadable: count as corruption.
                    corrupted += 1;
                    println!(" MISSING: key={}", &hash_hex[..16]);
                    corrupted_hashes.push(*hash);
                }
                Err(e) => {
                    corrupted += 1;
                    println!(" ERROR: key={} err={}", &hash_hex[..16], e);
                    corrupted_hashes.push(*hash);
                }
            }
        }

        if delete {
            for hash in &corrupted_hashes {
                match self.router.delete_sync(hash) {
                    Ok(true) => deleted += 1,
                    Ok(false) => {} Err(e) => {
                        let hash_hex = to_hex(hash);
                        println!(" Failed to delete {}: {}", &hash_hex[..16], e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2166
    /// Verify the integrity of every object in the configured S3/R2 bucket:
    /// each `<hex>.bin` key must contain data that hashes to `<hex>`. Prints
    /// one line per problem; when `delete` is true, bad objects are removed.
    /// Requires S3 to be configured; compiled only with the `s3` feature.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        let config = crate::config::Config::load()?;
        let s3_config = config
            .storage
            .s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        // Path-style addressing for R2-style custom endpoints.
        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                .force_path_style(true)
                .build(),
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // Paginate through the bucket listing.
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2().bucket(bucket).prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Only blob objects (.bin) are verified.
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // The expected hash is encoded in the object name:
                // <prefix><hex>.bin
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!(" INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!(" INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                match s3_client.get_object().bucket(bucket).key(key).send().await {
                    Ok(resp) => match resp.body.collect().await {
                        Ok(bytes) => {
                            let data = bytes.into_bytes();
                            let actual_hash = sha256(&data);

                            if actual_hash == expected_hash {
                                valid += 1;
                            } else {
                                corrupted += 1;
                                let actual_hex = to_hex(&actual_hash);
                                println!(
                                    " CORRUPTED: key={} actual={} size={}",
                                    &expected_hash_hex[..16],
                                    &actual_hex[..16],
                                    data.len()
                                );
                                corrupted_keys.push(key.to_string());
                            }
                        }
                        Err(e) => {
                            corrupted += 1;
                            println!(" READ ERROR: {} - {}", key, e);
                            corrupted_keys.push(key.to_string());
                        }
                    },
                    Err(e) => {
                        corrupted += 1;
                        println!(" FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Periodic progress output for large buckets.
                if total % 100 == 0 {
                    println!(
                        " Progress: {} objects checked, {} corrupted so far",
                        total, corrupted
                    );
                }
            }

            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        if delete {
            for key in &corrupted_keys {
                match s3_client
                    .delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        println!(" Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2326
2327 #[cfg(not(feature = "s3"))]
2329 pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
2330 Err(anyhow::anyhow!("S3 feature not enabled"))
2331 }
2332}
2333
/// Outcome of an integrity verification pass (LMDB or R2).
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of objects examined.
    pub total: usize,
    /// Objects whose content hashed to their key.
    pub valid: usize,
    /// Objects that were missing, unreadable, or hash-mismatched.
    pub corrupted: usize,
    /// Corrupted objects actually removed (only when deletion was requested).
    pub deleted: usize,
}
2342
/// Snapshot of local storage counters.
#[derive(Debug)]
pub struct StorageStats {
    /// Total objects in the local store.
    pub total_dags: usize,
    /// Number of pinned entries.
    pub pinned_dags: usize,
    /// Total stored bytes.
    pub total_bytes: u64,
}
2349
/// Indexed-tree storage totals bucketed by priority class.
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Bytes in trees with priority == PRIORITY_OWN.
    pub own: u64,
    /// Bytes in trees with PRIORITY_FOLLOWED <= priority < PRIORITY_OWN.
    pub followed: u64,
    /// Bytes in all remaining (lower-priority) trees.
    pub other: u64,
}
2360
/// How a file's bytes are laid out in the store.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes.
    pub total_size: u64,
    /// Hash of each chunk in file order (empty when not chunked).
    pub chunk_hashes: Vec<Hash>,
    /// Size of each chunk, parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// True when the file is split across multiple chunks.
    pub is_chunked: bool,
}
2368
/// Owned iterator state for streaming the chunks of a file that overlap a
/// byte range; created by `HashtreeStore::stream_file_range_chunks_owned`.
pub struct FileRangeChunksOwned {
    // Store used to fetch each chunk's bytes.
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    // Inclusive range bounds, in file byte offsets.
    start: u64,
    end: u64,
    // Index of the next chunk to examine.
    current_chunk_idx: usize,
    // File offset at which that chunk starts.
    current_offset: u64,
}
2378
2379impl Iterator for FileRangeChunksOwned {
2380 type Item = Result<Vec<u8>>;
2381
2382 fn next(&mut self) -> Option<Self::Item> {
2383 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2384 return None;
2385 }
2386
2387 if self.current_offset > self.end {
2388 return None;
2389 }
2390
2391 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2392 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2393 let chunk_end = self.current_offset + chunk_size - 1;
2394
2395 self.current_chunk_idx += 1;
2396
2397 if chunk_end < self.start || self.current_offset > self.end {
2398 self.current_offset += chunk_size;
2399 return self.next();
2400 }
2401
2402 let chunk_content = match self.store.get_chunk(chunk_hash) {
2403 Ok(Some(content)) => content,
2404 Ok(None) => {
2405 return Some(Err(anyhow::anyhow!(
2406 "Chunk {} not found",
2407 to_hex(chunk_hash)
2408 )));
2409 }
2410 Err(e) => {
2411 return Some(Err(e));
2412 }
2413 };
2414
2415 let chunk_read_start = if self.current_offset >= self.start {
2416 0
2417 } else {
2418 (self.start - self.current_offset) as usize
2419 };
2420
2421 let chunk_read_end = if chunk_end <= self.end {
2422 chunk_size as usize - 1
2423 } else {
2424 (self.end - self.current_offset) as usize
2425 };
2426
2427 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2428 self.current_offset += chunk_size;
2429
2430 Some(Ok(result))
2431 }
2432}
2433
/// Result of a garbage-collection pass.
#[derive(Debug)]
pub struct GcStats {
    /// Number of unpinned objects deleted.
    pub deleted_dags: usize,
    /// Bytes reclaimed by those deletions.
    pub freed_bytes: u64,
}
2439
/// One entry of a directory listing, with a stringified content identifier.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within the directory.
    pub name: String,
    /// Content identifier as a string (hex hash, or a full CID string).
    pub cid: String,
    /// True when the entry is itself a directory.
    pub is_directory: bool,
    /// Entry size in bytes.
    pub size: u64,
}
2447
/// A directory plus its immediate entries.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Directory name (left empty by the listing helpers in this file).
    pub dir_name: String,
    /// The directory's immediate children.
    pub entries: Vec<DirEntry>,
}
2453
/// A pinned object as reported by `list_pins_with_names`.
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Hex-encoded hash of the pinned object.
    pub cid: String,
    /// Display name (currently always "Unknown" — names are not tracked).
    pub name: String,
    /// True when the pinned object is a directory.
    pub is_directory: bool,
}
2460
/// Serializable metadata recorded for an uploaded blob.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 of the blob content.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type reported at upload time.
    pub mime_type: String,
    /// Upload timestamp (presumably Unix seconds — confirm with writers).
    pub uploaded: u64,
}
2469
2470impl crate::webrtc::ContentStore for HashtreeStore {
2472 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2473 let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2474 self.get_chunk(&hash)
2475 }
2476}