1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use futures::executor::block_on as sync_block_on;
4use hashtree_config::StorageBackend;
5use hashtree_core::store::{Store, StoreError};
6use hashtree_core::{
7 from_hex, sha256, to_hex, types::Hash, Cid, DirEntry as HashTreeDirEntry, HashTree,
8 HashTreeConfig, TreeNode,
9};
10use hashtree_fs::FsBlobStore;
11#[cfg(feature = "lmdb")]
12use hashtree_lmdb::LmdbBlobStore;
13use heed::types::*;
14use heed::{Database, EnvOpenOptions};
15use serde::{Deserialize, Serialize};
16use std::collections::HashSet;
17use std::io::Read;
18use std::path::Path;
19use std::sync::Arc;
20use std::time::{SystemTime, UNIX_EPOCH};
21
// Replication/retention priority tiers stored in `TreeMeta::priority`.
// Higher value = higher priority. NOTE(review): the eviction logic that
// consumes these is not visible in this chunk — confirm semantics at call sites.
pub const PRIORITY_OTHER: u8 = 64;
pub const PRIORITY_FOLLOWED: u8 = 128;
pub const PRIORITY_OWN: u8 = 255;
26
/// Metadata recorded for a synced tree (serialized as JSON-ish bytes into the
/// `tree_meta` LMDB database; see `HashtreeStore`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    // Owner identifier — presumably a hex-encoded pubkey; confirm against writers.
    pub owner: String,
    // Optional human-readable tree name.
    pub name: Option<String>,
    // Timestamp of last sync — presumably seconds since the Unix epoch.
    pub synced_at: u64,
    // Total stored size of the tree in bytes.
    pub total_size: u64,
    // Retention priority; see the `PRIORITY_*` constants above.
    pub priority: u8,
}
41
/// A cached tree root, stored in the `cached_roots` database keyed by string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    // Hex-encoded root hash.
    pub hash: String,
    // Optional encryption key — presumably hex/base-encoded; confirm with writers.
    pub key: Option<String>,
    // Timestamp of last update — presumably seconds since the Unix epoch.
    pub updated_at: u64,
    // Visibility marker (stringly-typed; e.g. "public"/"private" — confirm the
    // allowed values at call sites).
    pub visibility: String,
}
54
/// Aggregate statistics for a local blob store (blob count and total bytes).
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    // Number of blobs currently stored.
    pub count: usize,
    // Sum of all blob sizes in bytes.
    pub total_bytes: u64,
}
61
/// Local blob storage backend: plain filesystem, or LMDB when the `lmdb`
/// feature is compiled in. All methods dispatch on this enum.
pub enum LocalStore {
    Fs(FsBlobStore),
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
68
impl LocalStore {
    /// Open a blob store at `path` using the requested backend.
    ///
    /// When LMDB is requested but the `lmdb` feature was compiled out, this
    /// logs a warning and falls back to the filesystem backend rather than
    /// failing.
    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
        match backend {
            StorageBackend::Fs => Ok(LocalStore::Fs(FsBlobStore::new(path)?)),
            #[cfg(feature = "lmdb")]
            StorageBackend::Lmdb => Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?)),
            #[cfg(not(feature = "lmdb"))]
            StorageBackend::Lmdb => {
                tracing::warn!(
                    "LMDB backend requested but lmdb feature not enabled, using filesystem storage"
                );
                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
            }
        }
    }

    /// Store `data` under `hash`. Returns whatever the backend reports —
    /// presumably `true` when the blob was newly written; confirm with the
    /// backend crates.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.put_sync(hash, data),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put_sync(hash, data),
        }
    }

    /// Fetch the blob stored under `hash`, or `None` if absent.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.get_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.get_sync(hash),
        }
    }

    /// Check whether a blob exists locally.
    ///
    /// Note the backend APIs differ: the FS store's `exists` is infallible
    /// (wrapped in `Ok` here), while the LMDB store's returns a `Result`.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => Ok(store.exists(hash)),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.exists(hash),
        }
    }

    /// Delete the blob stored under `hash`; returns the backend's deletion flag.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.delete_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.delete_sync(hash),
        }
    }

    /// Aggregate backend statistics, normalized into `LocalStoreStats`.
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        match self {
            LocalStore::Fs(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
        }
    }

    /// List the hashes of all blobs currently stored.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.list(),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.list(),
        }
    }
}
152
// Async `Store` facade over the synchronous backend methods.
// NOTE(review): these perform blocking filesystem/LMDB I/O directly on the
// async executor thread — acceptable only if operations stay short; confirm
// against the runtime these are called from.
#[async_trait]
impl Store for LocalStore {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
171
172#[cfg(feature = "s3")]
173use tokio::sync::mpsc;
174
175use crate::config::S3Config;
176
/// Work items sent to the background S3 mirroring task spawned in
/// `StorageRouter::with_s3`. Uploads carry a full copy of the blob bytes.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    Upload { hash: Hash, data: Vec<u8> },
    Delete { hash: Hash },
}
183
/// Routes blob operations to the local store, optionally mirroring writes and
/// deletes to S3 via a background task and falling back to S3 on local misses.
pub struct StorageRouter {
    local: Arc<LocalStore>,
    // S3 client/config are all `None`/empty when constructed via `new`
    // (local-only mode); populated by `with_s3`.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    #[cfg(feature = "s3")]
    s3_prefix: String,
    // Unbounded sender feeding the background sync task; `None` in local-only mode.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
202
impl StorageRouter {
    /// Local-only router: no S3 client, nothing is mirrored.
    pub fn new(local: Arc<LocalStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Build a router that mirrors to S3.
    ///
    /// Spawns a long-lived tokio task that drains an unbounded channel of
    /// `S3SyncMessage`s; each message is handled in its own spawned task,
    /// with concurrency capped by a 32-permit semaphore. Must be called from
    /// within a tokio runtime (uses `tokio::spawn`).
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Credentials come from the environment; only region/endpoint are
        // taken from our config.
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader =
            aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        // Path-style addressing: required by MinIO-style custom endpoints.
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // NOTE(review): unbounded — a burst of large uploads holds every blob's
        // bytes in memory until drained; consider a bounded channel if this
        // becomes a problem.
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Shared across per-message tasks; semaphore caps in-flight S3 calls.
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                tokio::spawn(async move {
                    // Permit is held for the lifetime of this task.
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            // Object key layout: "<prefix><hex-hash>.bin".
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            // Best-effort: failures are logged, never retried here.
                            match client
                                .put_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .body(ByteStream::from(data))
                                .send()
                                .await
                            {
                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            if let Err(e) = client
                                .delete_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .send()
                                .await
                            {
                                tracing::error!("S3 delete failed {}: {}", &key, e);
                            }
                        }
                    }
                });
            }
        });

        tracing::info!(
            "S3 storage initialized: bucket={}, prefix={}",
            bucket,
            prefix
        );

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Write locally, then (if configured) queue an async S3 upload.
    ///
    /// The upload is queued regardless of `is_new` — even already-present
    /// blobs are re-uploaded.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        let is_new = self.local.put_sync(hash, data)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            tracing::info!(
                "Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(),
                data.len(),
                is_new
            );
            if let Err(e) = tx.send(S3SyncMessage::Upload {
                hash,
                data: data.to_vec(),
            }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Read-through: local first, then S3, back-filling the local store on an
    /// S3 hit.
    ///
    /// NOTE(review): uses `futures::executor::block_on` on tokio/hyper-backed
    /// AWS SDK futures. This hangs if no tokio reactor is driving the I/O and
    /// can stall a runtime worker if called from async context — confirm every
    /// caller runs this on a plain (non-runtime) thread.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async { client.get_object().bucket(bucket).key(&key).send().await })
            {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Best-effort local cache fill; ignore write errors.
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // Missing keys are expected (blob simply not mirrored);
                    // only log unexpected failures.
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false).map(|_: bool| None).or(Ok(None))
    }

    /// Existence check: local first, then a HEAD request against S3.
    /// Same `block_on` caveat as `get_sync`.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        if self.local.exists(hash)? {
            return Ok(true);
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object().bucket(bucket).key(&key).send().await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Delete locally and queue an async S3 delete. Return value reflects
    /// only the local deletion.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Delete from the local store only — the S3 copy (if any) is left intact.
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Statistics for the local store only (S3 contents are not counted).
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        self.local.stats()
    }

    /// List hashes in the local store only.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Shared handle to the underlying local store.
    pub fn local_store(&self) -> Arc<LocalStore> {
        Arc::clone(&self.local)
    }
}
443
// Async `Store` facade over the router's synchronous methods.
// NOTE(review): with S3 enabled, `get`/`has` call `block_on` internally (see
// `get_sync`/`exists`), which can block the async executor — confirm these
// trait methods are never driven from a tokio worker in that configuration.
#[async_trait]
impl Store for StorageRouter {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
464
/// Top-level store: an LMDB environment of index databases plus a
/// `StorageRouter` for the actual blob bytes.
pub struct HashtreeStore {
    env: heed::Env,
    // Set of pinned root hashes (key = 32-byte hash, no value).
    pins: Database<Bytes, Unit>,
    // Blob-ownership set (key = 32-byte sha256 ++ 32-byte pubkey).
    blob_owners: Database<Bytes, Unit>,
    // pubkey -> JSON-serialized Vec<BlobMetadata>.
    pubkey_blobs: Database<Bytes, Bytes>,
    // root hash -> serialized TreeMeta.
    tree_meta: Database<Bytes, Bytes>,
    // Membership index linking blobs to trees — exact key layout not visible
    // in this chunk; confirm with its writers.
    blob_trees: Database<Bytes, Unit>,
    // ref key (string) -> root hash bytes.
    tree_refs: Database<Str, Bytes>,
    // key (string) -> serialized CachedRoot.
    cached_roots: Database<Str, Bytes>,
    router: Arc<StorageRouter>,
    // Storage cap in bytes. NOTE(review): stored but not consumed anywhere in
    // this chunk; the LMDB map_size is hard-coded independently of it.
    max_size_bytes: u64,
}
486
487impl HashtreeStore {
488 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
490 Self::with_options(path, None, 10 * 1024 * 1024 * 1024)
491 }
492
493 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
495 Self::with_options(path, s3_config, 10 * 1024 * 1024 * 1024)
496 }
497
    /// Open (creating if needed) the LMDB environment, its index databases,
    /// the local blob store, and the storage router.
    ///
    /// NOTE(review): `max_size_bytes` is only stored on `Self`; the LMDB
    /// `map_size` below is a separate hard-coded 10 GiB — confirm whether they
    /// were meant to be linked.
    pub fn with_options<P: AsRef<Path>>(
        path: P,
        s3_config: Option<&S3Config>,
        max_size_bytes: u64,
    ) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY: heed marks `open` unsafe because the caller must uphold the
        // LMDB environment invariants (e.g. one env per path per process, no
        // concurrent unsynchronized opens) — presumably guaranteed by this
        // store being created once at startup; confirm.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024)
                .max_dbs(8)
                .open(path)?
        };

        // Create all named databases up-front in a single write transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Blob backend (fs vs lmdb) comes from the global config file.
        let config = hashtree_config::Config::load_or_default();
        let backend = &config.storage.backend;

        let local_store = Arc::new(
            LocalStore::new(path.join("blobs"), backend)
                .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?,
        );

        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!(
                "Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket,
                s3_cfg.endpoint
            );

            // NOTE(review): `with_s3` calls `tokio::spawn`, which panics
            // without a runtime, while `block_on` here provides none — confirm
            // this constructor runs inside a tokio runtime context.
            sync_block_on(async { StorageRouter::with_s3(local_store, s3_cfg).await })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!(
                    "S3 config provided but S3 feature not enabled. Using local storage only."
                );
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            env,
            pins,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
571
572 pub fn router(&self) -> &StorageRouter {
574 &self.router
575 }
576
577 pub fn store_arc(&self) -> Arc<StorageRouter> {
580 Arc::clone(&self.router)
581 }
582
    /// Upload a file as a public hashtree and pin its root. Returns the root
    /// hash as hex.
    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, true)
    }
587
    /// Like `upload_file`, but without pinning the root (the blob is subject
    /// to whatever cleanup policy applies to unpinned content).
    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, false)
    }
592
    /// Shared implementation for `upload_file`/`upload_file_no_pin`.
    ///
    /// Reads the whole file into memory, stores it as a public (unencrypted)
    /// hashtree, optionally pins the root, and returns the root hash as hex.
    fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
        let file_path = file_path.as_ref();
        // NOTE(review): whole-file read — large files are fully buffered in RAM.
        let file_content = std::fs::read(file_path)?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (cid, _size) = sync_block_on(async { tree.put(&file_content).await })
            .context("Failed to store file")?;

        if pin {
            let mut wtxn = self.env.write_txn()?;
            self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
            wtxn.commit()?;
        }

        Ok(to_hex(&cid.hash))
    }
613
    /// Upload from an arbitrary reader as a public hashtree, invoke `callback`
    /// with the root hex, pin the root, and return the root hex.
    ///
    /// NOTE(review): despite the name, this is not streaming — the entire
    /// reader is buffered into memory before hashing. `_file_name` is
    /// currently unused.
    pub fn upload_file_stream<R: Read, F>(
        &self,
        mut reader: R,
        _file_name: impl Into<String>,
        mut callback: F,
    ) -> Result<String>
    where
        F: FnMut(&str),
    {
        let mut data = Vec::new();
        reader.read_to_end(&mut data)?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (cid, _size) =
            sync_block_on(async { tree.put(&data).await }).context("Failed to store file")?;

        let root_hex = to_hex(&cid.hash);
        // Callback fires before the pin is committed.
        callback(&root_hex);

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(root_hex)
    }
644
    /// Upload a directory tree (respecting .gitignore) as a public hashtree,
    /// pin the root, and return the root hash as hex.
    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_with_options(dir_path, true)
    }
650
    /// Upload a directory tree as a public hashtree, pin the root, and return
    /// the root hash as hex.
    ///
    /// `respect_gitignore` toggles .gitignore/.git-global/.git-exclude
    /// filtering during the walk.
    pub fn upload_dir_with_options<P: AsRef<Path>>(
        &self,
        dir_path: P,
        respect_gitignore: bool,
    ) -> Result<String> {
        let dir_path = dir_path.as_ref();

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore)
                .await
        })
        .context("Failed to upload directory")?;

        let root_hex = to_hex(&root_cid.hash);

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(root_hex)
    }
676
    /// Walk `current_path`, upload every file into `tree`, and record each
    /// file/directory under its parent's relative path; then hand the
    /// collected map to `build_directory_tree` to assemble directory nodes
    /// bottom-up. Returns the root directory CID.
    ///
    /// Despite the name this is not recursive — the `ignore` walker flattens
    /// the traversal. `_root_path` is currently unused.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Map of directory relative-path -> its direct file entries.
        // "" is the root; inserted up-front so an empty dir still yields a root.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new());
        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            // Include dotfiles; only gitignore rules filter entries.
            .hidden(false)
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // The walker yields the root itself; skip it.
            if path == current_path {
                continue;
            }

            let relative = path.strip_prefix(current_path).unwrap_or(path);

            if path.is_file() {
                // Whole-file read; each file becomes its own sub-tree.
                let content = std::fs::read(path)?;
                let (cid, _size) = tree.put(&content).await.map_err(|e| {
                    anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e)
                })?;

                let parent = relative
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative
                    .file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Register the directory even if it turns out to be empty.
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        self.build_directory_tree(tree, &mut dir_contents).await
    }
736
    /// Assemble directory nodes bottom-up from the path->files map produced by
    /// `upload_dir_recursive`, returning the root ("" key) directory CID.
    ///
    /// Directories are processed deepest-first so that every subdirectory's
    /// CID exists before its parent is built. NOTE(review): the inner scan of
    /// `dir_cids` per directory is O(dirs²) — fine for modest trees, worth a
    /// parent->children index if trees get very large.
    async fn build_directory_tree<S: Store>(
        &self,
        tree: &HashTree<S>,
        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
    ) -> Result<Cid> {
        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
        dirs.sort_by(|a, b| {
            // Depth = number of separators, plus one for any non-root path
            // (the root "" has depth 0).
            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
            // Reversed comparison: deepest directories first.
            depth_b.cmp(&depth_a)
        });

        // Relative path -> CID of the already-built directory node.
        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();

        for dir_path in dirs {
            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();

            let mut entries: Vec<HashTreeDirEntry> = files
                .into_iter()
                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
                .collect();

            // Attach any already-built subdirectories whose parent is this dir.
            for (subdir_path, cid) in &dir_cids {
                let parent = std::path::Path::new(subdir_path)
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();

                if parent == dir_path {
                    let name = std::path::Path::new(subdir_path)
                        .file_name()
                        .map(|n| n.to_string_lossy().to_string())
                        .unwrap_or_default();
                    entries.push(HashTreeDirEntry::from_cid(name, cid));
                }
            }

            let cid = tree
                .put_directory(entries)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;

            dir_cids.insert(dir_path, cid);
        }

        // The root is keyed by the empty string; absence means the walk
        // produced nothing (should not happen given the up-front insert).
        dir_cids
            .get("")
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("No root directory"))
    }
790
    /// Upload a file as an encrypted hashtree (default, non-`public` config)
    /// and pin its root.
    ///
    /// Returns the full CID string — which, unlike the public upload paths,
    /// presumably embeds the decryption key; confirm against `Cid::to_string`.
    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        let file_path = file_path.as_ref();
        let file_content = std::fs::read(file_path)?;

        let store = self.store_arc();
        // No `.public()` here: the tree is built with encryption enabled.
        let tree = HashTree::new(HashTreeConfig::new(store));

        let (cid, _size) = sync_block_on(async { tree.put(&file_content).await })
            .map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;

        let cid_str = cid.to_string();

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
811
    /// Upload a directory as an encrypted hashtree (respecting .gitignore),
    /// pin the root, and return the full CID string.
    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_encrypted_with_options(dir_path, true)
    }
817
    /// Upload a directory as an encrypted hashtree, pin the root, and return
    /// the full CID string (presumably key-bearing; see `upload_file_encrypted`).
    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(
        &self,
        dir_path: P,
        respect_gitignore: bool,
    ) -> Result<String> {
        let dir_path = dir_path.as_ref();
        let store = self.store_arc();

        // No `.public()`: encryption enabled.
        let tree = HashTree::new(HashTreeConfig::new(store));

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore)
                .await
        })
        .context("Failed to upload encrypted directory")?;

        let cid_str = root_cid.to_string();
        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
846
    /// Load and decode the tree node stored under `hash` (public/unencrypted
    /// trees only), or `None` if absent.
    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get_tree_node(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
        })
    }
858
859 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
861 let hash = sha256(data);
862 self.router
863 .put_sync(hash, data)
864 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
865 Ok(to_hex(&hash))
866 }
867
868 pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
870 self.router
871 .get_sync(hash)
872 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
873 }
874
875 pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
877 self.router
878 .exists(hash)
879 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
880 }
881
882 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
888 let mut key = [0u8; 64];
889 key[..32].copy_from_slice(sha256);
890 key[32..].copy_from_slice(pubkey);
891 key
892 }
893
    /// Record `pubkey` as an owner of blob `sha256` and append a metadata
    /// entry to that pubkey's blob list (idempotent per (blob, pubkey) pair).
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Ownership set entry (sha256 ++ pubkey, no value).
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        let sha256_hex = to_hex(sha256);

        // Per-pubkey metadata list, stored as one JSON blob. A corrupt entry
        // deserializes as an empty list (and is overwritten below).
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            // NOTE(review): panics if the system clock is before the epoch.
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Size is derived by fetching the blob bytes; 0 if it is missing.
            // NOTE(review): this reads the whole blob just to measure it.
            let size = self
                .get_blob(sha256)?
                .map(|data| data.len() as u64)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                // MIME type is not tracked at this layer; generic default.
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
940
941 pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
943 let key = Self::blob_owner_key(sha256, pubkey);
944 let rtxn = self.env.read_txn()?;
945 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
946 }
947
948 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
950 let rtxn = self.env.read_txn()?;
951
952 let mut owners = Vec::new();
953 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
954 let (key, _) = item?;
955 if key.len() == 64 {
956 let mut pubkey = [0u8; 32];
958 pubkey.copy_from_slice(&key[32..64]);
959 owners.push(pubkey);
960 }
961 }
962 Ok(owners)
963 }
964
965 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
967 let rtxn = self.env.read_txn()?;
968
969 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
971 if item.is_ok() {
972 return Ok(true);
973 }
974 }
975 Ok(false)
976 }
977
978 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
980 Ok(self.get_blob_owners(sha256)?.into_iter().next())
981 }
982
    /// Remove `pubkey`'s ownership of blob `sha256`; when that was the last
    /// owner, also delete the blob bytes. Returns `true` iff the blob itself
    /// was deleted.
    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        self.blob_owners.delete(&mut wtxn, &key[..])?;

        let sha256_hex = to_hex(sha256);

        // Drop the entry from the pubkey's metadata list; a corrupt list is
        // silently left untouched.
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Re-scan within the same txn (sees the delete above) for remaining owners.
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &to_hex(pubkey)[..8],
                &sha256_hex[..8]
            );
            return Ok(false);
        }

        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8]
        );

        // Best-effort blob deletion (local + queued S3 delete), errors ignored.
        // NOTE(review): this runs BEFORE the index commit below — if the
        // commit fails, the bytes are already gone while the index still
        // claims ownership. Also no pin check here; confirm pinned blobs
        // cannot reach this path.
        let _ = self.router.delete_sync(sha256);

        wtxn.commit()?;
        Ok(true)
    }
1036
    /// List the Blossom blob descriptors recorded for `pubkey`.
    ///
    /// A missing or undeserializable metadata list yields an empty result.
    /// The `url` field is a root-relative path ("/<sha256-hex>").
    pub fn list_blobs_by_pubkey(
        &self,
        pubkey: &[u8; 32],
    ) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
        let rtxn = self.env.read_txn()?;

        let blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&rtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        Ok(blobs
            .into_iter()
            .map(|b| crate::server::blossom::BlobDescriptor {
                url: format!("/{}", b.sha256),
                sha256: b.sha256,
                size: b.size,
                mime_type: b.mime_type,
                uploaded: b.uploaded,
            })
            .collect())
    }
1061
1062 pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
1064 self.router
1065 .get_sync(hash)
1066 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
1067 }
1068
    /// Read a complete file (reassembling chunks if the hash is a tree node)
    /// from a public hashtree. Whole file is buffered in memory.
    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.read_file(hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }
1081
    /// Read a complete file addressed by CID. Uses a `.public()` tree config;
    /// NOTE(review): whether key-bearing (encrypted) CIDs decrypt correctly
    /// through this path depends on `HashTree::get` — confirm.
    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get(cid)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }
1094
    /// Resolve a slash-separated path under a directory CID to the CID of the
    /// addressed entry, or `None` if the path does not exist.
    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.resolve_path(cid, path)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
        })
    }
1106
    /// Describe how the file at `hash` is stored: its total size and, when it
    /// is a chunked tree, the ordered chunk hashes and sizes.
    ///
    /// Returns `None` when the hash is unknown or addresses a directory.
    /// A non-tree hash is reported as a single unchunked blob.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            let exists = store
                .has(&hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            let total_size = tree
                .get_size(&hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            let is_tree_node = tree
                .is_tree(&hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            // Leaf blob: a single unchunked payload.
            if !is_tree_node {
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            let node = match tree
                .get_tree_node(&hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
            {
                Some(n) => n,
                None => return Ok(None),
            };

            let is_directory = tree
                .is_directory(&hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            // Directories have no byte-range semantics; callers get `None`.
            if is_directory {
                return Ok(None);
            }

            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1178
    /// Read the inclusive byte range [`start`, `end`] of the file at `hash`,
    /// fetching only the chunks that overlap the range. `end = None` means
    /// "to end of file"; both bounds are clamped to the file size.
    ///
    /// Returns `(bytes, total_file_size)`; `None` when the file is unknown,
    /// is a directory, or `start` is past EOF. An empty file yields
    /// `Some((vec![], 0))`.
    pub fn get_file_range(
        &self,
        hash: &[u8; 32],
        start: u64,
        end: Option<u64>,
    ) -> Result<Option<(Vec<u8>, u64)>> {
        let metadata = match self.get_file_chunk_metadata(hash)? {
            Some(m) => m,
            None => return Ok(None),
        };

        if metadata.total_size == 0 {
            return Ok(Some((Vec::new(), 0)));
        }

        if start >= metadata.total_size {
            return Ok(None);
        }

        // Inclusive end, clamped to the last byte of the file.
        let end = end
            .unwrap_or(metadata.total_size - 1)
            .min(metadata.total_size - 1);

        // Unchunked file: load it whole and slice.
        // NOTE(review): a missing blob here yields an empty range rather than
        // an error (unwrap_or_default) — confirm that is intended.
        if !metadata.is_chunked {
            let content = self.get_file(hash)?.unwrap_or_default();
            let range_content = if start < content.len() as u64 {
                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
            } else {
                Vec::new()
            };
            return Ok(Some((range_content, metadata.total_size)));
        }

        // Chunked file: walk the chunks in order, copying only the
        // overlapping portions. Assumes every chunk_size > 0 (a zero-size
        // link would underflow `chunk_end`).
        let mut result = Vec::new();
        let mut current_offset = 0u64;

        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
            let chunk_size = metadata.chunk_sizes[i];
            // Inclusive end offset of this chunk within the file.
            let chunk_end = current_offset + chunk_size - 1;

            // Overlap test against the requested [start, end] range.
            if chunk_end >= start && current_offset <= end {
                let chunk_content = match self.get_chunk(chunk_hash)? {
                    Some(content) => content,
                    None => {
                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
                    }
                };

                // Slice bounds within this chunk (inclusive).
                let chunk_read_start = if current_offset >= start {
                    0
                } else {
                    (start - current_offset) as usize
                };

                let chunk_read_end = if chunk_end <= end {
                    chunk_size as usize - 1
                } else {
                    (end - current_offset) as usize
                };

                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
            }

            current_offset += chunk_size;

            // Past the requested range: stop fetching.
            if current_offset > end {
                break;
            }
        }

        Ok(Some((result, metadata.total_size)))
    }
1255
    /// Create an owned, lazily-evaluated chunk iterator over the inclusive
    /// byte range [`start`, `end`] of the file at `hash` (for streaming HTTP
    /// responses without buffering the whole range).
    ///
    /// Returns `None` when the file is unknown/empty or `start` is past EOF;
    /// `end` is clamped to the last byte.
    pub fn stream_file_range_chunks_owned(
        self: Arc<Self>,
        hash: &[u8; 32],
        start: u64,
        end: u64,
    ) -> Result<Option<FileRangeChunksOwned>> {
        let metadata = match self.get_file_chunk_metadata(hash)? {
            Some(m) => m,
            None => return Ok(None),
        };

        if metadata.total_size == 0 || start >= metadata.total_size {
            return Ok(None);
        }

        let end = end.min(metadata.total_size - 1);

        // The iterator keeps an Arc to this store so chunks can be fetched
        // on demand as it is driven.
        Ok(Some(FileRangeChunksOwned {
            store: self,
            metadata,
            start,
            end,
            current_chunk_idx: 0,
            current_offset: 0,
        }))
    }
1283
    /// List the entries of the directory node at `hash` (public trees only).
    /// Returns `None` when the hash does not address a directory.
    /// `dir_name` is left empty — the caller only knows the hash here.
    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            let is_dir = tree
                .is_directory(&hash)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if !is_dir {
                return Ok(None);
            }

            let cid = hashtree_core::Cid::public(*hash);
            let tree_entries = tree
                .list_directory(&cid)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;

            // Convert hashtree entries into this crate's serializable form.
            let entries: Vec<DirEntry> = tree_entries
                .into_iter()
                .map(|e| DirEntry {
                    name: e.name,
                    cid: to_hex(&e.hash),
                    is_directory: e.link_type.is_tree(),
                    size: e.size,
                })
                .collect();

            Ok(Some(DirectoryListing {
                dir_name: String::new(),
                entries,
            }))
        })
    }
1323
1324 pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
1326 let mut wtxn = self.env.write_txn()?;
1327 self.pins.put(&mut wtxn, hash.as_slice(), &())?;
1328 wtxn.commit()?;
1329 Ok(())
1330 }
1331
1332 pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
1334 let mut wtxn = self.env.write_txn()?;
1335 self.pins.delete(&mut wtxn, hash.as_slice())?;
1336 wtxn.commit()?;
1337 Ok(())
1338 }
1339
1340 pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
1342 let rtxn = self.env.read_txn()?;
1343 Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
1344 }
1345
1346 pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
1348 let rtxn = self.env.read_txn()?;
1349 let mut pins = Vec::new();
1350
1351 for item in self.pins.iter(&rtxn)? {
1352 let (hash_bytes, _) = item?;
1353 if hash_bytes.len() == 32 {
1354 let mut hash = [0u8; 32];
1355 hash.copy_from_slice(hash_bytes);
1356 pins.push(hash);
1357 }
1358 }
1359
1360 Ok(pins)
1361 }
1362
1363 pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
1365 let rtxn = self.env.read_txn()?;
1366 let store = self.store_arc();
1367 let tree = HashTree::new(HashTreeConfig::new(store).public());
1368 let mut pins = Vec::new();
1369
1370 for item in self.pins.iter(&rtxn)? {
1371 let (hash_bytes, _) = item?;
1372 if hash_bytes.len() != 32 {
1373 continue;
1374 }
1375 let mut hash = [0u8; 32];
1376 hash.copy_from_slice(hash_bytes);
1377
1378 let is_directory =
1380 sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
1381
1382 pins.push(PinnedItem {
1383 cid: to_hex(&hash),
1384 name: "Unknown".to_string(),
1385 is_directory,
1386 });
1387 }
1388
1389 Ok(pins)
1390 }
1391
1392 pub fn index_tree(
1399 &self,
1400 root_hash: &Hash,
1401 owner: &str,
1402 name: Option<&str>,
1403 priority: u8,
1404 ref_key: Option<&str>,
1405 ) -> Result<()> {
1406 let root_hex = to_hex(root_hash);
1407
1408 if let Some(key) = ref_key {
1410 let rtxn = self.env.read_txn()?;
1411 if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
1412 if old_hash_bytes != root_hash.as_slice() {
1413 let old_hash: Hash = old_hash_bytes
1414 .try_into()
1415 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
1416 drop(rtxn);
1417 let _ = self.unindex_tree(&old_hash);
1419 tracing::debug!("Replaced old tree for ref {}", key);
1420 }
1421 }
1422 }
1423
1424 let store = self.store_arc();
1425 let tree = HashTree::new(HashTreeConfig::new(store).public());
1426
1427 let (blob_hashes, total_size) =
1429 sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
1430
1431 let mut wtxn = self.env.write_txn()?;
1432
1433 for blob_hash in &blob_hashes {
1435 let mut key = [0u8; 64];
1436 key[..32].copy_from_slice(blob_hash);
1437 key[32..].copy_from_slice(root_hash);
1438 self.blob_trees.put(&mut wtxn, &key[..], &())?;
1439 }
1440
1441 let meta = TreeMeta {
1443 owner: owner.to_string(),
1444 name: name.map(|s| s.to_string()),
1445 synced_at: SystemTime::now()
1446 .duration_since(UNIX_EPOCH)
1447 .unwrap()
1448 .as_secs(),
1449 total_size,
1450 priority,
1451 };
1452 let meta_bytes = rmp_serde::to_vec(&meta)
1453 .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
1454 self.tree_meta
1455 .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;
1456
1457 if let Some(key) = ref_key {
1459 self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
1460 }
1461
1462 wtxn.commit()?;
1463
1464 tracing::debug!(
1465 "Indexed tree {} ({} blobs, {} bytes, priority {})",
1466 &root_hex[..8],
1467 blob_hashes.len(),
1468 total_size,
1469 priority
1470 );
1471
1472 Ok(())
1473 }
1474
1475 async fn collect_tree_blobs<S: Store>(
1477 &self,
1478 tree: &HashTree<S>,
1479 root: &Hash,
1480 ) -> Result<(Vec<Hash>, u64)> {
1481 let mut blobs = Vec::new();
1482 let mut total_size = 0u64;
1483 let mut stack = vec![*root];
1484
1485 while let Some(hash) = stack.pop() {
1486 let is_tree = tree
1488 .is_tree(&hash)
1489 .await
1490 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1491
1492 if is_tree {
1493 if let Some(node) = tree
1495 .get_tree_node(&hash)
1496 .await
1497 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1498 {
1499 for link in &node.links {
1500 stack.push(link.hash);
1501 }
1502 }
1503 } else {
1504 if let Some(data) = self
1506 .router
1507 .get_sync(&hash)
1508 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1509 {
1510 total_size += data.len() as u64;
1511 blobs.push(hash);
1512 }
1513 }
1514 }
1515
1516 Ok((blobs, total_size))
1517 }
1518
1519 pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1522 let root_hex = to_hex(root_hash);
1523
1524 let store = self.store_arc();
1525 let tree = HashTree::new(HashTreeConfig::new(store).public());
1526
1527 let (blob_hashes, _) =
1529 sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
1530
1531 let mut wtxn = self.env.write_txn()?;
1532 let mut freed = 0u64;
1533
1534 for blob_hash in &blob_hashes {
1536 let mut key = [0u8; 64];
1538 key[..32].copy_from_slice(blob_hash);
1539 key[32..].copy_from_slice(root_hash);
1540 self.blob_trees.delete(&mut wtxn, &key[..])?;
1541
1542 let rtxn = self.env.read_txn()?;
1544 let mut has_other_tree = false;
1545
1546 for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1547 if item.is_ok() {
1548 has_other_tree = true;
1549 break;
1550 }
1551 }
1552 drop(rtxn);
1553
1554 if !has_other_tree {
1556 if let Some(data) = self
1557 .router
1558 .get_sync(blob_hash)
1559 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1560 {
1561 freed += data.len() as u64;
1562 self.router
1564 .delete_local_only(blob_hash)
1565 .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1566 }
1567 }
1568 }
1569
1570 if let Some(data) = self
1572 .router
1573 .get_sync(root_hash)
1574 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1575 {
1576 freed += data.len() as u64;
1577 self.router
1579 .delete_local_only(root_hash)
1580 .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1581 }
1582
1583 self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1585
1586 wtxn.commit()?;
1587
1588 tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
1589
1590 Ok(freed)
1591 }
1592
1593 pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1595 let rtxn = self.env.read_txn()?;
1596 if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1597 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1598 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1599 Ok(Some(meta))
1600 } else {
1601 Ok(None)
1602 }
1603 }
1604
1605 pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1607 let rtxn = self.env.read_txn()?;
1608 let mut trees = Vec::new();
1609
1610 for item in self.tree_meta.iter(&rtxn)? {
1611 let (hash_bytes, meta_bytes) = item?;
1612 let hash: Hash = hash_bytes
1613 .try_into()
1614 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1615 let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1616 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1617 trees.push((hash, meta));
1618 }
1619
1620 Ok(trees)
1621 }
1622
1623 pub fn tracked_size(&self) -> Result<u64> {
1625 let rtxn = self.env.read_txn()?;
1626 let mut total = 0u64;
1627
1628 for item in self.tree_meta.iter(&rtxn)? {
1629 let (_, bytes) = item?;
1630 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1631 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1632 total += meta.total_size;
1633 }
1634
1635 Ok(total)
1636 }
1637
1638 fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1640 let mut trees = self.list_indexed_trees()?;
1641
1642 trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
1644 std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1645 other => other,
1646 });
1647
1648 Ok(trees)
1649 }
1650
1651 pub fn evict_if_needed(&self) -> Result<u64> {
1658 let stats = self
1660 .router
1661 .stats()
1662 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1663 let current = stats.total_bytes;
1664
1665 if current <= self.max_size_bytes {
1666 return Ok(0);
1667 }
1668
1669 let target = self.max_size_bytes * 90 / 100;
1671 let mut freed = 0u64;
1672 let mut current_size = current;
1673
1674 let orphan_freed = self.evict_orphaned_blobs()?;
1676 freed += orphan_freed;
1677 current_size = current_size.saturating_sub(orphan_freed);
1678
1679 if orphan_freed > 0 {
1680 tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
1681 }
1682
1683 if current_size <= target {
1685 if freed > 0 {
1686 tracing::info!("Eviction complete: {} bytes freed", freed);
1687 }
1688 return Ok(freed);
1689 }
1690
1691 let evictable = self.get_evictable_trees()?;
1694
1695 for (root_hash, meta) in evictable {
1696 if current_size <= target {
1697 break;
1698 }
1699
1700 let root_hex = to_hex(&root_hash);
1701
1702 if self.is_pinned(&root_hash)? {
1704 continue;
1705 }
1706
1707 let tree_freed = self.unindex_tree(&root_hash)?;
1708 freed += tree_freed;
1709 current_size = current_size.saturating_sub(tree_freed);
1710
1711 tracing::info!(
1712 "Evicted tree {} (owner={}, priority={}, {} bytes)",
1713 &root_hex[..8],
1714 &meta.owner[..8.min(meta.owner.len())],
1715 meta.priority,
1716 tree_freed
1717 );
1718 }
1719
1720 if freed > 0 {
1721 tracing::info!("Eviction complete: {} bytes freed", freed);
1722 }
1723
1724 Ok(freed)
1725 }
1726
1727 fn evict_orphaned_blobs(&self) -> Result<u64> {
1729 let mut freed = 0u64;
1730
1731 let all_hashes = self
1733 .router
1734 .list()
1735 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1736
1737 let rtxn = self.env.read_txn()?;
1739 let pinned: HashSet<Hash> = self
1740 .pins
1741 .iter(&rtxn)?
1742 .filter_map(|item| item.ok())
1743 .filter_map(|(hash_bytes, _)| {
1744 if hash_bytes.len() == 32 {
1745 let mut hash = [0u8; 32];
1746 hash.copy_from_slice(hash_bytes);
1747 Some(hash)
1748 } else {
1749 None
1750 }
1751 })
1752 .collect();
1753
1754 let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
1757 for item in self.blob_trees.iter(&rtxn)? {
1758 if let Ok((key_bytes, _)) = item {
1759 if key_bytes.len() >= 32 {
1760 let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
1761 blobs_in_trees.insert(blob_hash);
1762 }
1763 }
1764 }
1765 drop(rtxn);
1766
1767 for hash in all_hashes {
1769 if pinned.contains(&hash) {
1771 continue;
1772 }
1773
1774 if blobs_in_trees.contains(&hash) {
1776 continue;
1777 }
1778
1779 if let Ok(Some(data)) = self.router.get_sync(&hash) {
1781 freed += data.len() as u64;
1782 let _ = self.router.delete_local_only(&hash);
1783 tracing::debug!(
1784 "Deleted orphaned blob {} ({} bytes)",
1785 &to_hex(&hash)[..8],
1786 data.len()
1787 );
1788 }
1789 }
1790
1791 Ok(freed)
1792 }
1793
1794 pub fn max_size_bytes(&self) -> u64 {
1796 self.max_size_bytes
1797 }
1798
1799 pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1801 let rtxn = self.env.read_txn()?;
1802 let mut own = 0u64;
1803 let mut followed = 0u64;
1804 let mut other = 0u64;
1805
1806 for item in self.tree_meta.iter(&rtxn)? {
1807 let (_, bytes) = item?;
1808 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1809 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1810
1811 if meta.priority >= PRIORITY_OWN {
1812 own += meta.total_size;
1813 } else if meta.priority >= PRIORITY_FOLLOWED {
1814 followed += meta.total_size;
1815 } else {
1816 other += meta.total_size;
1817 }
1818 }
1819
1820 Ok(StorageByPriority {
1821 own,
1822 followed,
1823 other,
1824 })
1825 }
1826
1827 pub fn get_storage_stats(&self) -> Result<StorageStats> {
1829 let rtxn = self.env.read_txn()?;
1830 let total_pins = self.pins.len(&rtxn)? as usize;
1831
1832 let stats = self
1833 .router
1834 .stats()
1835 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1836
1837 Ok(StorageStats {
1838 total_dags: stats.count,
1839 pinned_dags: total_pins,
1840 total_bytes: stats.total_bytes,
1841 })
1842 }
1843
1844 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1848 let key = format!("{}/{}", pubkey_hex, tree_name);
1849 let rtxn = self.env.read_txn()?;
1850 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1851 let root: CachedRoot = rmp_serde::from_slice(bytes)
1852 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1853 Ok(Some(root))
1854 } else {
1855 Ok(None)
1856 }
1857 }
1858
1859 pub fn set_cached_root(
1861 &self,
1862 pubkey_hex: &str,
1863 tree_name: &str,
1864 hash: &str,
1865 key: Option<&str>,
1866 visibility: &str,
1867 updated_at: u64,
1868 ) -> Result<()> {
1869 let db_key = format!("{}/{}", pubkey_hex, tree_name);
1870 let root = CachedRoot {
1871 hash: hash.to_string(),
1872 key: key.map(|k| k.to_string()),
1873 updated_at,
1874 visibility: visibility.to_string(),
1875 };
1876 let bytes = rmp_serde::to_vec(&root)
1877 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1878 let mut wtxn = self.env.write_txn()?;
1879 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1880 wtxn.commit()?;
1881 Ok(())
1882 }
1883
1884 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1886 let prefix = format!("{}/", pubkey_hex);
1887 let rtxn = self.env.read_txn()?;
1888 let mut results = Vec::new();
1889
1890 for item in self.cached_roots.iter(&rtxn)? {
1891 let (key, bytes) = item?;
1892 if key.starts_with(&prefix) {
1893 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1894 let root: CachedRoot = rmp_serde::from_slice(bytes)
1895 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1896 results.push((tree_name.to_string(), root));
1897 }
1898 }
1899
1900 Ok(results)
1901 }
1902
1903 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1905 let key = format!("{}/{}", pubkey_hex, tree_name);
1906 let mut wtxn = self.env.write_txn()?;
1907 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1908 wtxn.commit()?;
1909 Ok(deleted)
1910 }
1911
1912 pub fn gc(&self) -> Result<GcStats> {
1914 let rtxn = self.env.read_txn()?;
1915
1916 let pinned: HashSet<Hash> = self
1918 .pins
1919 .iter(&rtxn)?
1920 .filter_map(|item| item.ok())
1921 .filter_map(|(hash_bytes, _)| {
1922 if hash_bytes.len() == 32 {
1923 let mut hash = [0u8; 32];
1924 hash.copy_from_slice(hash_bytes);
1925 Some(hash)
1926 } else {
1927 None
1928 }
1929 })
1930 .collect();
1931
1932 drop(rtxn);
1933
1934 let all_hashes = self
1936 .router
1937 .list()
1938 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1939
1940 let mut deleted = 0;
1942 let mut freed_bytes = 0u64;
1943
1944 for hash in all_hashes {
1945 if !pinned.contains(&hash) {
1946 if let Ok(Some(data)) = self.router.get_sync(&hash) {
1947 freed_bytes += data.len() as u64;
1948 let _ = self.router.delete_local_only(&hash);
1950 deleted += 1;
1951 }
1952 }
1953 }
1954
1955 Ok(GcStats {
1956 deleted_dags: deleted,
1957 freed_bytes,
1958 })
1959 }
1960
1961 pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
1964 let all_hashes = self
1965 .router
1966 .list()
1967 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1968
1969 let total = all_hashes.len();
1970 let mut valid = 0;
1971 let mut corrupted = 0;
1972 let mut deleted = 0;
1973 let mut corrupted_hashes = Vec::new();
1974
1975 for hash in &all_hashes {
1976 let hash_hex = to_hex(hash);
1977
1978 match self.router.get_sync(hash) {
1979 Ok(Some(data)) => {
1980 let actual_hash = sha256(&data);
1982
1983 if actual_hash == *hash {
1984 valid += 1;
1985 } else {
1986 corrupted += 1;
1987 let actual_hex = to_hex(&actual_hash);
1988 println!(
1989 " CORRUPTED: key={} actual={} size={}",
1990 &hash_hex[..16],
1991 &actual_hex[..16],
1992 data.len()
1993 );
1994 corrupted_hashes.push(*hash);
1995 }
1996 }
1997 Ok(None) => {
1998 corrupted += 1;
2000 println!(" MISSING: key={}", &hash_hex[..16]);
2001 corrupted_hashes.push(*hash);
2002 }
2003 Err(e) => {
2004 corrupted += 1;
2005 println!(" ERROR: key={} err={}", &hash_hex[..16], e);
2006 corrupted_hashes.push(*hash);
2007 }
2008 }
2009 }
2010
2011 if delete {
2013 for hash in &corrupted_hashes {
2014 match self.router.delete_sync(hash) {
2015 Ok(true) => deleted += 1,
2016 Ok(false) => {} Err(e) => {
2018 let hash_hex = to_hex(hash);
2019 println!(" Failed to delete {}: {}", &hash_hex[..16], e);
2020 }
2021 }
2022 }
2023 }
2024
2025 Ok(VerifyResult {
2026 total,
2027 valid,
2028 corrupted,
2029 deleted,
2030 })
2031 }
2032
    /// Verify the integrity of every `.bin` object in the configured S3/R2
    /// bucket by downloading it, re-hashing its contents, and comparing
    /// against the hash encoded in the object key. When `delete` is true,
    /// corrupted or malformed objects are removed afterwards.
    ///
    /// Progress and per-object problems are reported on stdout.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        // Build an S3 client from the application config; R2-style endpoints
        // need an explicit URL and path-style addressing.
        let config = crate::config::Config::load()?;
        let s3_config = config
            .storage
            .s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                .force_path_style(true)
                .build(),
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // Paginate through the whole bucket listing.
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2().bucket(bucket).prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req
                .send()
                .await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Only blob objects are verified.
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // Keys look like "<prefix><hash-hex>.bin"; recover the
                // expected content hash from the filename.
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                // A SHA-256 hex digest is exactly 64 characters.
                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!(" INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!(" INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download the object and compare its SHA-256 to the key.
                match s3_client.get_object().bucket(bucket).key(key).send().await {
                    Ok(resp) => match resp.body.collect().await {
                        Ok(bytes) => {
                            let data = bytes.into_bytes();
                            let actual_hash = sha256(&data);

                            if actual_hash == expected_hash {
                                valid += 1;
                            } else {
                                corrupted += 1;
                                let actual_hex = to_hex(&actual_hash);
                                println!(
                                    " CORRUPTED: key={} actual={} size={}",
                                    &expected_hash_hex[..16],
                                    &actual_hex[..16],
                                    data.len()
                                );
                                corrupted_keys.push(key.to_string());
                            }
                        }
                        Err(e) => {
                            corrupted += 1;
                            println!(" READ ERROR: {} - {}", key, e);
                            corrupted_keys.push(key.to_string());
                        }
                    },
                    Err(e) => {
                        corrupted += 1;
                        println!(" FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Periodic progress note for large buckets.
                if total % 100 == 0 {
                    println!(
                        " Progress: {} objects checked, {} corrupted so far",
                        total, corrupted
                    );
                }
            }

            // Continue with the next page, if any.
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        // Optionally remove everything flagged above.
        if delete {
            for key in &corrupted_keys {
                match s3_client
                    .delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        println!(" Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2192
2193 #[cfg(not(feature = "s3"))]
2195 pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
2196 Err(anyhow::anyhow!("S3 feature not enabled"))
2197 }
2198}
2199
/// Outcome of an integrity verification pass (LMDB or R2).
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of objects examined.
    pub total: usize,
    /// Objects whose content matched their hash.
    pub valid: usize,
    /// Objects that were corrupted, missing, or unreadable.
    pub corrupted: usize,
    /// Flagged objects actually deleted (nonzero only when deletion was requested).
    pub deleted: usize,
}
2208
/// Snapshot of overall local storage usage (see `get_storage_stats`).
#[derive(Debug)]
pub struct StorageStats {
    /// Total number of stored items reported by the router.
    pub total_dags: usize,
    /// Number of pin entries.
    pub pinned_dags: usize,
    /// Total stored bytes reported by the router.
    pub total_bytes: u64,
}
2215
/// Indexed-tree storage, in bytes, broken down by priority bucket.
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Trees with priority >= `PRIORITY_OWN`.
    pub own: u64,
    /// Trees with priority >= `PRIORITY_FOLLOWED` but below `PRIORITY_OWN`.
    pub followed: u64,
    /// Everything below `PRIORITY_FOLLOWED`.
    pub other: u64,
}
2226
/// Chunk layout of a stored file.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes.
    pub total_size: u64,
    /// Hash of each chunk, in file order.
    pub chunk_hashes: Vec<Hash>,
    /// Size in bytes of each chunk; parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is stored as multiple chunks.
    pub is_chunked: bool,
}
2234
/// Owning iterator over the chunks covering a byte range of a file.
///
/// Produced by `stream_file_range_chunks_owned`; holds an `Arc` to the
/// store so it can outlive the caller's borrow.
pub struct FileRangeChunksOwned {
    // Store used to fetch chunk contents.
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    // Inclusive byte range to return.
    start: u64,
    end: u64,
    // Index of the next chunk to inspect.
    current_chunk_idx: usize,
    // File offset where the next chunk begins.
    current_offset: u64,
}
2244
2245impl Iterator for FileRangeChunksOwned {
2246 type Item = Result<Vec<u8>>;
2247
2248 fn next(&mut self) -> Option<Self::Item> {
2249 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2250 return None;
2251 }
2252
2253 if self.current_offset > self.end {
2254 return None;
2255 }
2256
2257 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2258 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2259 let chunk_end = self.current_offset + chunk_size - 1;
2260
2261 self.current_chunk_idx += 1;
2262
2263 if chunk_end < self.start || self.current_offset > self.end {
2264 self.current_offset += chunk_size;
2265 return self.next();
2266 }
2267
2268 let chunk_content = match self.store.get_chunk(chunk_hash) {
2269 Ok(Some(content)) => content,
2270 Ok(None) => {
2271 return Some(Err(anyhow::anyhow!(
2272 "Chunk {} not found",
2273 to_hex(chunk_hash)
2274 )));
2275 }
2276 Err(e) => {
2277 return Some(Err(e));
2278 }
2279 };
2280
2281 let chunk_read_start = if self.current_offset >= self.start {
2282 0
2283 } else {
2284 (self.start - self.current_offset) as usize
2285 };
2286
2287 let chunk_read_end = if chunk_end <= self.end {
2288 chunk_size as usize - 1
2289 } else {
2290 (self.end - self.current_offset) as usize
2291 };
2292
2293 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2294 self.current_offset += chunk_size;
2295
2296 Some(Ok(result))
2297 }
2298}
2299
/// Result of a `gc` run.
#[derive(Debug)]
pub struct GcStats {
    /// Number of items deleted.
    pub deleted_dags: usize,
    /// Total bytes reclaimed.
    pub freed_bytes: u64,
}
2305
/// One entry in a directory listing returned by `get_directory_listing`.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within the directory.
    pub name: String,
    /// Hex-encoded hash of the entry's content.
    pub cid: String,
    /// True when the entry is itself a directory (tree link).
    pub is_directory: bool,
    /// Entry size in bytes.
    pub size: u64,
}
2313
/// Directory contents plus the directory's display name.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Directory name; left empty by `get_directory_listing`.
    pub dir_name: String,
    /// Entries contained in the directory.
    pub entries: Vec<DirEntry>,
}
2319
/// A pinned item as reported by `list_pins_with_names`.
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Hex-encoded hash of the pinned content.
    pub cid: String,
    /// Display name; `list_pins_with_names` currently always uses "Unknown".
    pub name: String,
    /// Whether the pinned hash refers to a directory.
    pub is_directory: bool,
}
2326
/// Serializable metadata record describing an uploaded blob.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// SHA-256 of the blob's content (presumably hex-encoded — TODO confirm).
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type recorded for the blob.
    pub mime_type: String,
    /// Upload timestamp (presumably seconds since the Unix epoch — TODO confirm).
    pub uploaded: u64,
}
2335
2336impl crate::webrtc::ContentStore for HashtreeStore {
2338 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2339 let hash = from_hex(hash_hex).map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2340 self.get_chunk(&hash)
2341 }
2342}