1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use heed::{Database, EnvOpenOptions};
4use heed::types::*;
5use hashtree_lmdb::LmdbBlobStore;
6use hashtree_core::{
7 HashTree, HashTreeConfig, Cid,
8 sha256, to_hex, from_hex, TreeNode, DirEntry as HashTreeDirEntry,
9 types::Hash,
10};
11use hashtree_core::store::{Store, StoreError};
12use serde::{Deserialize, Serialize};
13use std::path::Path;
14use std::collections::HashSet;
15use std::io::Read;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18use futures::executor::block_on as sync_block_on;
19
// Priority tiers stored in `TreeMeta::priority`. Larger values indicate more
// important data; the names suggest own > followed > other — confirm against
// the eviction/retention logic, which is outside this part of the file.
pub const PRIORITY_OTHER: u8 = 64;
pub const PRIORITY_FOLLOWED: u8 = 128;
pub const PRIORITY_OWN: u8 = 255;
24
/// Index record for a synced hashtree root.
///
/// Written by `index_tree`, where it is serialized with MessagePack
/// (`rmp_serde`) and keyed by the tree's root hash in the `tree_meta` DB.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Identifier of whom the tree was synced for.
    pub owner: String,
    /// Optional human-readable label for the tree.
    pub name: Option<String>,
    /// Unix timestamp (seconds) of the last index/sync.
    pub synced_at: u64,
    /// Sum of the sizes of all blobs reachable from the root, in bytes.
    pub total_size: u64,
    /// Retention priority (see the `PRIORITY_*` constants).
    pub priority: u8,
}
39
/// A cached tree-root record.
///
/// NOTE(review): not read or written anywhere in this part of the file;
/// presumably stored in the `cached_roots` DB — confirm at the use sites.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash, hex-encoded.
    pub hash: String,
    /// Optional key material — presumably the decryption key for encrypted
    /// trees; TODO confirm.
    pub key: Option<String>,
    /// Timestamp of the last update — units assumed Unix seconds; confirm.
    pub updated_at: u64,
    /// Visibility tag (stringly typed — confirm the accepted values).
    pub visibility: String,
}
52
53#[cfg(feature = "s3")]
54use tokio::sync::mpsc;
55
56use crate::config::S3Config;
57
/// Work items consumed by the background S3 mirroring task spawned in
/// `StorageRouter::with_s3`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Upload `data` under the object key derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    /// Delete the object whose key is derived from `hash`.
    Delete { hash: Hash },
}
64
/// Routes blob reads/writes to the local LMDB store and, when the `s3`
/// feature is enabled and configured, mirrors writes to an S3 bucket.
pub struct StorageRouter {
    /// Always-present local blob store; consulted first on every read.
    local: Arc<LmdbBlobStore>,
    /// S3 client used for synchronous read-through in `get_sync`/`exists`.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target bucket; `None` disables the S3 path even when compiled in.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to every object name (`{prefix}{hex}.bin`).
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Channel feeding the background S3 upload/delete task.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
83
impl StorageRouter {
    /// Build a router that persists blobs only to the local LMDB store.
    pub fn new(local: Arc<LmdbBlobStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Build a router that mirrors local writes to an S3-compatible bucket.
    ///
    /// Spawns a detached background task that drains an unbounded channel of
    /// upload/delete jobs, limiting concurrent S3 calls with a 32-permit
    /// semaphore. Background failures are logged, never surfaced to callers.
    ///
    /// NOTE(review): `tokio::spawn` requires an active Tokio runtime; when
    /// this is reached through the `sync_block_on` in
    /// `HashtreeStore::with_options`, confirm a runtime context exists.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LmdbBlobStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Credentials come from the environment; the region is overridden
        // with the configured one.
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader = aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Custom endpoint + path-style addressing (needed for S3-compatible
        // servers such as MinIO).
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Unbounded: writes are queued without backpressure, so memory can
        // grow if S3 is slower than local ingestion.
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // At most 32 in-flight S3 operations at a time.
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                // One sub-task per message; the semaphore bounds concurrency.
                tokio::spawn(async move {
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            // Object key layout: {prefix}{hex-of-hash}.bin
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            match client
                                .put_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .body(ByteStream::from(data))
                                .send()
                                .await
                            {
                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            if let Err(e) = client
                                .delete_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .send()
                                .await
                            {
                                tracing::error!("S3 delete failed {}: {}", &key, e);
                            }
                        }
                    }
                });
            }
        });

        tracing::info!("S3 storage initialized: bucket={}, prefix={}", bucket, prefix);

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Store `data` under `hash` locally; returns `true` when the blob was
    /// new to the local store. When S3 is active, an upload is queued
    /// regardless of `is_new` (the background task logs, never fails the call).
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        let is_new = self.local.put_sync(hash, data)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            // Log only the first 16 hex chars of the hash.
            tracing::info!("Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(), data.len(), is_new);
            if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data: data.to_vec() }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Fetch a blob: local store first, then (when configured) a synchronous
    /// S3 read-through that also backfills the local store on success.
    ///
    /// NOTE(review): the S3 fallback drives the request with
    /// `futures::executor::block_on`; if this is ever called from inside an
    /// async runtime worker thread it will block that thread — confirm call
    /// sites are sync-only.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.get_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Backfill the local cache; failure to do so is not fatal.
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // Missing keys are expected; only log unexpected errors.
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Existence check: local store first, then an S3 `HEAD` when configured.
    /// Shares the `block_on` caveat documented on `get_sync`.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        if self.local.exists(hash)? {
            return Ok(true);
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Delete locally and queue an S3 delete when active. The returned flag
    /// reflects only the local deletion.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Delete only the local copy, leaving any S3 object untouched.
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Statistics of the local LMDB blob store (S3 is not consulted).
    pub fn stats(&self) -> Result<hashtree_lmdb::LmdbStats, StoreError> {
        self.local.stats()
    }

    /// List all hashes present in the local store (S3 is not consulted).
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Clone the `Arc` handle to the underlying local store.
    pub fn local_store(&self) -> Arc<LmdbBlobStore> {
        Arc::clone(&self.local)
    }
}
321
// Async `Store` facade over the synchronous router methods so a
// `StorageRouter` can back a `HashTree` directly.
#[async_trait]
impl Store for StorageRouter {
    /// Store `data` under `hash`; `true` when the blob was new locally.
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    /// Fetch a blob: local first, S3 read-through when enabled.
    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    /// Existence check across local storage (and S3 when enabled).
    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    /// Delete locally and queue an S3 delete when enabled.
    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
342
/// LMDB-backed metadata store plus the blob storage router.
pub struct HashtreeStore {
    /// Metadata LMDB environment (separate from the blob store in `blobs/`).
    env: heed::Env,
    /// Pinned root hashes: 32-byte key, no value.
    pins: Database<Bytes, Unit>,
    /// Content sha256 -> hashtree root hash.
    sha256_index: Database<Bytes, Bytes>,
    /// Ownership records keyed by `sha256 || pubkey` (64 bytes, no value).
    blob_owners: Database<Bytes, Unit>,
    /// pubkey -> JSON-encoded `Vec<BlobMetadata>`.
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Tree root hash -> MessagePack-encoded `TreeMeta`.
    tree_meta: Database<Bytes, Bytes>,
    /// Membership records keyed by `blob hash || tree root hash` (64 bytes).
    blob_trees: Database<Bytes, Unit>,
    /// Stable ref name -> current tree root hash.
    tree_refs: Database<Str, Bytes>,
    /// Cached root records; not accessed in this part of the file.
    cached_roots: Database<Str, Bytes>,
    /// Blob storage (local LMDB, optionally mirrored to S3).
    router: Arc<StorageRouter>,
    /// Advertised storage cap in bytes; not enforced in this part of the file.
    max_size_bytes: u64,
}
366
367impl HashtreeStore {
368 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
370 Self::with_options(path, None, 10 * 1024 * 1024 * 1024)
371 }
372
373 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
375 Self::with_options(path, s3_config, 10 * 1024 * 1024 * 1024)
376 }
377
    /// Open (or create) the metadata environment and blob store under `path`.
    ///
    /// * `s3_config` — optional S3 mirror; ignored with a warning when the
    ///   `s3` feature is compiled out.
    /// * `max_size_bytes` — stored as the advertised blob-storage cap.
    ///   NOTE(review): the LMDB `map_size` below is a fixed 10 GiB and does
    ///   not track this parameter — confirm that is intended.
    pub fn with_options<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>, max_size_bytes: u64) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // heed's `EnvOpenOptions::open` is unsafe; its soundness invariants
        // (e.g. one environment per path per process) are documented upstream
        // and must be upheld by the caller.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024)
                .max_dbs(9)
                .open(path)?
        };

        // Eight named databases created up front; max_dbs(9) leaves one spare.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let sha256_index = env.create_database(&mut wtxn, Some("sha256_index"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Blob bytes live in a separate LMDB store under `<path>/blobs`.
        let lmdb_store = Arc::new(LmdbBlobStore::new(path.join("blobs"))
            .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?);

        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!("Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket, s3_cfg.endpoint);

            // Blocks this thread while the S3 client and its background sync
            // task are set up.
            sync_block_on(async {
                StorageRouter::with_s3(lmdb_store, s3_cfg).await
            })?
        } else {
            StorageRouter::new(lmdb_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!("S3 config provided but S3 feature not enabled. Using local storage only.");
            }
            StorageRouter::new(lmdb_store)
        });

        Ok(Self {
            env,
            pins,
            sha256_index,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
440
441 pub fn router(&self) -> &StorageRouter {
443 &self.router
444 }
445
    /// Clone the `Arc` handle to the router, e.g. for use as a `HashTree` store.
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }
451
    /// Store the file at `file_path` as a public hashtree, pin its root, and
    /// return the root hash as hex.
    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, true)
    }
456
    /// Same as `upload_file`, but does not pin the resulting root.
    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, false)
    }
461
    /// Shared implementation for file uploads.
    ///
    /// Reads the whole file into memory, stores it as a public hashtree,
    /// records `sha256(content) -> root hash` in the index, and optionally
    /// pins the root. Returns the root hash as hex.
    fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
        let file_path = file_path.as_ref();
        let file_content = std::fs::read(file_path)?;

        // Content digest used as the key into `sha256_index`.
        let content_sha256 = sha256(&file_content);

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // NOTE(review): futures::executor::block_on blocks this thread;
        // confirm this is never reached from inside an async runtime worker.
        let cid = sync_block_on(async {
            tree.put(&file_content).await
        }).context("Failed to store file")?;

        let mut wtxn = self.env.write_txn()?;

        self.sha256_index.put(&mut wtxn, &content_sha256, &cid.hash)?;

        if pin {
            self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
        }

        wtxn.commit()?;

        Ok(to_hex(&cid.hash))
    }
491
    /// Store a file read from `reader`, invoking `callback` with the root
    /// hash hex before the index entries are committed. The root is always
    /// pinned.
    ///
    /// Despite the name, the reader is drained fully into memory first;
    /// `_file_name` is accepted for interface compatibility but unused.
    pub fn upload_file_stream<R: Read, F>(
        &self,
        mut reader: R,
        _file_name: impl Into<String>,
        mut callback: F,
    ) -> Result<String>
    where
        F: FnMut(&str),
    {
        // Buffer the entire stream; `sha256` below needs the full content.
        let mut data = Vec::new();
        reader.read_to_end(&mut data)?;

        let content_sha256 = sha256(&data);

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let cid = sync_block_on(async {
            tree.put(&data).await
        }).context("Failed to store file")?;

        let root_hex = to_hex(&cid.hash);
        // Notify the caller as soon as the root hash is known.
        callback(&root_hex);

        let mut wtxn = self.env.write_txn()?;

        self.sha256_index.put(&mut wtxn, &content_sha256, &cid.hash)?;

        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;

        wtxn.commit()?;

        Ok(root_hex)
    }
532
    /// Upload a directory as a public hashtree (honouring gitignore rules)
    /// and pin its root. Returns the root hash as hex.
    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_with_options(dir_path, true)
    }
538
    /// Upload a directory tree as a public hashtree and pin its root.
    ///
    /// `respect_gitignore` toggles .gitignore/global/exclude handling during
    /// the walk. Returns the root hash as hex.
    pub fn upload_dir_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
        let dir_path = dir_path.as_ref();

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
        }).context("Failed to upload directory")?;

        let root_hex = to_hex(&root_cid.hash);

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(root_hex)
    }
558
    /// Walk `current_path` (optionally honouring gitignore rules), upload
    /// every file, and collect per-directory entry lists before building the
    /// directory tree bottom-up via `build_directory_tree`.
    ///
    /// `_root_path` is currently unused; callers pass the same path twice.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Relative directory path ("" = root) -> (file name, cid) entries.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new());

        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false)
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // The walker yields the root itself; skip it.
            if path == current_path {
                continue;
            }

            let relative = path.strip_prefix(current_path)
                .unwrap_or(path);

            if path.is_file() {
                let content = std::fs::read(path)?;
                let cid = tree.put(&content).await
                    .map_err(|e| anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e))?;

                // Register the file under its parent directory's entry list.
                let parent = relative.parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative.file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Ensure directories (including empty ones) get a node.
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        self.build_directory_tree(tree, &mut dir_contents).await
    }
616
617 async fn build_directory_tree<S: Store>(
618 &self,
619 tree: &HashTree<S>,
620 dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
621 ) -> Result<Cid> {
622 let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
624 dirs.sort_by(|a, b| {
625 let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
626 let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
627 depth_b.cmp(&depth_a) });
629
630 let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();
631
632 for dir_path in dirs {
633 let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();
634
635 let mut entries: Vec<HashTreeDirEntry> = files.into_iter()
636 .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
637 .collect();
638
639 for (subdir_path, cid) in &dir_cids {
641 let parent = std::path::Path::new(subdir_path)
642 .parent()
643 .map(|p| p.to_string_lossy().to_string())
644 .unwrap_or_default();
645
646 if parent == dir_path {
647 let name = std::path::Path::new(subdir_path)
648 .file_name()
649 .map(|n| n.to_string_lossy().to_string())
650 .unwrap_or_default();
651 entries.push(HashTreeDirEntry::from_cid(name, cid));
652 }
653 }
654
655 let cid = tree.put_directory(entries).await
656 .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;
657
658 dir_cids.insert(dir_path, cid);
659 }
660
661 dir_cids.get("")
663 .cloned()
664 .ok_or_else(|| anyhow::anyhow!("No root directory"))
665 }
666
    /// Store a file as an encrypted hashtree and pin its root.
    ///
    /// Returns the full CID string (not a bare hash hex like `upload_file`).
    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        let file_path = file_path.as_ref();
        let file_content = std::fs::read(file_path)?;

        let store = self.store_arc();
        // No `.public()` here: the default config is the encrypted variant.
        let tree = HashTree::new(HashTreeConfig::new(store));

        let cid = sync_block_on(async {
            tree.put(&file_content).await
        }).map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;

        let cid_str = cid.to_string();

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
688
    /// Upload a directory as an encrypted hashtree (honouring gitignore
    /// rules) and pin its root. Returns the full CID string.
    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_encrypted_with_options(dir_path, true)
    }
694
    /// Upload a directory tree as an encrypted hashtree and pin its root.
    ///
    /// Returns the full CID string rather than a bare hash hex.
    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
        let dir_path = dir_path.as_ref();
        let store = self.store_arc();

        // Default (non-public) config: encrypted tree.
        let tree = HashTree::new(HashTreeConfig::new(store));

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
        }).context("Failed to upload encrypted directory")?;

        let cid_str = root_cid.to_string();

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
717
    /// Decode the tree node stored under `hash` (public/unencrypted trees).
    pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get_tree_node(hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
        })
    }
728
729 pub fn get_cid_by_sha256(&self, sha256: &[u8; 32]) -> Result<Option<Hash>> {
731 let rtxn = self.env.read_txn()?;
732 Ok(self.sha256_index.get(&rtxn, sha256)?.map(|bytes| {
733 let mut hash = [0u8; 32];
734 hash.copy_from_slice(bytes);
735 hash
736 }))
737 }
738
739 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
741 let hash = sha256(data);
742 self.router.put_sync(hash, data)
743 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
744 Ok(to_hex(&hash))
745 }
746
    /// Fetch the raw blob stored under `hash` (no tree decoding).
    pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        self.router.get_sync(hash)
            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
    }
752
    /// Check whether a blob exists (local store, plus S3 when enabled).
    pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
        self.router.exists(hash)
            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
    }
758
759 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
765 let mut key = [0u8; 64];
766 key[..32].copy_from_slice(sha256);
767 key[32..].copy_from_slice(pubkey);
768 key
769 }
770
    /// Record `pubkey` as an owner of blob `sha256` and, if not already
    /// present, append a `BlobMetadata` entry to the pubkey's JSON blob list.
    ///
    /// The stored size comes from the indexed tree's metadata and falls back
    /// to 0 on any lookup failure; the MIME type is a fixed octet-stream.
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        let sha256_hex = to_hex(sha256);

        // Unparseable/missing metadata lists degrade to an empty list.
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // NOTE(review): this opens a read txn (inside get_cid_by_sha256)
            // while `wtxn` is live — confirm that is safe for this heed setup.
            let size = self
                .get_cid_by_sha256(sha256)?
                .and_then(|root_hash| self.get_file_chunk_metadata(&root_hash).ok().flatten())
                .map(|m| m.total_size)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
818
819 pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
821 let key = Self::blob_owner_key(sha256, pubkey);
822 let rtxn = self.env.read_txn()?;
823 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
824 }
825
826 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
828 let rtxn = self.env.read_txn()?;
829
830 let mut owners = Vec::new();
831 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
832 let (key, _) = item?;
833 if key.len() == 64 {
834 let mut pubkey = [0u8; 32];
836 pubkey.copy_from_slice(&key[32..64]);
837 owners.push(pubkey);
838 }
839 }
840 Ok(owners)
841 }
842
843 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
845 let rtxn = self.env.read_txn()?;
846
847 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
849 if item.is_ok() {
850 return Ok(true);
851 }
852 }
853 Ok(false)
854 }
855
856 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
858 Ok(self.get_blob_owners(sha256)?.into_iter().next())
859 }
860
    /// Remove `pubkey`'s ownership of blob `sha256`; when the last owner is
    /// removed, also delete the pin, the sha256 index entry, and the blob
    /// data.
    ///
    /// Returns `Ok(true)` when the blob itself was deleted, `Ok(false)` when
    /// other owners remain.
    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        self.blob_owners.delete(&mut wtxn, &key[..])?;

        let sha256_hex = to_hex(sha256);

        // Drop the blob from this pubkey's JSON metadata list as well.
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Check for remaining owners inside the same write txn so the
        // delete-or-keep decision is atomic with the removal above.
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &to_hex(pubkey)[..8],
                &sha256_hex[..8]
            );
            return Ok(false);
        }

        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8]
        );

        // Clean up the pin and the sha256 -> root-hash index entry.
        let root_hash: Option<Hash> = self.sha256_index.get(&wtxn, sha256)?.map(|bytes| {
            let mut hash = [0u8; 32];
            hash.copy_from_slice(bytes);
            hash
        });
        if let Some(ref hash) = root_hash {
            self.pins.delete(&mut wtxn, hash)?;
        }
        self.sha256_index.delete(&mut wtxn, sha256)?;

        // Best-effort data deletion, keyed by the content sha256.
        // NOTE(review): tree-ingested files store their bytes under chunk and
        // root hashes, not the content sha256 — confirm those blobs are
        // reclaimed elsewhere (e.g. via blob_trees-based GC).
        let _ = self.router.delete_sync(sha256);

        wtxn.commit()?;
        Ok(true)
    }
925
926 pub fn list_blobs_by_pubkey(&self, pubkey: &[u8; 32]) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
928 let rtxn = self.env.read_txn()?;
929
930 let blobs: Vec<BlobMetadata> = self
931 .pubkey_blobs
932 .get(&rtxn, pubkey)?
933 .and_then(|b| serde_json::from_slice(b).ok())
934 .unwrap_or_default();
935
936 Ok(blobs
937 .into_iter()
938 .map(|b| crate::server::blossom::BlobDescriptor {
939 url: format!("/{}", b.sha256),
940 sha256: b.sha256,
941 size: b.size,
942 mime_type: b.mime_type,
943 uploaded: b.uploaded,
944 })
945 .collect())
946 }
947
    /// Fetch one raw chunk by hash (same as `get_blob`, different error text).
    pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        self.router.get_sync(hash)
            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
    }
953
    /// Reassemble the full file stored under `hash` via `HashTree::read_file`
    /// (public trees only).
    pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.read_file(hash).await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }
965
    /// Fetch file content addressed by a full CID via `HashTree::get`.
    pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get(cid).await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }
977
    /// Describe how the file under `hash` is stored: total size and, when
    /// chunked, the hash and size of each chunk.
    ///
    /// Returns `Ok(None)` when the hash is unknown, the tree node is missing,
    /// or the hash refers to a directory rather than a file.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            let exists = store.has(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            let total_size = tree.get_size(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            let is_tree_node = tree.is_tree(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            // Plain blob: the whole file is a single unchunked unit.
            if !is_tree_node {
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            let node = match tree.get_tree_node(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))? {
                Some(n) => n,
                None => return Ok(None),
            };

            let is_directory = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            // Directories have no file-chunk layout.
            if is_directory {
                return Ok(None);
            }

            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1038
    /// Read bytes `[start, end]` (inclusive; `end = None` means end-of-file)
    /// of the file under `hash`, returning `(bytes, total_file_size)`.
    ///
    /// Returns `Ok(None)` when the file is unknown or `start` is past EOF;
    /// an empty file yields `Ok(Some((vec![], 0)))` for any request.
    pub fn get_file_range(&self, hash: &[u8; 32], start: u64, end: Option<u64>) -> Result<Option<(Vec<u8>, u64)>> {
        let metadata = match self.get_file_chunk_metadata(hash)? {
            Some(m) => m,
            None => return Ok(None),
        };

        if metadata.total_size == 0 {
            return Ok(Some((Vec::new(), 0)));
        }

        if start >= metadata.total_size {
            return Ok(None);
        }

        // Clamp the inclusive end to the last byte of the file.
        let end = end.unwrap_or(metadata.total_size - 1).min(metadata.total_size - 1);

        // Unchunked file: slice the fully-loaded content.
        if !metadata.is_chunked {
            let content = self.get_file(hash)?.unwrap_or_default();
            let range_content = if start < content.len() as u64 {
                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
            } else {
                Vec::new()
            };
            return Ok(Some((range_content, metadata.total_size)));
        }

        // Chunked file: walk chunks in order, copying only overlapping spans.
        let mut result = Vec::new();
        let mut current_offset = 0u64;

        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
            let chunk_size = metadata.chunk_sizes[i];
            // NOTE(review): underflows if a chunk reports size 0 — confirm
            // chunk sizes are always positive.
            let chunk_end = current_offset + chunk_size - 1;

            // Does this chunk overlap the inclusive range [start, end]?
            if chunk_end >= start && current_offset <= end {
                let chunk_content = match self.get_chunk(chunk_hash)? {
                    Some(content) => content,
                    None => {
                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
                    }
                };

                let chunk_read_start = if current_offset >= start {
                    0
                } else {
                    (start - current_offset) as usize
                };

                let chunk_read_end = if chunk_end <= end {
                    chunk_size as usize - 1
                } else {
                    (end - current_offset) as usize
                };

                // NOTE(review): indexes by the declared chunk size — panics
                // if the stored chunk is shorter than its metadata claims.
                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
            }

            current_offset += chunk_size;

            // Past the requested range; stop early.
            if current_offset > end {
                break;
            }
        }

        Ok(Some((result, metadata.total_size)))
    }
1108
    /// Prepare a chunk-by-chunk range reader over the file under `hash`.
    ///
    /// Takes `Arc<Self>` so the returned `FileRangeChunksOwned` can own the
    /// store handle. `end` is inclusive and clamped to the last byte; returns
    /// `Ok(None)` for unknown files, empty files, or `start` past EOF.
    pub fn stream_file_range_chunks_owned(
        self: Arc<Self>,
        hash: &[u8; 32],
        start: u64,
        end: u64,
    ) -> Result<Option<FileRangeChunksOwned>> {
        let metadata = match self.get_file_chunk_metadata(hash)? {
            Some(m) => m,
            None => return Ok(None),
        };

        if metadata.total_size == 0 || start >= metadata.total_size {
            return Ok(None);
        }

        let end = end.min(metadata.total_size - 1);

        Ok(Some(FileRangeChunksOwned {
            store: self,
            metadata,
            start,
            end,
            current_chunk_idx: 0,
            current_offset: 0,
        }))
    }
1136
    /// List the entries of the directory stored under `hash`.
    ///
    /// Returns `Ok(None)` when the hash does not refer to a directory. The
    /// `dir_name` field is left empty; only `entries` is populated.
    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            let is_dir = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if !is_dir {
                return Ok(None);
            }

            // Wrap the bare hash in a public CID for the listing call.
            let cid = hashtree_core::Cid::public(*hash, 0);
            let tree_entries = tree.list_directory(&cid).await
                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;

            let entries: Vec<DirEntry> = tree_entries.into_iter().map(|e| DirEntry {
                name: e.name,
                cid: to_hex(&e.hash),
                is_directory: e.link_type.is_tree(),
                size: e.size,
            }).collect();

            Ok(Some(DirectoryListing {
                dir_name: String::new(),
                entries,
            }))
        })
    }
1169
1170 pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
1172 let mut wtxn = self.env.write_txn()?;
1173 self.pins.put(&mut wtxn, hash.as_slice(), &())?;
1174 wtxn.commit()?;
1175 Ok(())
1176 }
1177
1178 pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
1180 let mut wtxn = self.env.write_txn()?;
1181 self.pins.delete(&mut wtxn, hash.as_slice())?;
1182 wtxn.commit()?;
1183 Ok(())
1184 }
1185
1186 pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
1188 let rtxn = self.env.read_txn()?;
1189 Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
1190 }
1191
1192 pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
1194 let rtxn = self.env.read_txn()?;
1195 let mut pins = Vec::new();
1196
1197 for item in self.pins.iter(&rtxn)? {
1198 let (hash_bytes, _) = item?;
1199 if hash_bytes.len() == 32 {
1200 let mut hash = [0u8; 32];
1201 hash.copy_from_slice(hash_bytes);
1202 pins.push(hash);
1203 }
1204 }
1205
1206 Ok(pins)
1207 }
1208
    /// List pinned roots with a directory flag for display.
    ///
    /// The `name` field is a fixed placeholder ("Unknown"); nothing in the
    /// pins table stores a human-readable name. Each entry performs a
    /// blocking `is_directory` lookup.
    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
        let rtxn = self.env.read_txn()?;
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let mut pins = Vec::new();

        for item in self.pins.iter(&rtxn)? {
            let (hash_bytes, _) = item?;
            // Skip malformed keys.
            if hash_bytes.len() != 32 {
                continue;
            }
            let mut hash = [0u8; 32];
            hash.copy_from_slice(hash_bytes);

            // Lookup failures are treated as "not a directory".
            let is_directory = sync_block_on(async {
                tree.is_directory(&hash).await.unwrap_or(false)
            });

            pins.push(PinnedItem {
                cid: to_hex(&hash),
                name: "Unknown".to_string(),
                is_directory,
            });
        }

        Ok(pins)
    }
1238
    /// Index the tree rooted at `root_hash`: record blob->tree membership in
    /// `blob_trees`, write a `TreeMeta` record, and optionally bind a stable
    /// ref name to the root.
    ///
    /// When `ref_key` already points at a different root, the old tree is
    /// unindexed first (best-effort; failures are ignored).
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // Replace semantics for refs: drop the previous tree when the ref
        // moves to a new root.
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes.try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // Release the read txn before unindex_tree starts its own
                    // transactions.
                    drop(rtxn);
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk the whole tree to enumerate leaf blobs and their total size.
        let (blob_hashes, total_size) = sync_block_on(async {
            self.collect_tree_blobs(&tree, root_hash).await
        })?;

        let mut wtxn = self.env.write_txn()?;

        // One membership record per blob: key = blob hash || root hash.
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        // Serialized with MessagePack (rmp_serde).
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta.put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1320
1321 async fn collect_tree_blobs<S: Store>(
1323 &self,
1324 tree: &HashTree<S>,
1325 root: &Hash,
1326 ) -> Result<(Vec<Hash>, u64)> {
1327 let mut blobs = Vec::new();
1328 let mut total_size = 0u64;
1329 let mut stack = vec![*root];
1330
1331 while let Some(hash) = stack.pop() {
1332 let is_tree = tree.is_tree(&hash).await
1334 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1335
1336 if is_tree {
1337 if let Some(node) = tree.get_tree_node(&hash).await
1339 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1340 {
1341 for link in &node.links {
1342 stack.push(link.hash);
1343 }
1344 }
1345 } else {
1346 if let Some(data) = self.router.get_sync(&hash)
1348 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1349 {
1350 total_size += data.len() as u64;
1351 blobs.push(hash);
1352 }
1353 }
1354 }
1355
1356 Ok((blobs, total_size))
1357 }
1358
1359 pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1362 let root_hex = to_hex(root_hash);
1363
1364 let store = self.store_arc();
1365 let tree = HashTree::new(HashTreeConfig::new(store).public());
1366
1367 let (blob_hashes, _) = sync_block_on(async {
1369 self.collect_tree_blobs(&tree, root_hash).await
1370 })?;
1371
1372 let mut wtxn = self.env.write_txn()?;
1373 let mut freed = 0u64;
1374
1375 for blob_hash in &blob_hashes {
1377 let mut key = [0u8; 64];
1379 key[..32].copy_from_slice(blob_hash);
1380 key[32..].copy_from_slice(root_hash);
1381 self.blob_trees.delete(&mut wtxn, &key[..])?;
1382
1383 let rtxn = self.env.read_txn()?;
1385 let mut has_other_tree = false;
1386
1387 for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1388 if item.is_ok() {
1389 has_other_tree = true;
1390 break;
1391 }
1392 }
1393 drop(rtxn);
1394
1395 if !has_other_tree {
1397 if let Some(data) = self.router.get_sync(blob_hash)
1398 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1399 {
1400 freed += data.len() as u64;
1401 self.router.delete_local_only(blob_hash)
1403 .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1404 }
1405 }
1406 }
1407
1408 if let Some(data) = self.router.get_sync(root_hash)
1410 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1411 {
1412 freed += data.len() as u64;
1413 self.router.delete_local_only(root_hash)
1415 .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1416 }
1417
1418 self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1420
1421 wtxn.commit()?;
1422
1423 tracing::debug!(
1424 "Unindexed tree {} ({} bytes freed)",
1425 &root_hex[..8],
1426 freed
1427 );
1428
1429 Ok(freed)
1430 }
1431
1432 pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1434 let rtxn = self.env.read_txn()?;
1435 if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1436 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1437 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1438 Ok(Some(meta))
1439 } else {
1440 Ok(None)
1441 }
1442 }
1443
1444 pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1446 let rtxn = self.env.read_txn()?;
1447 let mut trees = Vec::new();
1448
1449 for item in self.tree_meta.iter(&rtxn)? {
1450 let (hash_bytes, meta_bytes) = item?;
1451 let hash: Hash = hash_bytes.try_into()
1452 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1453 let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1454 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1455 trees.push((hash, meta));
1456 }
1457
1458 Ok(trees)
1459 }
1460
1461 pub fn tracked_size(&self) -> Result<u64> {
1463 let rtxn = self.env.read_txn()?;
1464 let mut total = 0u64;
1465
1466 for item in self.tree_meta.iter(&rtxn)? {
1467 let (_, bytes) = item?;
1468 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1469 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1470 total += meta.total_size;
1471 }
1472
1473 Ok(total)
1474 }
1475
1476 fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1478 let mut trees = self.list_indexed_trees()?;
1479
1480 trees.sort_by(|a, b| {
1482 match a.1.priority.cmp(&b.1.priority) {
1483 std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1484 other => other,
1485 }
1486 });
1487
1488 Ok(trees)
1489 }
1490
    /// Bring local storage back under `max_size_bytes` when exceeded.
    ///
    /// Eviction order: (1) orphaned blobs, then (2) whole indexed
    /// trees, lowest priority and oldest first, skipping pinned roots.
    /// Stops once usage drops to 90% of the limit. Returns total bytes
    /// freed (0 when already under the cap).
    pub fn evict_if_needed(&self) -> Result<u64> {
        let stats = self.router.stats()
            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
        let current = stats.total_bytes;

        if current <= self.max_size_bytes {
            return Ok(0);
        }

        // Target 90% of the cap so the very next write doesn't trigger
        // another eviction pass immediately.
        let target = self.max_size_bytes * 90 / 100;
        let mut freed = 0u64;
        let mut current_size = current;

        // Phase 1: orphaned blobs first — nothing references them.
        let orphan_freed = self.evict_orphaned_blobs()?;
        freed += orphan_freed;
        current_size = current_size.saturating_sub(orphan_freed);

        if orphan_freed > 0 {
            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
        }

        if current_size <= target {
            if freed > 0 {
                tracing::info!("Eviction complete: {} bytes freed", freed);
            }
            return Ok(freed);
        }

        // Phase 2: evict whole trees in priority/age order.
        let evictable = self.get_evictable_trees()?;

        for (root_hash, meta) in evictable {
            if current_size <= target {
                break;
            }

            let root_hex = to_hex(&root_hash);

            // Pinned roots are never evicted.
            if self.is_pinned(&root_hash)? {
                continue;
            }

            let tree_freed = self.unindex_tree(&root_hash)?;
            freed += tree_freed;
            current_size = current_size.saturating_sub(tree_freed);

            tracing::info!(
                "Evicted tree {} (owner={}, priority={}, {} bytes)",
                &root_hex[..8],
                // NOTE(review): byte-slicing `owner` panics if byte 8 is
                // not a char boundary — fine for hex pubkeys; confirm
                // owners cannot be arbitrary UTF-8.
                &meta.owner[..8.min(meta.owner.len())],
                meta.priority,
                tree_freed
            );
        }

        if freed > 0 {
            tracing::info!("Eviction complete: {} bytes freed", freed);
        }

        Ok(freed)
    }
1564
1565 fn evict_orphaned_blobs(&self) -> Result<u64> {
1567 let mut freed = 0u64;
1568
1569 let all_hashes = self.router.list()
1571 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1572
1573 let rtxn = self.env.read_txn()?;
1575 let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
1576 .filter_map(|item| item.ok())
1577 .filter_map(|(hash_bytes, _)| {
1578 if hash_bytes.len() == 32 {
1579 let mut hash = [0u8; 32];
1580 hash.copy_from_slice(hash_bytes);
1581 Some(hash)
1582 } else {
1583 None
1584 }
1585 })
1586 .collect();
1587
1588 let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
1591 for item in self.blob_trees.iter(&rtxn)? {
1592 if let Ok((key_bytes, _)) = item {
1593 if key_bytes.len() >= 32 {
1594 let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
1595 blobs_in_trees.insert(blob_hash);
1596 }
1597 }
1598 }
1599 drop(rtxn);
1600
1601 for hash in all_hashes {
1603 if pinned.contains(&hash) {
1605 continue;
1606 }
1607
1608 if blobs_in_trees.contains(&hash) {
1610 continue;
1611 }
1612
1613 if let Ok(Some(data)) = self.router.get_sync(&hash) {
1615 freed += data.len() as u64;
1616 let _ = self.router.delete_local_only(&hash);
1617 tracing::debug!("Deleted orphaned blob {} ({} bytes)", &to_hex(&hash)[..8], data.len());
1618 }
1619 }
1620
1621 Ok(freed)
1622 }
1623
    /// Configured local storage cap, in bytes.
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1628
1629 pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1631 let rtxn = self.env.read_txn()?;
1632 let mut own = 0u64;
1633 let mut followed = 0u64;
1634 let mut other = 0u64;
1635
1636 for item in self.tree_meta.iter(&rtxn)? {
1637 let (_, bytes) = item?;
1638 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1639 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1640
1641 if meta.priority >= PRIORITY_OWN {
1642 own += meta.total_size;
1643 } else if meta.priority >= PRIORITY_FOLLOWED {
1644 followed += meta.total_size;
1645 } else {
1646 other += meta.total_size;
1647 }
1648 }
1649
1650 Ok(StorageByPriority { own, followed, other })
1651 }
1652
1653 pub fn get_storage_stats(&self) -> Result<StorageStats> {
1655 let rtxn = self.env.read_txn()?;
1656 let total_pins = self.pins.len(&rtxn)? as usize;
1657
1658 let stats = self.router.stats()
1659 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1660
1661 Ok(StorageStats {
1662 total_dags: stats.count,
1663 pinned_dags: total_pins,
1664 total_bytes: stats.total_bytes,
1665 })
1666 }
1667
1668 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1672 let key = format!("{}/{}", pubkey_hex, tree_name);
1673 let rtxn = self.env.read_txn()?;
1674 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1675 let root: CachedRoot = rmp_serde::from_slice(bytes)
1676 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1677 Ok(Some(root))
1678 } else {
1679 Ok(None)
1680 }
1681 }
1682
1683 pub fn set_cached_root(
1685 &self,
1686 pubkey_hex: &str,
1687 tree_name: &str,
1688 hash: &str,
1689 key: Option<&str>,
1690 visibility: &str,
1691 updated_at: u64,
1692 ) -> Result<()> {
1693 let db_key = format!("{}/{}", pubkey_hex, tree_name);
1694 let root = CachedRoot {
1695 hash: hash.to_string(),
1696 key: key.map(|k| k.to_string()),
1697 updated_at,
1698 visibility: visibility.to_string(),
1699 };
1700 let bytes = rmp_serde::to_vec(&root)
1701 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1702 let mut wtxn = self.env.write_txn()?;
1703 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1704 wtxn.commit()?;
1705 Ok(())
1706 }
1707
1708 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1710 let prefix = format!("{}/", pubkey_hex);
1711 let rtxn = self.env.read_txn()?;
1712 let mut results = Vec::new();
1713
1714 for item in self.cached_roots.iter(&rtxn)? {
1715 let (key, bytes) = item?;
1716 if key.starts_with(&prefix) {
1717 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1718 let root: CachedRoot = rmp_serde::from_slice(bytes)
1719 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1720 results.push((tree_name.to_string(), root));
1721 }
1722 }
1723
1724 Ok(results)
1725 }
1726
1727 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1729 let key = format!("{}/{}", pubkey_hex, tree_name);
1730 let mut wtxn = self.env.write_txn()?;
1731 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1732 wtxn.commit()?;
1733 Ok(deleted)
1734 }
1735
1736 pub fn gc(&self) -> Result<GcStats> {
1738 let rtxn = self.env.read_txn()?;
1739
1740 let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
1742 .filter_map(|item| item.ok())
1743 .filter_map(|(hash_bytes, _)| {
1744 if hash_bytes.len() == 32 {
1745 let mut hash = [0u8; 32];
1746 hash.copy_from_slice(hash_bytes);
1747 Some(hash)
1748 } else {
1749 None
1750 }
1751 })
1752 .collect();
1753
1754 drop(rtxn);
1755
1756 let all_hashes = self.router.list()
1758 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1759
1760 let mut deleted = 0;
1762 let mut freed_bytes = 0u64;
1763
1764 for hash in all_hashes {
1765 if !pinned.contains(&hash) {
1766 if let Ok(Some(data)) = self.router.get_sync(&hash) {
1767 freed_bytes += data.len() as u64;
1768 let _ = self.router.delete_local_only(&hash);
1770 deleted += 1;
1771 }
1772 }
1773 }
1774
1775 Ok(GcStats {
1776 deleted_dags: deleted,
1777 freed_bytes,
1778 })
1779 }
1780
    /// Re-hash every locally stored blob and compare against its key.
    ///
    /// Prints a line per corrupted/missing/unreadable entry. When
    /// `delete` is set, flagged entries are removed (via delete_sync)
    /// after the scan completes.
    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
        let all_hashes = self.router.list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        let total = all_hashes.len();
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_hashes = Vec::new();

        for hash in &all_hashes {
            let hash_hex = to_hex(hash);

            match self.router.get_sync(hash) {
                Ok(Some(data)) => {
                    // Content-addressing invariant: key == sha256(value).
                    let actual_hash = sha256(&data);

                    if actual_hash == *hash {
                        valid += 1;
                    } else {
                        corrupted += 1;
                        let actual_hex = to_hex(&actual_hash);
                        println!(" CORRUPTED: key={} actual={} size={}",
                            &hash_hex[..16], &actual_hex[..16], data.len());
                        corrupted_hashes.push(*hash);
                    }
                }
                Ok(None) => {
                    // Listed but unreadable: treated as corrupted.
                    corrupted += 1;
                    println!(" MISSING: key={}", &hash_hex[..16]);
                    corrupted_hashes.push(*hash);
                }
                Err(e) => {
                    corrupted += 1;
                    println!(" ERROR: key={} err={}", &hash_hex[..16], e);
                    corrupted_hashes.push(*hash);
                }
            }
        }

        // Deletion pass runs only after the full scan so counters are
        // consistent with what was printed above.
        if delete {
            for hash in &corrupted_hashes {
                match self.router.delete_sync(hash) {
                    Ok(true) => deleted += 1,
                    Ok(false) => {}
                    Err(e) => {
                        let hash_hex = to_hex(hash);
                        println!(" Failed to delete {}: {}", &hash_hex[..16], e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
1846
    /// Verify every `.bin` object in the configured S3/R2 bucket by
    /// re-hashing its content and comparing against the hash encoded
    /// in its key (`<prefix><hex sha256>.bin`).
    ///
    /// Lists with pagination and fetches objects one at a time
    /// (sequential: one GET per object). When `delete` is set,
    /// corrupted/unreadable objects are removed after the scan.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        // Builds a fresh client from on-disk config rather than reusing
        // the router's client; bails out if S3 is not configured.
        let config = crate::config::Config::load()?;
        let s3_config = config.storage.s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                // Path-style addressing for R2-style custom endpoints.
                .force_path_style(true)
                .build()
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // ListObjectsV2 pagination cursor.
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2()
                .bucket(bucket)
                .prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req.send().await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Only blob objects are verified; other keys are skipped.
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // Expected key layout: "<prefix><64 hex chars>.bin".
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!(" INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!(" INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                match s3_client.get_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(resp) => {
                        match resp.body.collect().await {
                            Ok(bytes) => {
                                let data = bytes.into_bytes();
                                // Content-addressing invariant:
                                // sha256(body) must equal the key's hash.
                                let actual_hash = sha256(&data);

                                if actual_hash == expected_hash {
                                    valid += 1;
                                } else {
                                    corrupted += 1;
                                    let actual_hex = to_hex(&actual_hash);
                                    println!(" CORRUPTED: key={} actual={} size={}",
                                        &expected_hash_hex[..16], &actual_hex[..16], data.len());
                                    corrupted_keys.push(key.to_string());
                                }
                            }
                            Err(e) => {
                                corrupted += 1;
                                println!(" READ ERROR: {} - {}", key, e);
                                corrupted_keys.push(key.to_string());
                            }
                        }
                    }
                    Err(e) => {
                        corrupted += 1;
                        println!(" FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Progress heartbeat every 100 checked objects.
                if total % 100 == 0 {
                    println!(" Progress: {} objects checked, {} corrupted so far", total, corrupted);
                }
            }

            // Follow pagination until the listing is exhausted.
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        // Deletion pass runs after the full scan.
        if delete {
            for key in &corrupted_keys {
                match s3_client.delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        println!(" Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
2003
    /// Stub used when the crate is built without the `s3` feature:
    /// always fails so callers get a clear error instead of a no-op.
    #[cfg(not(feature = "s3"))]
    pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
        Err(anyhow::anyhow!("S3 feature not enabled"))
    }
2009}
2010
/// Outcome of an integrity scan (local LMDB or S3/R2).
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of objects examined.
    pub total: usize,
    /// Objects whose content hash matched their key.
    pub valid: usize,
    /// Objects that were missing, unreadable, or hash-mismatched.
    pub corrupted: usize,
    /// Flagged objects actually removed (only when deletion was requested).
    pub deleted: usize,
}
2019
/// Aggregate local storage counters.
#[derive(Debug)]
pub struct StorageStats {
    /// Total number of stored objects reported by the router.
    pub total_dags: usize,
    /// Number of pin entries.
    pub pinned_dags: usize,
    /// Total stored bytes.
    pub total_bytes: u64,
}
2026
/// Indexed-tree storage usage broken down by priority class.
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Bytes in trees with priority >= PRIORITY_OWN.
    pub own: u64,
    /// Bytes in trees with priority >= PRIORITY_FOLLOWED (below OWN).
    pub followed: u64,
    /// Bytes in all remaining trees.
    pub other: u64,
}
2037
/// Layout description of a (possibly chunked) file's content.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes.
    pub total_size: u64,
    /// Hash of each chunk, in file order.
    pub chunk_hashes: Vec<Hash>,
    /// Size in bytes of each chunk, parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is split into chunks at all.
    pub is_chunked: bool,
}
2045
/// Owned iterator over the chunks of a file that overlap a byte range.
///
/// Yields chunk contents already clipped to `[start, end]` (inclusive).
pub struct FileRangeChunksOwned {
    // Store used to fetch chunk contents by hash.
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    // Inclusive range bounds, in absolute file offsets.
    start: u64,
    end: u64,
    // Index of the next chunk to consider.
    current_chunk_idx: usize,
    // Absolute file offset at which the chunk at `current_chunk_idx` begins.
    current_offset: u64,
}
2055
2056impl Iterator for FileRangeChunksOwned {
2057 type Item = Result<Vec<u8>>;
2058
2059 fn next(&mut self) -> Option<Self::Item> {
2060 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2061 return None;
2062 }
2063
2064 if self.current_offset > self.end {
2065 return None;
2066 }
2067
2068 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2069 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2070 let chunk_end = self.current_offset + chunk_size - 1;
2071
2072 self.current_chunk_idx += 1;
2073
2074 if chunk_end < self.start || self.current_offset > self.end {
2075 self.current_offset += chunk_size;
2076 return self.next();
2077 }
2078
2079 let chunk_content = match self.store.get_chunk(chunk_hash) {
2080 Ok(Some(content)) => content,
2081 Ok(None) => {
2082 return Some(Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash))));
2083 }
2084 Err(e) => {
2085 return Some(Err(e));
2086 }
2087 };
2088
2089 let chunk_read_start = if self.current_offset >= self.start {
2090 0
2091 } else {
2092 (self.start - self.current_offset) as usize
2093 };
2094
2095 let chunk_read_end = if chunk_end <= self.end {
2096 chunk_size as usize - 1
2097 } else {
2098 (self.end - self.current_offset) as usize
2099 };
2100
2101 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2102 self.current_offset += chunk_size;
2103
2104 Some(Ok(result))
2105 }
2106}
2107
/// Result of a garbage-collection pass.
#[derive(Debug)]
pub struct GcStats {
    /// Number of objects deleted.
    pub deleted_dags: usize,
    /// Total bytes reclaimed.
    pub freed_bytes: u64,
}
2113
/// One entry of a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within the directory.
    pub name: String,
    /// Hex-encoded content hash of the entry.
    pub cid: String,
    /// Whether the entry is itself a directory.
    pub is_directory: bool,
    /// Entry size in bytes.
    pub size: u64,
}
2121
/// A directory together with its immediate entries.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the directory being listed.
    pub dir_name: String,
    /// The directory's entries.
    pub entries: Vec<DirEntry>,
}
2127
/// A pinned hash as surfaced by `list_pins_with_names`.
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Hex-encoded content hash.
    pub cid: String,
    /// Display name (always "Unknown" — pin names are not tracked).
    pub name: String,
    /// Whether the pinned hash resolves to a directory/tree node.
    pub is_directory: bool,
}
2134
/// Serializable metadata describing an uploaded blob.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 of the blob content.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type recorded at upload time.
    pub mime_type: String,
    /// Upload timestamp (Unix seconds, presumably — TODO confirm against writer).
    pub uploaded: u64,
}
2143
2144impl crate::webrtc::ContentStore for HashtreeStore {
2146 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2147 let hash = from_hex(hash_hex)
2148 .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2149 self.get_chunk(&hash)
2150 }
2151}