1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use heed::{Database, EnvOpenOptions};
4use heed::types::*;
5use hashtree_fs::FsBlobStore;
6#[cfg(feature = "lmdb")]
7use hashtree_lmdb::LmdbBlobStore;
8use hashtree_core::{
9 HashTree, HashTreeConfig, Cid,
10 sha256, to_hex, from_hex, TreeNode, DirEntry as HashTreeDirEntry,
11 types::Hash,
12};
13use hashtree_core::store::{Store, StoreError};
14use hashtree_config::StorageBackend;
15use serde::{Deserialize, Serialize};
16use std::path::Path;
17use std::collections::HashSet;
18use std::io::Read;
19use std::sync::Arc;
20use std::time::{SystemTime, UNIX_EPOCH};
21use futures::executor::block_on as sync_block_on;
22
/// Retention priority for trees owned by unrelated users (lowest).
pub const PRIORITY_OTHER: u8 = 64;
/// Retention priority for trees owned by followed users.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Retention priority for the local user's own trees (highest).
/// NOTE(review): presumably consumed by an eviction policy — confirm there.
pub const PRIORITY_OWN: u8 = 255;
27
/// Metadata recorded for an indexed tree (serialized into the `tree_meta`
/// database, keyed by the tree's root hash — see `index_tree`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Identifier of the tree's owner (string form; likely a pubkey — confirm with callers).
    pub owner: String,
    /// Optional human-readable name for the tree.
    pub name: Option<String>,
    /// Unix timestamp (seconds) of the last sync.
    pub synced_at: u64,
    /// Total size in bytes of the blobs belonging to this tree.
    pub total_size: u64,
    /// Retention priority; see the `PRIORITY_*` constants.
    pub priority: u8,
}
42
/// A cached root pointer, serialized into the `cached_roots` database
/// under a string key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Hex-encoded root hash.
    pub hash: String,
    /// Optional key material for encrypted trees (exact encoding not visible here).
    pub key: Option<String>,
    /// Unix timestamp (seconds) when this entry was last refreshed.
    pub updated_at: u64,
    /// Visibility marker (e.g. "public"/"private" — confirm against writers).
    pub visibility: String,
}
55
/// Aggregate statistics for the local blob store.
#[derive(Debug, Clone)]
pub struct LocalStoreStats {
    /// Number of blobs stored.
    pub count: usize,
    /// Total size of all stored blobs, in bytes.
    pub total_bytes: u64,
}
62
/// Local blob storage backend: plain filesystem, or LMDB when compiled
/// with the `lmdb` feature.
pub enum LocalStore {
    /// Filesystem-backed blob store.
    Fs(FsBlobStore),
    /// LMDB-backed blob store (feature-gated).
    #[cfg(feature = "lmdb")]
    Lmdb(LmdbBlobStore),
}
69
impl LocalStore {
    /// Opens the blob store at `path` using the configured backend.
    ///
    /// If `StorageBackend::Lmdb` is requested but the `lmdb` feature is
    /// compiled out, this logs a warning and falls back to the filesystem
    /// backend instead of failing.
    pub fn new<P: AsRef<Path>>(path: P, backend: &StorageBackend) -> Result<Self, StoreError> {
        match backend {
            StorageBackend::Fs => {
                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
            }
            #[cfg(feature = "lmdb")]
            StorageBackend::Lmdb => {
                Ok(LocalStore::Lmdb(LmdbBlobStore::new(path)?))
            }
            #[cfg(not(feature = "lmdb"))]
            StorageBackend::Lmdb => {
                tracing::warn!("LMDB backend requested but lmdb feature not enabled, using filesystem storage");
                Ok(LocalStore::Fs(FsBlobStore::new(path)?))
            }
        }
    }

    /// Stores `data` under `hash`; the boolean reports whether the blob was
    /// newly written (backend-defined semantics).
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.put_sync(hash, data),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.put_sync(hash, data),
        }
    }

    /// Fetches the blob bytes for `hash`, or `None` when absent.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.get_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.get_sync(hash),
        }
    }

    /// Returns whether a blob exists. The backends differ: the fs store's
    /// `exists` is infallible (hence the `Ok` wrap), the LMDB one is fallible.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => Ok(store.exists(hash)),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.exists(hash),
        }
    }

    /// Deletes the blob; returns whether something was actually removed.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        match self {
            LocalStore::Fs(store) => store.delete_sync(hash),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.delete_sync(hash),
        }
    }

    /// Returns blob count and total byte size, normalized into
    /// `LocalStoreStats` regardless of backend.
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        match self {
            LocalStore::Fs(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => {
                let stats = store.stats()?;
                Ok(LocalStoreStats {
                    count: stats.count,
                    total_bytes: stats.total_bytes,
                })
            }
        }
    }

    /// Lists the hashes of all stored blobs.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        match self {
            LocalStore::Fs(store) => store.list(),
            #[cfg(feature = "lmdb")]
            LocalStore::Lmdb(store) => store.list(),
        }
    }
}
155
#[async_trait]
impl Store for LocalStore {
    /// Async `Store` facade over the synchronous backend methods.
    /// NOTE(review): these perform blocking disk I/O on the executor thread;
    /// acceptable only if callers tolerate that.
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
174
175#[cfg(feature = "s3")]
176use tokio::sync::mpsc;
177
178use crate::config::S3Config;
179
/// Work items sent to the background S3 replication task.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Replicate a blob's bytes to S3.
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove a blob's object from S3.
    Delete { hash: Hash },
}
186
/// Routes blob operations to the local store, with optional write-through
/// replication to S3 (reads fall back to S3 on a local miss).
pub struct StorageRouter {
    /// Always-present local store; the primary for all reads and writes.
    local: Arc<LocalStore>,
    /// S3 client used for synchronous read-through (`get_sync`/`exists`).
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target bucket; `None` when S3 is not configured.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to every object key.
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel to the background upload/delete task; `None` when S3 is off.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
205
impl StorageRouter {
    /// Creates a router that uses only the local store (no S3 replication).
    pub fn new(local: Arc<LocalStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Creates a router with S3 replication enabled.
    ///
    /// Builds an S3 client from the environment plus `config`, then spawns a
    /// background task that drains an unbounded channel of upload/delete
    /// requests, running at most 32 transfers concurrently.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LocalStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Region comes from config; credentials etc. from the environment.
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader = aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Custom endpoint + path-style addressing supports MinIO-style servers.
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Unbounded: writes are never dropped, but memory can grow if S3 lags.
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Cap concurrent S3 transfers at 32.
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                // One task per message; the semaphore bounds real parallelism.
                tokio::spawn(async move {
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            // Object key layout: "<prefix><hex-hash>.bin".
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            match client
                                .put_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .body(ByteStream::from(data))
                                .send()
                                .await
                            {
                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
                                // Failures are logged and dropped — no retry here.
                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            if let Err(e) = client
                                .delete_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .send()
                                .await
                            {
                                tracing::error!("S3 delete failed {}: {}", &key, e);
                            }
                        }
                    }
                });
            }
        });

        tracing::info!("S3 storage initialized: bucket={}, prefix={}", bucket, prefix);

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Writes to the local store, then (if configured) queues an async S3
    /// upload. Returns `Ok(true)` when the blob was newly stored locally.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        let is_new = self.local.put_sync(hash, data)?;

        // The S3 replica is queued even when the blob already existed locally.
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            tracing::info!("Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(), data.len(), is_new);
            if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data: data.to_vec() }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Reads from the local store; on a miss, falls back to S3 and caches the
    /// fetched blob locally.
    ///
    /// NOTE(review): the S3 path blocks on SDK futures via
    /// `futures::executor::block_on`; this will misbehave if invoked from a
    /// Tokio runtime worker thread — confirm callers use blocking contexts.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.get_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Best-effort local cache fill; errors are ignored.
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    let service_err = e.into_service_error();
                    // A plain miss is expected; only log real failures.
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Checks local existence first, then S3 via a blocking HEAD request
    /// (same block_on caveat as `get_sync`).
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        if self.local.exists(hash)? {
            return Ok(true);
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    let service_err = e.into_service_error();
                    // 404 is the expected miss; only log other failures.
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Deletes locally and queues an S3 delete. The return value reflects the
    /// local deletion only.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Deletes only the local copy, leaving any S3 replica intact.
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Local-store statistics (S3 contents are not counted).
    pub fn stats(&self) -> Result<LocalStoreStats, StoreError> {
        self.local.stats()
    }

    /// Lists hashes present in the local store only.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Shared handle to the underlying local store.
    pub fn local_store(&self) -> Arc<LocalStore> {
        Arc::clone(&self.local)
    }
}
443
#[async_trait]
impl Store for StorageRouter {
    /// Async `Store` facade; each method delegates to the synchronous routed
    /// implementation, so it may block the executor thread on disk/S3 I/O.
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
464
/// Top-level store: LMDB metadata databases plus a `StorageRouter` for blob
/// bytes, with hashtree-aware file/directory operations layered on top.
pub struct HashtreeStore {
    /// LMDB environment holding all metadata databases below.
    env: heed::Env,
    /// Pinned root hashes (key = 32-byte hash; unit value).
    pins: Database<Bytes, Unit>,
    /// Ownership records; key = sha256 (32 B) || owner pubkey (32 B).
    blob_owners: Database<Bytes, Unit>,
    /// Per-pubkey JSON-serialized `Vec<BlobMetadata>`.
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Serialized `TreeMeta`, keyed by tree root hash.
    tree_meta: Database<Bytes, Bytes>,
    /// Blob-to-tree membership records (key layout not visible in this chunk).
    blob_trees: Database<Bytes, Unit>,
    /// Named references -> current tree root hash.
    tree_refs: Database<Str, Bytes>,
    /// Serialized `CachedRoot`, keyed by a string identifier.
    cached_roots: Database<Str, Bytes>,
    /// Blob storage router (local store + optional S3 replication).
    router: Arc<StorageRouter>,
    /// Size cap in bytes (enforcement is not visible in this chunk).
    max_size_bytes: u64,
}
486
487impl HashtreeStore {
488 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
490 Self::with_options(path, None, 10 * 1024 * 1024 * 1024)
491 }
492
493 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
495 Self::with_options(path, s3_config, 10 * 1024 * 1024 * 1024)
496 }
497
    /// Opens the store at `path`, creating the directory, the LMDB
    /// environment, all metadata databases, and the blob router (with
    /// optional S3 replication).
    ///
    /// `max_size_bytes` is only recorded on the struct; the LMDB `map_size`
    /// below is a separate hard-coded 10 GiB — NOTE(review): consider
    /// deriving it from `max_size_bytes`.
    pub fn with_options<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>, max_size_bytes: u64) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY: heed marks `open` unsafe because the LMDB memory map relies
        // on the environment files not being concurrently resized/corrupted;
        // this process is assumed to be the exclusive user of `path`.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024)
                .max_dbs(8)
                .open(path)?
        };

        // Create (or open) every named database up front in one write txn.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // The blob backend choice (fs vs lmdb) comes from the global config.
        let config = hashtree_config::Config::load_or_default();
        let backend = &config.storage.backend;

        let local_store = Arc::new(LocalStore::new(path.join("blobs"), backend)
            .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?);

        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!("Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket, s3_cfg.endpoint);

            // NOTE(review): blocks on the async S3 setup; must not be called
            // from inside an async runtime worker thread.
            sync_block_on(async {
                StorageRouter::with_s3(local_store, s3_cfg).await
            })?
        } else {
            StorageRouter::new(local_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!("S3 config provided but S3 feature not enabled. Using local storage only.");
            }
            StorageRouter::new(local_store)
        });

        Ok(Self {
            env,
            pins,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
562
    /// Borrows the blob storage router.
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }

    /// Returns a shared handle to the router, suitable for building
    /// `HashTreeConfig` instances.
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }
573
    /// Uploads `file_path` as a public hashtree and pins the root.
    /// Returns the hex-encoded root hash.
    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, true)
    }

    /// Same as `upload_file`, but without pinning the root.
    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, false)
    }
583
584 fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
585 let file_path = file_path.as_ref();
586 let file_content = std::fs::read(file_path)?;
587
588 let store = self.store_arc();
590 let tree = HashTree::new(HashTreeConfig::new(store).public());
591
592 let (cid, _size) = sync_block_on(async {
593 tree.put(&file_content).await
594 }).context("Failed to store file")?;
595
596 if pin {
598 let mut wtxn = self.env.write_txn()?;
599 self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
600 wtxn.commit()?;
601 }
602
603 Ok(to_hex(&cid.hash))
604 }
605
606 pub fn upload_file_stream<R: Read, F>(
608 &self,
609 mut reader: R,
610 _file_name: impl Into<String>,
611 mut callback: F,
612 ) -> Result<String>
613 where
614 F: FnMut(&str),
615 {
616 let mut data = Vec::new();
617 reader.read_to_end(&mut data)?;
618
619 let store = self.store_arc();
621 let tree = HashTree::new(HashTreeConfig::new(store).public());
622
623 let (cid, _size) = sync_block_on(async {
624 tree.put(&data).await
625 }).context("Failed to store file")?;
626
627 let root_hex = to_hex(&cid.hash);
628 callback(&root_hex);
629
630 let mut wtxn = self.env.write_txn()?;
632 self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
633 wtxn.commit()?;
634
635 Ok(root_hex)
636 }
637
    /// Uploads a directory as a public hashtree (gitignore rules respected)
    /// and pins the root. Returns the hex root hash.
    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_with_options(dir_path, true)
    }
643
644 pub fn upload_dir_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
646 let dir_path = dir_path.as_ref();
647
648 let store = self.store_arc();
649 let tree = HashTree::new(HashTreeConfig::new(store).public());
650
651 let root_cid = sync_block_on(async {
652 self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
653 }).context("Failed to upload directory")?;
654
655 let root_hex = to_hex(&root_cid.hash);
656
657 let mut wtxn = self.env.write_txn()?;
658 self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
659 wtxn.commit()?;
660
661 Ok(root_hex)
662 }
663
    /// Walks `current_path` (honouring gitignore rules when requested),
    /// uploads every file, and groups results by relative parent directory
    /// before handing off to `build_directory_tree`.
    ///
    /// `_root_path` is currently unused; all paths are made relative to
    /// `current_path`.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Map: relative directory path -> (file name, cid) entries.
        // The empty-string key represents the root directory.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new());

        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false) // include dotfiles; only gitignore rules filter
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // The walker yields the root itself; skip it.
            if path == current_path {
                continue;
            }

            let relative = path.strip_prefix(current_path)
                .unwrap_or(path);

            if path.is_file() {
                // Each file is fully read into memory before upload.
                let content = std::fs::read(path)?;
                let (cid, _size) = tree.put(&content).await
                    .map_err(|e| anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e))?;

                let parent = relative.parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative.file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Record empty directories so they still get a tree node.
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        self.build_directory_tree(tree, &mut dir_contents).await
    }
721
    /// Assembles directory nodes bottom-up from `dir_contents` (as produced
    /// by `upload_dir_recursive`) and returns the root directory's CID.
    async fn build_directory_tree<S: Store>(
        &self,
        tree: &HashTree<S>,
        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
    ) -> Result<Cid> {
        // Sort directories deepest-first so every subdirectory's CID exists
        // before its parent is assembled. Depth = component count; "" (root)
        // has depth 0.
        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
        dirs.sort_by(|a, b| {
            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
            depth_b.cmp(&depth_a)
        });

        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();

        for dir_path in dirs {
            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();

            let mut entries: Vec<HashTreeDirEntry> = files.into_iter()
                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
                .collect();

            // Attach already-built subdirectories whose parent is this dir.
            // NOTE(review): O(dirs²) overall — fine for typical tree sizes.
            for (subdir_path, cid) in &dir_cids {
                let parent = std::path::Path::new(subdir_path)
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();

                if parent == dir_path {
                    let name = std::path::Path::new(subdir_path)
                        .file_name()
                        .map(|n| n.to_string_lossy().to_string())
                        .unwrap_or_default();
                    entries.push(HashTreeDirEntry::from_cid(name, cid));
                }
            }

            let cid = tree.put_directory(entries).await
                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;

            dir_cids.insert(dir_path, cid);
        }

        // The empty key is the root; it is seeded by the caller.
        dir_cids.get("")
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("No root directory"))
    }
771
772 pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
774 let file_path = file_path.as_ref();
775 let file_content = std::fs::read(file_path)?;
776
777 let store = self.store_arc();
779 let tree = HashTree::new(HashTreeConfig::new(store));
780
781 let (cid, _size) = sync_block_on(async {
782 tree.put(&file_content).await
783 }).map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;
784
785 let cid_str = cid.to_string();
786
787 let mut wtxn = self.env.write_txn()?;
788 self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
789 wtxn.commit()?;
790
791 Ok(cid_str)
792 }
793
    /// Uploads a directory as an encrypted hashtree (gitignore rules
    /// respected), pins the root, and returns the CID string.
    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_encrypted_with_options(dir_path, true)
    }
799
800 pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
803 let dir_path = dir_path.as_ref();
804 let store = self.store_arc();
805
806 let tree = HashTree::new(HashTreeConfig::new(store));
808
809 let root_cid = sync_block_on(async {
810 self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
811 }).context("Failed to upload encrypted directory")?;
812
813 let cid_str = root_cid.to_string(); let mut wtxn = self.env.write_txn()?;
816 self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
818 wtxn.commit()?;
819
820 Ok(cid_str)
821 }
822
    /// Fetches and decodes the tree node stored under `hash`, if any.
    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get_tree_node(hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
        })
    }
833
834 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
836 let hash = sha256(data);
837 self.router.put_sync(hash, data)
838 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
839 Ok(to_hex(&hash))
840 }
841
    /// Fetches a raw blob by sha256 (local store first, then S3 if configured).
    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        self.router.get_sync(hash)
            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
    }

    /// Returns whether a blob exists locally or (if configured) in S3.
    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
        self.router.exists(hash)
            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
    }
853
854 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
860 let mut key = [0u8; 64];
861 key[..32].copy_from_slice(sha256);
862 key[32..].copy_from_slice(pubkey);
863 key
864 }
865
    /// Records `pubkey` as an owner of blob `sha256` and appends a matching
    /// `BlobMetadata` entry to the pubkey's blob list if not already present.
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Idempotent: re-putting an existing ownership key is harmless.
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        let sha256_hex = to_hex(sha256);

        // Load the pubkey's current blob list (empty on absence/parse error).
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // NOTE(review): reads the whole blob just to learn its length;
            // a size/stat call on the store would avoid the copy.
            let size = self
                .get_blob(sha256)?
                .map(|data| data.len() as u64)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
912
    /// Returns whether `pubkey` is recorded as an owner of blob `sha256`.
    pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let rtxn = self.env.read_txn()?;
        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
    }
919
920 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
922 let rtxn = self.env.read_txn()?;
923
924 let mut owners = Vec::new();
925 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
926 let (key, _) = item?;
927 if key.len() == 64 {
928 let mut pubkey = [0u8; 32];
930 pubkey.copy_from_slice(&key[32..64]);
931 owners.push(pubkey);
932 }
933 }
934 Ok(owners)
935 }
936
937 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
939 let rtxn = self.env.read_txn()?;
940
941 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
943 if item.is_ok() {
944 return Ok(true);
945 }
946 }
947 Ok(false)
948 }
949
950 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
952 Ok(self.get_blob_owners(sha256)?.into_iter().next())
953 }
954
    /// Removes `pubkey`'s ownership of blob `sha256` and its entry from the
    /// pubkey's blob list. If no owners remain, the blob bytes themselves are
    /// deleted. Returns `Ok(true)` only when the blob was deleted.
    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        self.blob_owners.delete(&mut wtxn, &key[..])?;

        let sha256_hex = to_hex(sha256);

        // Drop the matching metadata entry from the pubkey's list, if parsable.
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Check for remaining owners inside the same txn (sees our delete).
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &to_hex(pubkey)[..8],
                &sha256_hex[..8]
            );
            return Ok(false);
        }

        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8]
        );

        // NOTE(review): blob bytes are deleted before the txn commits — if
        // the commit then fails, metadata may reference a missing blob.
        let _ = self.router.delete_sync(sha256);

        wtxn.commit()?;
        Ok(true)
    }
1008
1009 pub fn list_blobs_by_pubkey(&self, pubkey: &[u8; 32]) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
1011 let rtxn = self.env.read_txn()?;
1012
1013 let blobs: Vec<BlobMetadata> = self
1014 .pubkey_blobs
1015 .get(&rtxn, pubkey)?
1016 .and_then(|b| serde_json::from_slice(b).ok())
1017 .unwrap_or_default();
1018
1019 Ok(blobs
1020 .into_iter()
1021 .map(|b| crate::server::blossom::BlobDescriptor {
1022 url: format!("/{}", b.sha256),
1023 sha256: b.sha256,
1024 size: b.size,
1025 mime_type: b.mime_type,
1026 uploaded: b.uploaded,
1027 })
1028 .collect())
1029 }
1030
    /// Fetches a single raw chunk by hash (no tree interpretation).
    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        self.router.get_sync(hash)
            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
    }
1036
    /// Reassembles the full (public) file rooted at `hash`.
    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.read_file(hash).await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }

    /// Reads a file addressed by a full CID (which may carry key material).
    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get(cid).await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }

    /// Resolves a slash-separated `path` inside the tree rooted at `cid`.
    pub fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.resolve_path(cid, path).await
                .map_err(|e| anyhow::anyhow!("Failed to resolve path: {}", e))
        })
    }
1071
    /// Returns chunking metadata for the file rooted at `hash`: total size,
    /// per-chunk hashes/sizes, and whether the file is chunked at all.
    /// Returns `Ok(None)` when the hash is unknown or refers to a directory.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            let exists = store.has(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            let total_size = tree.get_size(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            let is_tree_node = tree.is_tree(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            // A raw blob: single-chunk file with no links.
            if !is_tree_node {
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            let node = match tree.get_tree_node(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))? {
                Some(n) => n,
                None => return Ok(None),
            };

            let is_directory = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            // Directories have no byte-range semantics.
            if is_directory {
                return Ok(None);
            }

            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1132
    /// Reads the inclusive byte range `[start, end]` (`end = None` means EOF)
    /// of the file rooted at `hash`, fetching only the chunks that overlap.
    /// Returns `(bytes, total_file_size)`, or `None` when the file is unknown
    /// or `start` is past EOF.
    pub fn get_file_range(&self, hash: &[u8; 32], start: u64, end: Option<u64>) -> Result<Option<(Vec<u8>, u64)>> {
        let metadata = match self.get_file_chunk_metadata(hash)? {
            Some(m) => m,
            None => return Ok(None),
        };

        // Empty file: any range request yields empty content.
        if metadata.total_size == 0 {
            return Ok(Some((Vec::new(), 0)));
        }

        if start >= metadata.total_size {
            return Ok(None);
        }

        // Clamp the inclusive end to the last byte of the file.
        let end = end.unwrap_or(metadata.total_size - 1).min(metadata.total_size - 1);

        // Unchunked file: load the whole content and slice directly.
        if !metadata.is_chunked {
            let content = self.get_file(hash)?.unwrap_or_default();
            let range_content = if start < content.len() as u64 {
                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
            } else {
                Vec::new()
            };
            return Ok(Some((range_content, metadata.total_size)));
        }

        let mut result = Vec::new();
        let mut current_offset = 0u64;

        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
            let chunk_size = metadata.chunk_sizes[i];
            // NOTE(review): a zero-length chunk would underflow here; this
            // assumes the tree never records empty chunks.
            let chunk_end = current_offset + chunk_size - 1;

            // Fetch and slice only chunks overlapping [start, end].
            if chunk_end >= start && current_offset <= end {
                let chunk_content = match self.get_chunk(chunk_hash)? {
                    Some(content) => content,
                    None => {
                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
                    }
                };

                let chunk_read_start = if current_offset >= start {
                    0
                } else {
                    (start - current_offset) as usize
                };

                let chunk_read_end = if chunk_end <= end {
                    chunk_size as usize - 1
                } else {
                    (end - current_offset) as usize
                };

                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
            }

            current_offset += chunk_size;

            // Past the requested range — stop early.
            if current_offset > end {
                break;
            }
        }

        Ok(Some((result, metadata.total_size)))
    }
1202
    /// Builds an owned cursor over the chunks covering `[start, end]` of the
    /// file rooted at `hash` (the `FileRangeChunksOwned` type is defined
    /// outside this chunk). Returns `None` for unknown files, empty files, or
    /// `start` past EOF.
    pub fn stream_file_range_chunks_owned(
        self: Arc<Self>,
        hash: &[u8; 32],
        start: u64,
        end: u64,
    ) -> Result<Option<FileRangeChunksOwned>> {
        let metadata = match self.get_file_chunk_metadata(hash)? {
            Some(m) => m,
            None => return Ok(None),
        };

        if metadata.total_size == 0 || start >= metadata.total_size {
            return Ok(None);
        }

        // Clamp the inclusive end to the last byte of the file.
        let end = end.min(metadata.total_size - 1);

        Ok(Some(FileRangeChunksOwned {
            store: self,
            metadata,
            start,
            end,
            current_chunk_idx: 0,
            current_offset: 0,
        }))
    }
1230
    /// Lists the entries of the directory rooted at `hash`, or `None` when
    /// the hash does not refer to a directory. `dir_name` is left empty here.
    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            let is_dir = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if !is_dir {
                return Ok(None);
            }

            let cid = hashtree_core::Cid::public(*hash);
            let tree_entries = tree.list_directory(&cid).await
                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;

            // Convert core entries into the API-facing DirEntry shape.
            let entries: Vec<DirEntry> = tree_entries.into_iter().map(|e| DirEntry {
                name: e.name,
                cid: to_hex(&e.hash),
                is_directory: e.link_type.is_tree(),
                size: e.size,
            }).collect();

            Ok(Some(DirectoryListing {
                dir_name: String::new(),
                entries,
            }))
        })
    }
1263
    /// Records a pin for `hash` (presumably protecting it from eviction —
    /// the eviction logic is not visible in this chunk).
    pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, hash.as_slice(), &())?;
        wtxn.commit()?;
        Ok(())
    }

    /// Removes the pin for `hash` (no-op if it was not pinned).
    pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
        let mut wtxn = self.env.write_txn()?;
        self.pins.delete(&mut wtxn, hash.as_slice())?;
        wtxn.commit()?;
        Ok(())
    }

    /// Returns whether `hash` is currently pinned.
    pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
        let rtxn = self.env.read_txn()?;
        Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
    }
1285
1286 pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
1288 let rtxn = self.env.read_txn()?;
1289 let mut pins = Vec::new();
1290
1291 for item in self.pins.iter(&rtxn)? {
1292 let (hash_bytes, _) = item?;
1293 if hash_bytes.len() == 32 {
1294 let mut hash = [0u8; 32];
1295 hash.copy_from_slice(hash_bytes);
1296 pins.push(hash);
1297 }
1298 }
1299
1300 Ok(pins)
1301 }
1302
1303 pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
1305 let rtxn = self.env.read_txn()?;
1306 let store = self.store_arc();
1307 let tree = HashTree::new(HashTreeConfig::new(store).public());
1308 let mut pins = Vec::new();
1309
1310 for item in self.pins.iter(&rtxn)? {
1311 let (hash_bytes, _) = item?;
1312 if hash_bytes.len() != 32 {
1313 continue;
1314 }
1315 let mut hash = [0u8; 32];
1316 hash.copy_from_slice(hash_bytes);
1317
1318 let is_directory = sync_block_on(async {
1320 tree.is_directory(&hash).await.unwrap_or(false)
1321 });
1322
1323 pins.push(PinnedItem {
1324 cid: to_hex(&hash),
1325 name: "Unknown".to_string(),
1326 is_directory,
1327 });
1328 }
1329
1330 Ok(pins)
1331 }
1332
    /// Index a fully-synced tree rooted at `root_hash`.
    ///
    /// Records a blob→tree reverse mapping for every blob reachable from the
    /// root (used by eviction to detect shared blobs), stores a `TreeMeta`
    /// record (owner, optional name, total size, priority, sync time), and
    /// optionally binds the tree to a stable `ref_key`. If the ref previously
    /// pointed at a different root, that old tree is unindexed first
    /// (best-effort) so its exclusive blobs can be reclaimed.
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // Replace semantics: a ref_key points at exactly one root. If the ref
        // moved to a new root, drop the old tree's index; failures here are
        // deliberately ignored (the new index still proceeds).
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes.try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // Release the read txn (and its borrow) before unindexing,
                    // which opens its own transactions.
                    drop(rtxn);
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk the whole tree to find every reachable blob and the total
        // payload size (blocking on the async traversal).
        let (blob_hashes, total_size) = sync_block_on(async {
            self.collect_tree_blobs(&tree, root_hash).await
        })?;

        let mut wtxn = self.env.write_txn()?;

        // blob_trees key layout: 32-byte blob hash ++ 32-byte root hash, so a
        // prefix scan on a blob hash finds every tree that references it.
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta.put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        // Bind (or rebind) the ref in the same transaction as the metadata.
        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1414
1415 async fn collect_tree_blobs<S: Store>(
1417 &self,
1418 tree: &HashTree<S>,
1419 root: &Hash,
1420 ) -> Result<(Vec<Hash>, u64)> {
1421 let mut blobs = Vec::new();
1422 let mut total_size = 0u64;
1423 let mut stack = vec![*root];
1424
1425 while let Some(hash) = stack.pop() {
1426 let is_tree = tree.is_tree(&hash).await
1428 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1429
1430 if is_tree {
1431 if let Some(node) = tree.get_tree_node(&hash).await
1433 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1434 {
1435 for link in &node.links {
1436 stack.push(link.hash);
1437 }
1438 }
1439 } else {
1440 if let Some(data) = self.router.get_sync(&hash)
1442 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1443 {
1444 total_size += data.len() as u64;
1445 blobs.push(hash);
1446 }
1447 }
1448 }
1449
1450 Ok((blobs, total_size))
1451 }
1452
1453 pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1456 let root_hex = to_hex(root_hash);
1457
1458 let store = self.store_arc();
1459 let tree = HashTree::new(HashTreeConfig::new(store).public());
1460
1461 let (blob_hashes, _) = sync_block_on(async {
1463 self.collect_tree_blobs(&tree, root_hash).await
1464 })?;
1465
1466 let mut wtxn = self.env.write_txn()?;
1467 let mut freed = 0u64;
1468
1469 for blob_hash in &blob_hashes {
1471 let mut key = [0u8; 64];
1473 key[..32].copy_from_slice(blob_hash);
1474 key[32..].copy_from_slice(root_hash);
1475 self.blob_trees.delete(&mut wtxn, &key[..])?;
1476
1477 let rtxn = self.env.read_txn()?;
1479 let mut has_other_tree = false;
1480
1481 for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1482 if item.is_ok() {
1483 has_other_tree = true;
1484 break;
1485 }
1486 }
1487 drop(rtxn);
1488
1489 if !has_other_tree {
1491 if let Some(data) = self.router.get_sync(blob_hash)
1492 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1493 {
1494 freed += data.len() as u64;
1495 self.router.delete_local_only(blob_hash)
1497 .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1498 }
1499 }
1500 }
1501
1502 if let Some(data) = self.router.get_sync(root_hash)
1504 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1505 {
1506 freed += data.len() as u64;
1507 self.router.delete_local_only(root_hash)
1509 .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1510 }
1511
1512 self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1514
1515 wtxn.commit()?;
1516
1517 tracing::debug!(
1518 "Unindexed tree {} ({} bytes freed)",
1519 &root_hex[..8],
1520 freed
1521 );
1522
1523 Ok(freed)
1524 }
1525
1526 pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1528 let rtxn = self.env.read_txn()?;
1529 if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1530 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1531 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1532 Ok(Some(meta))
1533 } else {
1534 Ok(None)
1535 }
1536 }
1537
1538 pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1540 let rtxn = self.env.read_txn()?;
1541 let mut trees = Vec::new();
1542
1543 for item in self.tree_meta.iter(&rtxn)? {
1544 let (hash_bytes, meta_bytes) = item?;
1545 let hash: Hash = hash_bytes.try_into()
1546 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1547 let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1548 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1549 trees.push((hash, meta));
1550 }
1551
1552 Ok(trees)
1553 }
1554
1555 pub fn tracked_size(&self) -> Result<u64> {
1557 let rtxn = self.env.read_txn()?;
1558 let mut total = 0u64;
1559
1560 for item in self.tree_meta.iter(&rtxn)? {
1561 let (_, bytes) = item?;
1562 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1563 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1564 total += meta.total_size;
1565 }
1566
1567 Ok(total)
1568 }
1569
1570 fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1572 let mut trees = self.list_indexed_trees()?;
1573
1574 trees.sort_by(|a, b| {
1576 match a.1.priority.cmp(&b.1.priority) {
1577 std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1578 other => other,
1579 }
1580 });
1581
1582 Ok(trees)
1583 }
1584
1585 pub fn evict_if_needed(&self) -> Result<u64> {
1592 let stats = self.router.stats()
1594 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1595 let current = stats.total_bytes;
1596
1597 if current <= self.max_size_bytes {
1598 return Ok(0);
1599 }
1600
1601 let target = self.max_size_bytes * 90 / 100;
1603 let mut freed = 0u64;
1604 let mut current_size = current;
1605
1606 let orphan_freed = self.evict_orphaned_blobs()?;
1608 freed += orphan_freed;
1609 current_size = current_size.saturating_sub(orphan_freed);
1610
1611 if orphan_freed > 0 {
1612 tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
1613 }
1614
1615 if current_size <= target {
1617 if freed > 0 {
1618 tracing::info!("Eviction complete: {} bytes freed", freed);
1619 }
1620 return Ok(freed);
1621 }
1622
1623 let evictable = self.get_evictable_trees()?;
1626
1627 for (root_hash, meta) in evictable {
1628 if current_size <= target {
1629 break;
1630 }
1631
1632 let root_hex = to_hex(&root_hash);
1633
1634 if self.is_pinned(&root_hash)? {
1636 continue;
1637 }
1638
1639 let tree_freed = self.unindex_tree(&root_hash)?;
1640 freed += tree_freed;
1641 current_size = current_size.saturating_sub(tree_freed);
1642
1643 tracing::info!(
1644 "Evicted tree {} (owner={}, priority={}, {} bytes)",
1645 &root_hex[..8],
1646 &meta.owner[..8.min(meta.owner.len())],
1647 meta.priority,
1648 tree_freed
1649 );
1650 }
1651
1652 if freed > 0 {
1653 tracing::info!("Eviction complete: {} bytes freed", freed);
1654 }
1655
1656 Ok(freed)
1657 }
1658
1659 fn evict_orphaned_blobs(&self) -> Result<u64> {
1661 let mut freed = 0u64;
1662
1663 let all_hashes = self.router.list()
1665 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1666
1667 let rtxn = self.env.read_txn()?;
1669 let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
1670 .filter_map(|item| item.ok())
1671 .filter_map(|(hash_bytes, _)| {
1672 if hash_bytes.len() == 32 {
1673 let mut hash = [0u8; 32];
1674 hash.copy_from_slice(hash_bytes);
1675 Some(hash)
1676 } else {
1677 None
1678 }
1679 })
1680 .collect();
1681
1682 let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
1685 for item in self.blob_trees.iter(&rtxn)? {
1686 if let Ok((key_bytes, _)) = item {
1687 if key_bytes.len() >= 32 {
1688 let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
1689 blobs_in_trees.insert(blob_hash);
1690 }
1691 }
1692 }
1693 drop(rtxn);
1694
1695 for hash in all_hashes {
1697 if pinned.contains(&hash) {
1699 continue;
1700 }
1701
1702 if blobs_in_trees.contains(&hash) {
1704 continue;
1705 }
1706
1707 if let Ok(Some(data)) = self.router.get_sync(&hash) {
1709 freed += data.len() as u64;
1710 let _ = self.router.delete_local_only(&hash);
1711 tracing::debug!("Deleted orphaned blob {} ({} bytes)", &to_hex(&hash)[..8], data.len());
1712 }
1713 }
1714
1715 Ok(freed)
1716 }
1717
    /// The configured local-storage cap, in bytes.
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1722
1723 pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1725 let rtxn = self.env.read_txn()?;
1726 let mut own = 0u64;
1727 let mut followed = 0u64;
1728 let mut other = 0u64;
1729
1730 for item in self.tree_meta.iter(&rtxn)? {
1731 let (_, bytes) = item?;
1732 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1733 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1734
1735 if meta.priority >= PRIORITY_OWN {
1736 own += meta.total_size;
1737 } else if meta.priority >= PRIORITY_FOLLOWED {
1738 followed += meta.total_size;
1739 } else {
1740 other += meta.total_size;
1741 }
1742 }
1743
1744 Ok(StorageByPriority { own, followed, other })
1745 }
1746
1747 pub fn get_storage_stats(&self) -> Result<StorageStats> {
1749 let rtxn = self.env.read_txn()?;
1750 let total_pins = self.pins.len(&rtxn)? as usize;
1751
1752 let stats = self.router.stats()
1753 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1754
1755 Ok(StorageStats {
1756 total_dags: stats.count,
1757 pinned_dags: total_pins,
1758 total_bytes: stats.total_bytes,
1759 })
1760 }
1761
1762 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1766 let key = format!("{}/{}", pubkey_hex, tree_name);
1767 let rtxn = self.env.read_txn()?;
1768 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1769 let root: CachedRoot = rmp_serde::from_slice(bytes)
1770 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1771 Ok(Some(root))
1772 } else {
1773 Ok(None)
1774 }
1775 }
1776
1777 pub fn set_cached_root(
1779 &self,
1780 pubkey_hex: &str,
1781 tree_name: &str,
1782 hash: &str,
1783 key: Option<&str>,
1784 visibility: &str,
1785 updated_at: u64,
1786 ) -> Result<()> {
1787 let db_key = format!("{}/{}", pubkey_hex, tree_name);
1788 let root = CachedRoot {
1789 hash: hash.to_string(),
1790 key: key.map(|k| k.to_string()),
1791 updated_at,
1792 visibility: visibility.to_string(),
1793 };
1794 let bytes = rmp_serde::to_vec(&root)
1795 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1796 let mut wtxn = self.env.write_txn()?;
1797 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1798 wtxn.commit()?;
1799 Ok(())
1800 }
1801
1802 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1804 let prefix = format!("{}/", pubkey_hex);
1805 let rtxn = self.env.read_txn()?;
1806 let mut results = Vec::new();
1807
1808 for item in self.cached_roots.iter(&rtxn)? {
1809 let (key, bytes) = item?;
1810 if key.starts_with(&prefix) {
1811 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1812 let root: CachedRoot = rmp_serde::from_slice(bytes)
1813 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1814 results.push((tree_name.to_string(), root));
1815 }
1816 }
1817
1818 Ok(results)
1819 }
1820
1821 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1823 let key = format!("{}/{}", pubkey_hex, tree_name);
1824 let mut wtxn = self.env.write_txn()?;
1825 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1826 wtxn.commit()?;
1827 Ok(deleted)
1828 }
1829
1830 pub fn gc(&self) -> Result<GcStats> {
1832 let rtxn = self.env.read_txn()?;
1833
1834 let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
1836 .filter_map(|item| item.ok())
1837 .filter_map(|(hash_bytes, _)| {
1838 if hash_bytes.len() == 32 {
1839 let mut hash = [0u8; 32];
1840 hash.copy_from_slice(hash_bytes);
1841 Some(hash)
1842 } else {
1843 None
1844 }
1845 })
1846 .collect();
1847
1848 drop(rtxn);
1849
1850 let all_hashes = self.router.list()
1852 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1853
1854 let mut deleted = 0;
1856 let mut freed_bytes = 0u64;
1857
1858 for hash in all_hashes {
1859 if !pinned.contains(&hash) {
1860 if let Ok(Some(data)) = self.router.get_sync(&hash) {
1861 freed_bytes += data.len() as u64;
1862 let _ = self.router.delete_local_only(&hash);
1864 deleted += 1;
1865 }
1866 }
1867 }
1868
1869 Ok(GcStats {
1870 deleted_dags: deleted,
1871 freed_bytes,
1872 })
1873 }
1874
1875 pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
1878 let all_hashes = self.router.list()
1879 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1880
1881 let total = all_hashes.len();
1882 let mut valid = 0;
1883 let mut corrupted = 0;
1884 let mut deleted = 0;
1885 let mut corrupted_hashes = Vec::new();
1886
1887 for hash in &all_hashes {
1888 let hash_hex = to_hex(hash);
1889
1890 match self.router.get_sync(hash) {
1891 Ok(Some(data)) => {
1892 let actual_hash = sha256(&data);
1894
1895 if actual_hash == *hash {
1896 valid += 1;
1897 } else {
1898 corrupted += 1;
1899 let actual_hex = to_hex(&actual_hash);
1900 println!(" CORRUPTED: key={} actual={} size={}",
1901 &hash_hex[..16], &actual_hex[..16], data.len());
1902 corrupted_hashes.push(*hash);
1903 }
1904 }
1905 Ok(None) => {
1906 corrupted += 1;
1908 println!(" MISSING: key={}", &hash_hex[..16]);
1909 corrupted_hashes.push(*hash);
1910 }
1911 Err(e) => {
1912 corrupted += 1;
1913 println!(" ERROR: key={} err={}", &hash_hex[..16], e);
1914 corrupted_hashes.push(*hash);
1915 }
1916 }
1917 }
1918
1919 if delete {
1921 for hash in &corrupted_hashes {
1922 match self.router.delete_sync(hash) {
1923 Ok(true) => deleted += 1,
1924 Ok(false) => {} Err(e) => {
1926 let hash_hex = to_hex(hash);
1927 println!(" Failed to delete {}: {}", &hash_hex[..16], e);
1928 }
1929 }
1930 }
1931 }
1932
1933 Ok(VerifyResult {
1934 total,
1935 valid,
1936 corrupted,
1937 deleted,
1938 })
1939 }
1940
    /// Verify the integrity of every `.bin` object in the configured S3/R2
    /// bucket: each object's name encodes its expected sha256, which is
    /// compared against the hash of the downloaded content.
    ///
    /// Prints a line per invalid/corrupted/unreadable object and a progress
    /// line every 100 objects. When `delete` is true, failed objects are
    /// removed from the bucket after the scan.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        let config = crate::config::Config::load()?;
        let s3_config = config.storage.s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        // R2 endpoints require path-style addressing.
        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                .force_path_style(true)
                .build()
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // Paginate through the bucket with ListObjectsV2 continuation tokens.
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2()
                .bucket(bucket)
                .prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req.send().await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Only blob objects are verified; other keys are ignored.
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // Object name layout: <prefix><64-hex-char-hash>.bin
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!("  INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!("  INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download and re-hash the object body.
                match s3_client.get_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(resp) => {
                        match resp.body.collect().await {
                            Ok(bytes) => {
                                let data = bytes.into_bytes();
                                let actual_hash = sha256(&data);

                                if actual_hash == expected_hash {
                                    valid += 1;
                                } else {
                                    corrupted += 1;
                                    let actual_hex = to_hex(&actual_hash);
                                    println!("  CORRUPTED: key={} actual={} size={}",
                                        &expected_hash_hex[..16], &actual_hex[..16], data.len());
                                    corrupted_keys.push(key.to_string());
                                }
                            }
                            Err(e) => {
                                corrupted += 1;
                                println!("  READ ERROR: {} - {}", key, e);
                                corrupted_keys.push(key.to_string());
                            }
                        }
                    }
                    Err(e) => {
                        corrupted += 1;
                        println!("  FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Periodic progress output for long scans.
                if total % 100 == 0 {
                    println!("  Progress: {} objects checked, {} corrupted so far", total, corrupted);
                }
            }

            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        if delete {
            for key in &corrupted_keys {
                match s3_client.delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        println!("  Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2097
    /// Stub used when the crate is built without the `s3` feature: always
    /// returns an error instead of verifying anything.
    #[cfg(not(feature = "s3"))]
    pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
        Err(anyhow::anyhow!("S3 feature not enabled"))
    }
2103}
2104
/// Outcome of an integrity verification pass (local LMDB or remote R2).
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of objects examined.
    pub total: usize,
    /// Objects whose content hash matched their key.
    pub valid: usize,
    /// Objects that were corrupted, missing, or unreadable.
    pub corrupted: usize,
    /// Objects removed (non-zero only when deletion was requested).
    pub deleted: usize,
}
2113
/// Aggregate local-storage statistics returned by `get_storage_stats`.
#[derive(Debug)]
pub struct StorageStats {
    /// Total number of locally-stored objects.
    pub total_dags: usize,
    /// Number of pinned entries.
    pub pinned_dags: usize,
    /// Total bytes used by local storage.
    pub total_bytes: u64,
}
2120
/// Indexed-tree storage usage broken down by priority bucket, in bytes.
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Bytes in trees at or above PRIORITY_OWN.
    pub own: u64,
    /// Bytes in trees at or above PRIORITY_FOLLOWED (but below PRIORITY_OWN).
    pub followed: u64,
    /// Bytes in all remaining trees.
    pub other: u64,
}
2131
/// Chunking metadata for a stored file, used to serve byte-range reads.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes.
    pub total_size: u64,
    /// Hashes of the file's chunks, in file order.
    pub chunk_hashes: Vec<Hash>,
    /// Size in bytes of each chunk, parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is stored as multiple chunks.
    pub is_chunked: bool,
}
2139
/// Owning iterator over the chunk slices of a file byte range.
///
/// Created by `stream_file_range_chunks_owned`; holds an `Arc` to the store
/// so it can outlive the caller's borrow (e.g. across an HTTP response body).
pub struct FileRangeChunksOwned {
    // Store handle used to fetch chunk contents lazily.
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    // Inclusive start of the requested byte range.
    start: u64,
    // Inclusive end of the requested byte range (already clamped to EOF).
    end: u64,
    // Index of the next chunk to consider.
    current_chunk_idx: usize,
    // File offset of the start of the current chunk.
    current_offset: u64,
}
2149
2150impl Iterator for FileRangeChunksOwned {
2151 type Item = Result<Vec<u8>>;
2152
2153 fn next(&mut self) -> Option<Self::Item> {
2154 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2155 return None;
2156 }
2157
2158 if self.current_offset > self.end {
2159 return None;
2160 }
2161
2162 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2163 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2164 let chunk_end = self.current_offset + chunk_size - 1;
2165
2166 self.current_chunk_idx += 1;
2167
2168 if chunk_end < self.start || self.current_offset > self.end {
2169 self.current_offset += chunk_size;
2170 return self.next();
2171 }
2172
2173 let chunk_content = match self.store.get_chunk(chunk_hash) {
2174 Ok(Some(content)) => content,
2175 Ok(None) => {
2176 return Some(Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash))));
2177 }
2178 Err(e) => {
2179 return Some(Err(e));
2180 }
2181 };
2182
2183 let chunk_read_start = if self.current_offset >= self.start {
2184 0
2185 } else {
2186 (self.start - self.current_offset) as usize
2187 };
2188
2189 let chunk_read_end = if chunk_end <= self.end {
2190 chunk_size as usize - 1
2191 } else {
2192 (self.end - self.current_offset) as usize
2193 };
2194
2195 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2196 self.current_offset += chunk_size;
2197
2198 Some(Ok(result))
2199 }
2200}
2201
/// Result of a garbage-collection pass.
#[derive(Debug)]
pub struct GcStats {
    /// Number of unpinned objects deleted.
    pub deleted_dags: usize,
    /// Total bytes reclaimed.
    pub freed_bytes: u64,
}
2207
/// One entry in a directory listing, with its CID rendered as hex.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within the directory.
    pub name: String,
    /// Hex-encoded content hash of the entry.
    pub cid: String,
    /// True when the entry links to a subtree (subdirectory).
    pub is_directory: bool,
    /// Entry size in bytes.
    pub size: u64,
}
2215
/// A directory and its immediate children.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Directory name; empty when not recoverable from the hash alone.
    pub dir_name: String,
    /// The directory's entries.
    pub entries: Vec<DirEntry>,
}
2221
/// A pinned item as returned by `list_pins_with_names`.
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Hex-encoded content hash.
    pub cid: String,
    /// Display name; "Unknown" since names are not stored in the pin index.
    pub name: String,
    /// True when the pinned hash is a directory node.
    pub is_directory: bool,
}
2228
/// Serialized metadata describing an uploaded blob.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded sha256 of the blob content.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type reported at upload time.
    pub mime_type: String,
    /// Upload timestamp (unix seconds — presumably; confirm against writers).
    pub uploaded: u64,
}
2237
/// Adapter so the WebRTC layer can fetch content by hex-encoded hash.
impl crate::webrtc::ContentStore for HashtreeStore {
    // Decode the hex CID and fetch the matching chunk, if stored locally.
    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
        let hash = from_hex(hash_hex)
            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
        self.get_chunk(&hash)
    }
}