1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use heed::{Database, EnvOpenOptions};
4use heed::types::*;
5use hashtree_lmdb::LmdbBlobStore;
6use hashtree_core::{
7 HashTree, HashTreeConfig, Cid,
8 sha256, to_hex, from_hex, TreeNode, DirEntry as HashTreeDirEntry,
9 types::Hash,
10};
11use hashtree_core::store::{Store, StoreError};
12use serde::{Deserialize, Serialize};
13use std::path::Path;
14use std::collections::HashSet;
15use std::io::Read;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18use futures::executor::block_on as sync_block_on;
19
// Priority tiers recorded in `TreeMeta::priority`. Higher values appear to
// mean "keep longer" — NOTE(review): the eviction logic consuming these is
// not visible in this chunk; confirm ordering semantics there.
/// Priority for trees synced from unrelated users.
pub const PRIORITY_OTHER: u8 = 64;
/// Priority for trees synced from followed users.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Priority for the local user's own trees.
pub const PRIORITY_OWN: u8 = 255;
24
/// Per-tree retention metadata, stored MessagePack-encoded in the `tree_meta`
/// database keyed by the tree's root hash (see `index_tree`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Identifier of the user this tree belongs to / was synced from.
    pub owner: String,
    /// Optional human-readable name for the tree.
    pub name: Option<String>,
    /// Unix timestamp (seconds) when the tree was last indexed.
    pub synced_at: u64,
    /// Combined size in bytes of all locally present blobs in the tree.
    pub total_size: u64,
    /// One of the `PRIORITY_*` constants above.
    pub priority: u8,
}
39
/// A cached root pointer record — NOTE(review): presumably the value type of
/// the `cached_roots` database (not read/written in this chunk); confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    /// Root hash, hex-encoded.
    pub hash: String,
    /// Optional key material for encrypted roots — exact encoding not visible
    /// here; TODO confirm.
    pub key: Option<String>,
    /// Unix timestamp (seconds) of the last update.
    pub updated_at: u64,
    /// Visibility tag — allowed values not visible in this chunk; confirm.
    pub visibility: String,
}
52
53#[cfg(feature = "s3")]
54use tokio::sync::mpsc;
55
56use crate::config::S3Config;
57
/// Work items for the background S3 mirror task spawned in
/// `StorageRouter::with_s3`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    /// Mirror a freshly stored blob to S3.
    Upload { hash: Hash, data: Vec<u8> },
    /// Remove a blob's object from S3.
    Delete { hash: Hash },
}
64
/// Routes blob reads/writes to the local LMDB store and, when the `s3`
/// feature is enabled and configured, mirrors writes to S3 in the background
/// and falls back to S3 on local read misses.
pub struct StorageRouter {
    /// Always-present local LMDB blob store.
    local: Arc<LmdbBlobStore>,
    /// Client used for synchronous read-through (get/head) against S3.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    /// Target bucket; `None` when S3 is not configured.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    /// Key prefix prepended to every object key (empty string when unset).
    #[cfg(feature = "s3")]
    s3_prefix: String,
    /// Sender side of the channel feeding the background mirror task.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
83
impl StorageRouter {
    /// Create a router backed only by the local LMDB blob store (no S3 tier).
    pub fn new(local: Arc<LmdbBlobStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Create a router with a local store plus an S3 mirror.
    ///
    /// Spawns a detached background task that drains an unbounded channel of
    /// upload/delete messages and mirrors them to S3. Failures are logged but
    /// never retried or surfaced to callers (fire-and-forget).
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LmdbBlobStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Credentials come from the environment; region is overridden from
        // our own config.
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader = aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Custom endpoint + path-style addressing (needed for MinIO-style
        // S3-compatible services).
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Unbounded: uploads are never dropped, but a slow S3 link means
        // queued blob copies accumulate in memory.
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Runs until every sender is dropped (i.e. the router goes away).
            while let Some(msg) = sync_rx.recv().await {
                match msg {
                    S3SyncMessage::Upload { hash, data } => {
                        // Object key layout: "<prefix><hex-hash>.bin".
                        let key = format!("{}{}.bin", sync_prefix, to_hex(&hash));
                        tracing::info!("S3 uploading {} ({} bytes)", &key, data.len());

                        match sync_client
                            .put_object()
                            .bucket(&sync_bucket)
                            .key(&key)
                            .body(ByteStream::from(data))
                            .send()
                            .await
                        {
                            Ok(_) => tracing::info!("S3 upload succeeded for {}", &key),
                            Err(e) => tracing::error!("S3 upload failed for {}: {}", &key, e),
                        }
                    }
                    S3SyncMessage::Delete { hash } => {
                        let key = format!("{}{}.bin", sync_prefix, to_hex(&hash));
                        tracing::debug!("S3 deleting {}", &key);

                        if let Err(e) = sync_client
                            .delete_object()
                            .bucket(&sync_bucket)
                            .key(&key)
                            .send()
                            .await
                        {
                            // Key is truncated to 16 chars purely for log brevity.
                            tracing::error!("S3 delete failed for {}: {}", &key[..16.min(key.len())], e);
                        }
                    }
                }
            }
        });

        tracing::info!("S3 storage initialized: bucket={}, prefix={}", bucket, prefix);

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Store `data` under `hash` locally, then queue a fire-and-forget S3
    /// upload. Returns whether the blob was new to the local store.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        let is_new = self.local.put_sync(hash, data)?;

        // Note: the upload is queued even when is_new == false (idempotent
        // overwrite on the S3 side).
        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            tracing::info!("Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(), data.len(), is_new);
            if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data: data.to_vec() }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Fetch a blob: local store first, then (if configured) a synchronous
    /// S3 read-through that also repopulates the local store on success.
    ///
    /// NOTE(review): the S3 path uses `futures::executor::block_on` on an AWS
    /// SDK future; calling this from inside a tokio runtime thread may stall
    /// or deadlock the runtime — confirm call sites run on blocking threads.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.get_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Best-effort warm of the local cache; a local write
                        // failure does not fail the read.
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // "NoSuchKey" is the expected miss case; only log real errors.
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Check blob existence: local store first, then an S3 HEAD request.
    /// Same `block_on` caveat as `get_sync`.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        if self.local.exists(hash)? {
            return Ok(true);
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    // 404 is the expected miss; only log unexpected failures.
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Delete locally and queue a background S3 delete. The return value
    /// reflects only whether the *local* delete removed something.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Delete only the local copy, leaving any S3 object in place (e.g. to
    /// free local space while keeping the remote tier).
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Statistics of the local LMDB store only.
    pub fn stats(&self) -> Result<hashtree_lmdb::LmdbStats, StoreError> {
        self.local.stats()
    }

    /// Hashes present in the local store only (S3 is not enumerated).
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Clone a handle to the underlying local store.
    pub fn local_store(&self) -> Arc<LmdbBlobStore> {
        Arc::clone(&self.local)
    }
}
304
#[async_trait]
impl Store for StorageRouter {
    // Async Store facade over the synchronous router methods. Nothing here
    // awaits internally — but NOTE(review): get_sync/exists may themselves
    // block on S3 requests via block_on, so calling these from an async
    // executor thread can stall the runtime; confirm scheduling at call sites.

    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
325
/// Content-addressed store: one LMDB environment of metadata databases plus a
/// `StorageRouter` holding the actual blob bytes.
pub struct HashtreeStore {
    env: heed::Env,
    /// Pin set: key = 32-byte root hash, value = unit.
    pins: Database<Bytes, Unit>,
    /// Raw-content sha256 -> hashtree root hash.
    sha256_index: Database<Bytes, Bytes>,
    /// Composite (sha256 || pubkey) ownership markers.
    blob_owners: Database<Bytes, Unit>,
    /// pubkey -> JSON-encoded Vec<BlobMetadata>.
    pubkey_blobs: Database<Bytes, Bytes>,
    /// Tree root hash -> MessagePack-encoded TreeMeta.
    tree_meta: Database<Bytes, Bytes>,
    /// Composite (blob hash || tree root hash) reference markers.
    blob_trees: Database<Bytes, Unit>,
    /// Stable string ref key -> tree root hash.
    tree_refs: Database<Str, Bytes>,
    /// String key -> cached root record — NOTE(review): unused in this chunk;
    /// presumably serialized `CachedRoot`. Confirm.
    cached_roots: Database<Str, Bytes>,
    /// Blob storage (local LMDB + optional S3).
    router: Arc<StorageRouter>,
    /// Storage budget in bytes — NOTE(review): not enforced anywhere visible
    /// in this chunk; confirm where it is consumed.
    max_size_bytes: u64,
}
349
350impl HashtreeStore {
351 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
353 Self::with_options(path, None, 10 * 1024 * 1024 * 1024)
354 }
355
356 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
358 Self::with_options(path, s3_config, 10 * 1024 * 1024 * 1024)
359 }
360
    /// Open (or create) the store directory: an LMDB environment holding the
    /// metadata databases, plus a separate LMDB under `blobs/` for content,
    /// optionally fronted by an S3 mirror.
    pub fn with_options<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>, max_size_bytes: u64) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY(review): heed marks `open` unsafe because the caller must
        // guarantee the environment is not opened in ways that corrupt the
        // memory map (e.g. concurrent incompatible opens). Assumed upheld by
        // the application owning this directory — confirm.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) // 10 GiB map; independent of max_size_bytes
                .max_dbs(9)
                .open(path)?
        };

        // Create all named databases up front in a single transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let sha256_index = env.create_database(&mut wtxn, Some("sha256_index"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Blob content lives in its own LMDB environment under `blobs/`.
        let lmdb_store = Arc::new(LmdbBlobStore::new(path.join("blobs"))
            .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?);

        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!("Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket, s3_cfg.endpoint);

            // NOTE(review): block_on of an async constructor — may stall or
            // panic if invoked from inside a tokio runtime; confirm callers.
            sync_block_on(async {
                StorageRouter::with_s3(lmdb_store, s3_cfg).await
            })?
        } else {
            StorageRouter::new(lmdb_store)
        });

        // Without the s3 feature, an S3 config is accepted but ignored.
        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!("S3 config provided but S3 feature not enabled. Using local storage only.");
            }
            StorageRouter::new(lmdb_store)
        });

        Ok(Self {
            env,
            pins,
            sha256_index,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
423
    /// Borrow the storage router (local LMDB + optional S3 tier).
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }
428
    /// Clone an `Arc` handle to the router, e.g. to construct a `HashTree`
    /// over this store.
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }
434
435 pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
437 self.upload_file_internal(file_path, true)
438 }
439
440 pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
442 self.upload_file_internal(file_path, false)
443 }
444
445 fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
446 let file_path = file_path.as_ref();
447 let file_content = std::fs::read(file_path)?;
448
449 let content_sha256 = sha256(&file_content);
451
452 let store = self.store_arc();
454 let tree = HashTree::new(HashTreeConfig::new(store).public());
455
456 let cid = sync_block_on(async {
457 tree.put(&file_content).await
458 }).context("Failed to store file")?;
459
460 let mut wtxn = self.env.write_txn()?;
461
462 self.sha256_index.put(&mut wtxn, &content_sha256, &cid.hash)?;
464
465 if pin {
467 self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
468 }
469
470 wtxn.commit()?;
471
472 Ok(to_hex(&cid.hash))
473 }
474
475 pub fn upload_file_stream<R: Read, F>(
477 &self,
478 mut reader: R,
479 _file_name: impl Into<String>,
480 mut callback: F,
481 ) -> Result<String>
482 where
483 F: FnMut(&str),
484 {
485 let mut data = Vec::new();
487 reader.read_to_end(&mut data)?;
488
489 let content_sha256 = sha256(&data);
491
492 let store = self.store_arc();
494 let tree = HashTree::new(HashTreeConfig::new(store).public());
495
496 let cid = sync_block_on(async {
497 tree.put(&data).await
498 }).context("Failed to store file")?;
499
500 let root_hex = to_hex(&cid.hash);
501 callback(&root_hex);
502
503 let mut wtxn = self.env.write_txn()?;
504
505 self.sha256_index.put(&mut wtxn, &content_sha256, &cid.hash)?;
507
508 self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
510
511 wtxn.commit()?;
512
513 Ok(root_hex)
514 }
515
516 pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
519 self.upload_dir_with_options(dir_path, true)
520 }
521
522 pub fn upload_dir_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
524 let dir_path = dir_path.as_ref();
525
526 let store = self.store_arc();
527 let tree = HashTree::new(HashTreeConfig::new(store).public());
528
529 let root_cid = sync_block_on(async {
530 self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
531 }).context("Failed to upload directory")?;
532
533 let root_hex = to_hex(&root_cid.hash);
534
535 let mut wtxn = self.env.write_txn()?;
536 self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
537 wtxn.commit()?;
538
539 Ok(root_hex)
540 }
541
    /// Walk `current_path` (optionally honoring gitignore files) and upload
    /// every regular file, accumulating per-directory `(name, cid)` lists
    /// which are then folded bottom-up into directory nodes.
    ///
    /// NOTE(review): `_root_path` is unused — callers pass the same path for
    /// both; confirm whether it can be dropped.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // relative-dir-path -> entries; "" is the root directory and must
        // exist even when the walk yields no top-level files.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new());

        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false) // include dotfiles; only ignore-file rules filter
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // The walker yields the root itself; skip it.
            if path == current_path {
                continue;
            }

            let relative = path.strip_prefix(current_path)
                .unwrap_or(path);

            if path.is_file() {
                let content = std::fs::read(path)?;
                let cid = tree.put(&content).await
                    .map_err(|e| anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e))?;

                // Bucket the file under its parent directory (relative path).
                let parent = relative.parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative.file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Ensure empty directories still get an (empty) node.
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        self.build_directory_tree(tree, &mut dir_contents).await
    }
599
    /// Fold the per-directory `(name, cid)` lists into hashtree directory
    /// nodes, processing the deepest directories first so every parent can
    /// link its already-built children. Returns the cid of the root ("").
    async fn build_directory_tree<S: Store>(
        &self,
        tree: &HashTree<S>,
        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
    ) -> Result<Cid> {
        // Sort deepest-first: depth = number of '/' separators plus one for
        // any non-root path (the root "" has depth 0).
        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
        dirs.sort_by(|a, b| {
            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
            depth_b.cmp(&depth_a) // descending: children before parents
        });

        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();

        for dir_path in dirs {
            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();

            let mut entries: Vec<HashTreeDirEntry> = files.into_iter()
                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
                .collect();

            // Link every already-built subdirectory whose parent is this dir.
            for (subdir_path, cid) in &dir_cids {
                let parent = std::path::Path::new(subdir_path)
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();

                if parent == dir_path {
                    let name = std::path::Path::new(subdir_path)
                        .file_name()
                        .map(|n| n.to_string_lossy().to_string())
                        .unwrap_or_default();
                    entries.push(HashTreeDirEntry::from_cid(name, cid));
                }
            }

            let cid = tree.put_directory(entries).await
                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;

            dir_cids.insert(dir_path, cid);
        }

        // The root ("") is processed last and must be present.
        dir_cids.get("")
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("No root directory"))
    }
649
650 pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
652 let file_path = file_path.as_ref();
653 let file_content = std::fs::read(file_path)?;
654
655 let store = self.store_arc();
657 let tree = HashTree::new(HashTreeConfig::new(store));
658
659 let cid = sync_block_on(async {
660 tree.put(&file_content).await
661 }).map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;
662
663 let cid_str = cid.to_string();
664
665 let mut wtxn = self.env.write_txn()?;
666 self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
667 wtxn.commit()?;
668
669 Ok(cid_str)
670 }
671
672 pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
675 self.upload_dir_encrypted_with_options(dir_path, true)
676 }
677
678 pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
681 let dir_path = dir_path.as_ref();
682 let store = self.store_arc();
683
684 let tree = HashTree::new(HashTreeConfig::new(store));
686
687 let root_cid = sync_block_on(async {
688 self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
689 }).context("Failed to upload encrypted directory")?;
690
691 let cid_str = root_cid.to_string(); let mut wtxn = self.env.write_txn()?;
694 self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
696 wtxn.commit()?;
697
698 Ok(cid_str)
699 }
700
701 pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
703 let store = self.store_arc();
704 let tree = HashTree::new(HashTreeConfig::new(store).public());
705
706 sync_block_on(async {
707 tree.get_tree_node(hash).await
708 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
709 })
710 }
711
712 pub fn get_cid_by_sha256(&self, sha256: &[u8; 32]) -> Result<Option<Hash>> {
714 let rtxn = self.env.read_txn()?;
715 Ok(self.sha256_index.get(&rtxn, sha256)?.map(|bytes| {
716 let mut hash = [0u8; 32];
717 hash.copy_from_slice(bytes);
718 hash
719 }))
720 }
721
722 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
724 let hash = sha256(data);
725 self.router.put_sync(hash, data)
726 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
727 Ok(to_hex(&hash))
728 }
729
730 pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
732 self.router.get_sync(hash)
733 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
734 }
735
736 pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
738 self.router.exists(hash)
739 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
740 }
741
742 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
748 let mut key = [0u8; 64];
749 key[..32].copy_from_slice(sha256);
750 key[32..].copy_from_slice(pubkey);
751 key
752 }
753
    /// Record `pubkey` as an owner of blob `sha256` and append a metadata
    /// entry to that pubkey's blob list (Blossom-style bookkeeping).
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Composite (sha256 || pubkey) key; presence alone marks ownership.
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        let sha256_hex = to_hex(sha256);

        // Current JSON metadata list for this pubkey; malformed data is
        // treated as empty rather than an error.
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        // Append at most once per (pubkey, sha256) pair.
        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap() // a system clock before 1970 is treated as a bug
                .as_secs();

            // Best-effort size via sha256 -> root hash -> chunk metadata;
            // defaults to 0 when the file is not indexed (yet).
            let size = self
                .get_cid_by_sha256(sha256)?
                .and_then(|root_hash| self.get_file_chunk_metadata(&root_hash).ok().flatten())
                .map(|m| m.total_size)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                // NOTE(review): MIME type is always the generic default here —
                // confirm callers don't expect the real content type.
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
801
802 pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
804 let key = Self::blob_owner_key(sha256, pubkey);
805 let rtxn = self.env.read_txn()?;
806 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
807 }
808
809 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
811 let rtxn = self.env.read_txn()?;
812
813 let mut owners = Vec::new();
814 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
815 let (key, _) = item?;
816 if key.len() == 64 {
817 let mut pubkey = [0u8; 32];
819 pubkey.copy_from_slice(&key[32..64]);
820 owners.push(pubkey);
821 }
822 }
823 Ok(owners)
824 }
825
826 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
828 let rtxn = self.env.read_txn()?;
829
830 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
832 if item.is_ok() {
833 return Ok(true);
834 }
835 }
836 Ok(false)
837 }
838
839 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
841 Ok(self.get_blob_owners(sha256)?.into_iter().next())
842 }
843
    /// Remove `pubkey`'s ownership of blob `sha256`; when the last owner is
    /// removed, also drop the pin/index entries and delete the stored data.
    ///
    /// Returns `Ok(true)` when the blob itself was deleted, `Ok(false)` when
    /// other owners remain.
    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        self.blob_owners.delete(&mut wtxn, &key[..])?;

        let sha256_hex = to_hex(sha256);

        // Drop the entry from the owner's JSON metadata list (best-effort:
        // malformed JSON is left untouched).
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Any remaining (sha256 || other_pubkey) record keeps the blob alive.
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &to_hex(pubkey)[..8],
                &sha256_hex[..8]
            );
            return Ok(false);
        }

        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8]
        );

        // Unpin the hashtree root recorded for this sha256 and drop the
        // index mapping itself.
        let root_hash: Option<Hash> = self.sha256_index.get(&wtxn, sha256)?.map(|bytes| {
            let mut hash = [0u8; 32];
            hash.copy_from_slice(bytes);
            hash
        });
        if let Some(ref hash) = root_hash {
            self.pins.delete(&mut wtxn, hash)?;
        }
        self.sha256_index.delete(&mut wtxn, sha256)?;

        // NOTE(review): this deletes the object keyed by the raw content
        // sha256, but the upload paths store data under the hashtree root
        // hash (see upload_file_internal / sha256_index). Confirm whether
        // the data under `root_hash` should be deleted here instead/as well.
        let _ = self.router.delete_sync(sha256);

        wtxn.commit()?;
        Ok(true)
    }
908
909 pub fn list_blobs_by_pubkey(&self, pubkey: &[u8; 32]) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
911 let rtxn = self.env.read_txn()?;
912
913 let blobs: Vec<BlobMetadata> = self
914 .pubkey_blobs
915 .get(&rtxn, pubkey)?
916 .and_then(|b| serde_json::from_slice(b).ok())
917 .unwrap_or_default();
918
919 Ok(blobs
920 .into_iter()
921 .map(|b| crate::server::blossom::BlobDescriptor {
922 url: format!("/{}", b.sha256),
923 sha256: b.sha256,
924 size: b.size,
925 mime_type: b.mime_type,
926 uploaded: b.uploaded,
927 })
928 .collect())
929 }
930
931 pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
933 self.router.get_sync(hash)
934 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
935 }
936
937 pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
940 let store = self.store_arc();
941 let tree = HashTree::new(HashTreeConfig::new(store).public());
942
943 sync_block_on(async {
944 tree.read_file(hash).await
945 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
946 })
947 }
948
949 pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
952 let store = self.store_arc();
953 let tree = HashTree::new(HashTreeConfig::new(store).public());
954
955 sync_block_on(async {
956 tree.get(cid).await
957 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
958 })
959 }
960
    /// Describe how the file rooted at `hash` is stored: total size plus, for
    /// chunked files, the ordered chunk hashes and sizes.
    ///
    /// Returns `Ok(None)` when the hash is unknown, the tree node is missing,
    /// or the hash refers to a directory rather than a file.
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            let exists = store.has(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            let total_size = tree.get_size(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            let is_tree_node = tree.is_tree(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            // A raw blob (no tree node) is a single, unchunked file.
            if !is_tree_node {
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            let node = match tree.get_tree_node(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))? {
                Some(n) => n,
                None => return Ok(None),
            };

            // Directories are not files; see get_directory_listing instead.
            let is_directory = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                return Ok(None);
            }

            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1021
1022 pub fn get_file_range(&self, hash: &[u8; 32], start: u64, end: Option<u64>) -> Result<Option<(Vec<u8>, u64)>> {
1024 let metadata = match self.get_file_chunk_metadata(hash)? {
1025 Some(m) => m,
1026 None => return Ok(None),
1027 };
1028
1029 if metadata.total_size == 0 {
1030 return Ok(Some((Vec::new(), 0)));
1031 }
1032
1033 if start >= metadata.total_size {
1034 return Ok(None);
1035 }
1036
1037 let end = end.unwrap_or(metadata.total_size - 1).min(metadata.total_size - 1);
1038
1039 if !metadata.is_chunked {
1041 let content = self.get_file(hash)?.unwrap_or_default();
1042 let range_content = if start < content.len() as u64 {
1043 content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1044 } else {
1045 Vec::new()
1046 };
1047 return Ok(Some((range_content, metadata.total_size)));
1048 }
1049
1050 let mut result = Vec::new();
1052 let mut current_offset = 0u64;
1053
1054 for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
1055 let chunk_size = metadata.chunk_sizes[i];
1056 let chunk_end = current_offset + chunk_size - 1;
1057
1058 if chunk_end >= start && current_offset <= end {
1060 let chunk_content = match self.get_chunk(chunk_hash)? {
1061 Some(content) => content,
1062 None => {
1063 return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
1064 }
1065 };
1066
1067 let chunk_read_start = if current_offset >= start {
1068 0
1069 } else {
1070 (start - current_offset) as usize
1071 };
1072
1073 let chunk_read_end = if chunk_end <= end {
1074 chunk_size as usize - 1
1075 } else {
1076 (end - current_offset) as usize
1077 };
1078
1079 result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1080 }
1081
1082 current_offset += chunk_size;
1083
1084 if current_offset > end {
1085 break;
1086 }
1087 }
1088
1089 Ok(Some((result, metadata.total_size)))
1090 }
1091
1092 pub fn stream_file_range_chunks_owned(
1094 self: Arc<Self>,
1095 hash: &[u8; 32],
1096 start: u64,
1097 end: u64,
1098 ) -> Result<Option<FileRangeChunksOwned>> {
1099 let metadata = match self.get_file_chunk_metadata(hash)? {
1100 Some(m) => m,
1101 None => return Ok(None),
1102 };
1103
1104 if metadata.total_size == 0 || start >= metadata.total_size {
1105 return Ok(None);
1106 }
1107
1108 let end = end.min(metadata.total_size - 1);
1109
1110 Ok(Some(FileRangeChunksOwned {
1111 store: self,
1112 metadata,
1113 start,
1114 end,
1115 current_chunk_idx: 0,
1116 current_offset: 0,
1117 }))
1118 }
1119
    /// List the entries of the directory rooted at `hash`, or `Ok(None)` when
    /// the hash does not refer to a directory.
    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            let is_dir = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if !is_dir {
                return Ok(None);
            }

            // Build a public (unencrypted) Cid for the directory root.
            let cid = hashtree_core::Cid::public(*hash, 0);
            let tree_entries = tree.list_directory(&cid).await
                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;

            let entries: Vec<DirEntry> = tree_entries.into_iter().map(|e| DirEntry {
                name: e.name,
                cid: to_hex(&e.hash),
                is_directory: e.link_type.is_tree(),
                size: e.size,
            }).collect();

            Ok(Some(DirectoryListing {
                // The directory's own name is not recoverable from its hash
                // alone; callers always get an empty string here.
                dir_name: String::new(),
                entries,
            }))
        })
    }
1152
1153 pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
1155 let mut wtxn = self.env.write_txn()?;
1156 self.pins.put(&mut wtxn, hash.as_slice(), &())?;
1157 wtxn.commit()?;
1158 Ok(())
1159 }
1160
1161 pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
1163 let mut wtxn = self.env.write_txn()?;
1164 self.pins.delete(&mut wtxn, hash.as_slice())?;
1165 wtxn.commit()?;
1166 Ok(())
1167 }
1168
1169 pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
1171 let rtxn = self.env.read_txn()?;
1172 Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
1173 }
1174
1175 pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
1177 let rtxn = self.env.read_txn()?;
1178 let mut pins = Vec::new();
1179
1180 for item in self.pins.iter(&rtxn)? {
1181 let (hash_bytes, _) = item?;
1182 if hash_bytes.len() == 32 {
1183 let mut hash = [0u8; 32];
1184 hash.copy_from_slice(hash_bytes);
1185 pins.push(hash);
1186 }
1187 }
1188
1189 Ok(pins)
1190 }
1191
    /// All pinned roots with an is-directory flag. No name is stored anywhere
    /// visible here, so every entry is labeled "Unknown".
    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
        let rtxn = self.env.read_txn()?;
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let mut pins = Vec::new();

        for item in self.pins.iter(&rtxn)? {
            let (hash_bytes, _) = item?;
            if hash_bytes.len() != 32 {
                continue; // skip malformed keys
            }
            let mut hash = [0u8; 32];
            hash.copy_from_slice(hash_bytes);

            // NOTE(review): one blocking tree lookup per pin — may be slow
            // for large pin sets; lookup errors are treated as "not a dir".
            let is_directory = sync_block_on(async {
                tree.is_directory(&hash).await.unwrap_or(false)
            });

            pins.push(PinnedItem {
                cid: to_hex(&hash),
                name: "Unknown".to_string(),
                is_directory,
            });
        }

        Ok(pins)
    }
1221
    /// Index a synced tree for retention accounting: record every blob in the
    /// tree against this root in `blob_trees`, store a `TreeMeta` record, and
    /// optionally bind a stable `ref_key` to the root.
    ///
    /// When `ref_key` previously pointed at a different root, the old tree is
    /// unindexed first (best-effort) so its blobs don't stay referenced.
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes.try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // Close the read txn before unindex_tree opens its own
                    // transactions.
                    drop(rtxn);
                    let _ = self.unindex_tree(&old_hash); // best-effort
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk the whole tree to find every locally present blob + its size.
        let (blob_hashes, total_size) = sync_block_on(async {
            self.collect_tree_blobs(&tree, root_hash).await
        })?;

        let mut wtxn = self.env.write_txn()?;

        // (blob_hash || root_hash) composite keys let prefix_iter(blob_hash)
        // answer "which trees still reference this blob?".
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap() // a clock before 1970 is treated as a bug
                .as_secs(),
            total_size,
            priority,
        };
        // TreeMeta is stored MessagePack-encoded.
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta.put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1303
1304 async fn collect_tree_blobs<S: Store>(
1306 &self,
1307 tree: &HashTree<S>,
1308 root: &Hash,
1309 ) -> Result<(Vec<Hash>, u64)> {
1310 let mut blobs = Vec::new();
1311 let mut total_size = 0u64;
1312 let mut stack = vec![*root];
1313
1314 while let Some(hash) = stack.pop() {
1315 let is_tree = tree.is_tree(&hash).await
1317 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1318
1319 if is_tree {
1320 if let Some(node) = tree.get_tree_node(&hash).await
1322 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1323 {
1324 for link in &node.links {
1325 stack.push(link.hash);
1326 }
1327 }
1328 } else {
1329 if let Some(data) = self.router.get_sync(&hash)
1331 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1332 {
1333 total_size += data.len() as u64;
1334 blobs.push(hash);
1335 }
1336 }
1337 }
1338
1339 Ok((blobs, total_size))
1340 }
1341
1342 pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1345 let root_hex = to_hex(root_hash);
1346
1347 let store = self.store_arc();
1348 let tree = HashTree::new(HashTreeConfig::new(store).public());
1349
1350 let (blob_hashes, _) = sync_block_on(async {
1352 self.collect_tree_blobs(&tree, root_hash).await
1353 })?;
1354
1355 let mut wtxn = self.env.write_txn()?;
1356 let mut freed = 0u64;
1357
1358 for blob_hash in &blob_hashes {
1360 let mut key = [0u8; 64];
1362 key[..32].copy_from_slice(blob_hash);
1363 key[32..].copy_from_slice(root_hash);
1364 self.blob_trees.delete(&mut wtxn, &key[..])?;
1365
1366 let rtxn = self.env.read_txn()?;
1368 let mut has_other_tree = false;
1369
1370 for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1371 if item.is_ok() {
1372 has_other_tree = true;
1373 break;
1374 }
1375 }
1376 drop(rtxn);
1377
1378 if !has_other_tree {
1380 if let Some(data) = self.router.get_sync(blob_hash)
1381 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1382 {
1383 freed += data.len() as u64;
1384 self.router.delete_local_only(blob_hash)
1386 .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1387 }
1388 }
1389 }
1390
1391 if let Some(data) = self.router.get_sync(root_hash)
1393 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1394 {
1395 freed += data.len() as u64;
1396 self.router.delete_local_only(root_hash)
1398 .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1399 }
1400
1401 self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1403
1404 wtxn.commit()?;
1405
1406 tracing::debug!(
1407 "Unindexed tree {} ({} bytes freed)",
1408 &root_hex[..8],
1409 freed
1410 );
1411
1412 Ok(freed)
1413 }
1414
1415 pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1417 let rtxn = self.env.read_txn()?;
1418 if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1419 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1420 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1421 Ok(Some(meta))
1422 } else {
1423 Ok(None)
1424 }
1425 }
1426
1427 pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1429 let rtxn = self.env.read_txn()?;
1430 let mut trees = Vec::new();
1431
1432 for item in self.tree_meta.iter(&rtxn)? {
1433 let (hash_bytes, meta_bytes) = item?;
1434 let hash: Hash = hash_bytes.try_into()
1435 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1436 let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1437 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1438 trees.push((hash, meta));
1439 }
1440
1441 Ok(trees)
1442 }
1443
1444 pub fn tracked_size(&self) -> Result<u64> {
1446 let rtxn = self.env.read_txn()?;
1447 let mut total = 0u64;
1448
1449 for item in self.tree_meta.iter(&rtxn)? {
1450 let (_, bytes) = item?;
1451 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1452 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1453 total += meta.total_size;
1454 }
1455
1456 Ok(total)
1457 }
1458
1459 fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1461 let mut trees = self.list_indexed_trees()?;
1462
1463 trees.sort_by(|a, b| {
1465 match a.1.priority.cmp(&b.1.priority) {
1466 std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1467 other => other,
1468 }
1469 });
1470
1471 Ok(trees)
1472 }
1473
1474 pub fn evict_if_needed(&self) -> Result<u64> {
1481 let stats = self.router.stats()
1483 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1484 let current = stats.total_bytes;
1485
1486 if current <= self.max_size_bytes {
1487 return Ok(0);
1488 }
1489
1490 let target = self.max_size_bytes * 90 / 100;
1492 let mut freed = 0u64;
1493 let mut current_size = current;
1494
1495 let orphan_freed = self.evict_orphaned_blobs()?;
1497 freed += orphan_freed;
1498 current_size = current_size.saturating_sub(orphan_freed);
1499
1500 if orphan_freed > 0 {
1501 tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
1502 }
1503
1504 if current_size <= target {
1506 if freed > 0 {
1507 tracing::info!("Eviction complete: {} bytes freed", freed);
1508 }
1509 return Ok(freed);
1510 }
1511
1512 let evictable = self.get_evictable_trees()?;
1515
1516 for (root_hash, meta) in evictable {
1517 if current_size <= target {
1518 break;
1519 }
1520
1521 let root_hex = to_hex(&root_hash);
1522
1523 if self.is_pinned(&root_hash)? {
1525 continue;
1526 }
1527
1528 let tree_freed = self.unindex_tree(&root_hash)?;
1529 freed += tree_freed;
1530 current_size = current_size.saturating_sub(tree_freed);
1531
1532 tracing::info!(
1533 "Evicted tree {} (owner={}, priority={}, {} bytes)",
1534 &root_hex[..8],
1535 &meta.owner[..8.min(meta.owner.len())],
1536 meta.priority,
1537 tree_freed
1538 );
1539 }
1540
1541 if freed > 0 {
1542 tracing::info!("Eviction complete: {} bytes freed", freed);
1543 }
1544
1545 Ok(freed)
1546 }
1547
1548 fn evict_orphaned_blobs(&self) -> Result<u64> {
1550 let mut freed = 0u64;
1551
1552 let all_hashes = self.router.list()
1554 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1555
1556 let rtxn = self.env.read_txn()?;
1558 let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
1559 .filter_map(|item| item.ok())
1560 .filter_map(|(hash_bytes, _)| {
1561 if hash_bytes.len() == 32 {
1562 let mut hash = [0u8; 32];
1563 hash.copy_from_slice(hash_bytes);
1564 Some(hash)
1565 } else {
1566 None
1567 }
1568 })
1569 .collect();
1570
1571 let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
1574 for item in self.blob_trees.iter(&rtxn)? {
1575 if let Ok((key_bytes, _)) = item {
1576 if key_bytes.len() >= 32 {
1577 let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
1578 blobs_in_trees.insert(blob_hash);
1579 }
1580 }
1581 }
1582 drop(rtxn);
1583
1584 for hash in all_hashes {
1586 if pinned.contains(&hash) {
1588 continue;
1589 }
1590
1591 if blobs_in_trees.contains(&hash) {
1593 continue;
1594 }
1595
1596 if let Ok(Some(data)) = self.router.get_sync(&hash) {
1598 freed += data.len() as u64;
1599 let _ = self.router.delete_local_only(&hash);
1600 tracing::debug!("Deleted orphaned blob {} ({} bytes)", &to_hex(&hash)[..8], data.len());
1601 }
1602 }
1603
1604 Ok(freed)
1605 }
1606
    /// Configured local storage cap in bytes.
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1611
1612 pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1614 let rtxn = self.env.read_txn()?;
1615 let mut own = 0u64;
1616 let mut followed = 0u64;
1617 let mut other = 0u64;
1618
1619 for item in self.tree_meta.iter(&rtxn)? {
1620 let (_, bytes) = item?;
1621 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1622 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1623
1624 if meta.priority >= PRIORITY_OWN {
1625 own += meta.total_size;
1626 } else if meta.priority >= PRIORITY_FOLLOWED {
1627 followed += meta.total_size;
1628 } else {
1629 other += meta.total_size;
1630 }
1631 }
1632
1633 Ok(StorageByPriority { own, followed, other })
1634 }
1635
1636 pub fn get_storage_stats(&self) -> Result<StorageStats> {
1638 let rtxn = self.env.read_txn()?;
1639 let total_pins = self.pins.len(&rtxn)? as usize;
1640
1641 let stats = self.router.stats()
1642 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1643
1644 Ok(StorageStats {
1645 total_dags: stats.count,
1646 pinned_dags: total_pins,
1647 total_bytes: stats.total_bytes,
1648 })
1649 }
1650
1651 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1655 let key = format!("{}/{}", pubkey_hex, tree_name);
1656 let rtxn = self.env.read_txn()?;
1657 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1658 let root: CachedRoot = rmp_serde::from_slice(bytes)
1659 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1660 Ok(Some(root))
1661 } else {
1662 Ok(None)
1663 }
1664 }
1665
1666 pub fn set_cached_root(
1668 &self,
1669 pubkey_hex: &str,
1670 tree_name: &str,
1671 hash: &str,
1672 key: Option<&str>,
1673 visibility: &str,
1674 updated_at: u64,
1675 ) -> Result<()> {
1676 let db_key = format!("{}/{}", pubkey_hex, tree_name);
1677 let root = CachedRoot {
1678 hash: hash.to_string(),
1679 key: key.map(|k| k.to_string()),
1680 updated_at,
1681 visibility: visibility.to_string(),
1682 };
1683 let bytes = rmp_serde::to_vec(&root)
1684 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1685 let mut wtxn = self.env.write_txn()?;
1686 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1687 wtxn.commit()?;
1688 Ok(())
1689 }
1690
1691 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1693 let prefix = format!("{}/", pubkey_hex);
1694 let rtxn = self.env.read_txn()?;
1695 let mut results = Vec::new();
1696
1697 for item in self.cached_roots.iter(&rtxn)? {
1698 let (key, bytes) = item?;
1699 if key.starts_with(&prefix) {
1700 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1701 let root: CachedRoot = rmp_serde::from_slice(bytes)
1702 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1703 results.push((tree_name.to_string(), root));
1704 }
1705 }
1706
1707 Ok(results)
1708 }
1709
1710 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1712 let key = format!("{}/{}", pubkey_hex, tree_name);
1713 let mut wtxn = self.env.write_txn()?;
1714 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1715 wtxn.commit()?;
1716 Ok(deleted)
1717 }
1718
1719 pub fn gc(&self) -> Result<GcStats> {
1721 let rtxn = self.env.read_txn()?;
1722
1723 let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
1725 .filter_map(|item| item.ok())
1726 .filter_map(|(hash_bytes, _)| {
1727 if hash_bytes.len() == 32 {
1728 let mut hash = [0u8; 32];
1729 hash.copy_from_slice(hash_bytes);
1730 Some(hash)
1731 } else {
1732 None
1733 }
1734 })
1735 .collect();
1736
1737 drop(rtxn);
1738
1739 let all_hashes = self.router.list()
1741 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1742
1743 let mut deleted = 0;
1745 let mut freed_bytes = 0u64;
1746
1747 for hash in all_hashes {
1748 if !pinned.contains(&hash) {
1749 if let Ok(Some(data)) = self.router.get_sync(&hash) {
1750 freed_bytes += data.len() as u64;
1751 let _ = self.router.delete_local_only(&hash);
1753 deleted += 1;
1754 }
1755 }
1756 }
1757
1758 Ok(GcStats {
1759 deleted_dags: deleted,
1760 freed_bytes,
1761 })
1762 }
1763
1764 pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
1767 let all_hashes = self.router.list()
1768 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1769
1770 let total = all_hashes.len();
1771 let mut valid = 0;
1772 let mut corrupted = 0;
1773 let mut deleted = 0;
1774 let mut corrupted_hashes = Vec::new();
1775
1776 for hash in &all_hashes {
1777 let hash_hex = to_hex(hash);
1778
1779 match self.router.get_sync(hash) {
1780 Ok(Some(data)) => {
1781 let actual_hash = sha256(&data);
1783
1784 if actual_hash == *hash {
1785 valid += 1;
1786 } else {
1787 corrupted += 1;
1788 let actual_hex = to_hex(&actual_hash);
1789 println!(" CORRUPTED: key={} actual={} size={}",
1790 &hash_hex[..16], &actual_hex[..16], data.len());
1791 corrupted_hashes.push(*hash);
1792 }
1793 }
1794 Ok(None) => {
1795 corrupted += 1;
1797 println!(" MISSING: key={}", &hash_hex[..16]);
1798 corrupted_hashes.push(*hash);
1799 }
1800 Err(e) => {
1801 corrupted += 1;
1802 println!(" ERROR: key={} err={}", &hash_hex[..16], e);
1803 corrupted_hashes.push(*hash);
1804 }
1805 }
1806 }
1807
1808 if delete {
1810 for hash in &corrupted_hashes {
1811 match self.router.delete_sync(hash) {
1812 Ok(true) => deleted += 1,
1813 Ok(false) => {} Err(e) => {
1815 let hash_hex = to_hex(hash);
1816 println!(" Failed to delete {}: {}", &hash_hex[..16], e);
1817 }
1818 }
1819 }
1820 }
1821
1822 Ok(VerifyResult {
1823 total,
1824 valid,
1825 corrupted,
1826 deleted,
1827 })
1828 }
1829
    /// Verify every `.bin` object in the configured S3/R2 bucket by
    /// downloading it and rehashing its contents against the hash encoded in
    /// its object key (`<prefix><hex>.bin`). Prints a progress line every 100
    /// objects and one line per bad object; when `delete` is true, flagged
    /// objects are removed afterwards.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        // Build a fresh S3 client from the on-disk config, independent of
        // the router's own client.
        let config = crate::config::Config::load()?;
        let s3_config = config.storage.s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                // Path-style addressing for the custom endpoint.
                .force_path_style(true)
                .build()
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // Paginate through the full bucket listing.
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2()
                .bucket(bucket)
                .prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req.send().await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Only blob objects (".bin") are verified.
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // The expected hash is the hex filename between the prefix
                // and the ".bin" suffix.
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                // 64 hex chars = 32-byte hash; anything else is malformed.
                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!(" INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!(" INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download the object and compare digests.
                match s3_client.get_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(resp) => {
                        match resp.body.collect().await {
                            Ok(bytes) => {
                                let data = bytes.into_bytes();
                                let actual_hash = sha256(&data);

                                if actual_hash == expected_hash {
                                    valid += 1;
                                } else {
                                    corrupted += 1;
                                    let actual_hex = to_hex(&actual_hash);
                                    println!(" CORRUPTED: key={} actual={} size={}",
                                        &expected_hash_hex[..16], &actual_hex[..16], data.len());
                                    corrupted_keys.push(key.to_string());
                                }
                            }
                            Err(e) => {
                                // Body stream failed mid-download.
                                corrupted += 1;
                                println!(" READ ERROR: {} - {}", key, e);
                                corrupted_keys.push(key.to_string());
                            }
                        }
                    }
                    Err(e) => {
                        corrupted += 1;
                        println!(" FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Periodic progress output.
                if total % 100 == 0 {
                    println!(" Progress: {} objects checked, {} corrupted so far", total, corrupted);
                }
            }

            // Follow the pagination token until the listing is exhausted.
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        // Optional cleanup pass over everything flagged above.
        if delete {
            for key in &corrupted_keys {
                match s3_client.delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        println!(" Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
1986
1987 #[cfg(not(feature = "s3"))]
1989 pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
1990 Err(anyhow::anyhow!("S3 feature not enabled"))
1991 }
1992}
1993
/// Outcome of an integrity scan (`verify_lmdb_integrity` / `verify_r2_integrity`).
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of objects examined.
    pub total: usize,
    /// Objects whose contents matched their expected hash.
    pub valid: usize,
    /// Objects that were missing, unreadable, malformed, or failed the hash check.
    pub corrupted: usize,
    /// Flagged objects actually deleted (non-zero only when deletion was requested).
    pub deleted: usize,
}
2002
/// Store-wide counters returned by `get_storage_stats`.
#[derive(Debug)]
pub struct StorageStats {
    /// Total number of stored objects (from the router's stats).
    pub total_dags: usize,
    /// Number of entries in the pin database.
    pub pinned_dags: usize,
    /// Total stored size in bytes.
    pub total_bytes: u64,
}
2009
/// Tracked bytes broken down by tree priority band (see the `PRIORITY_*` constants).
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Bytes in trees with priority >= PRIORITY_OWN.
    pub own: u64,
    /// Bytes in trees with priority >= PRIORITY_FOLLOWED but below PRIORITY_OWN.
    pub followed: u64,
    /// Bytes in all remaining trees.
    pub other: u64,
}
2020
/// Chunk layout of a stored file.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes.
    pub total_size: u64,
    /// Content hashes of the chunks, in file order.
    pub chunk_hashes: Vec<Hash>,
    /// Size in bytes of each chunk, parallel to `chunk_hashes`.
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is split into chunks at all.
    pub is_chunked: bool,
}
2028
/// Owned iterator state for reading the chunks of a file that overlap an
/// inclusive byte range `[start, end]`; the actual iteration logic lives in
/// its `Iterator` implementation.
pub struct FileRangeChunksOwned {
    // Store used to resolve chunk hashes to their contents.
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    // Inclusive start of the requested byte range.
    start: u64,
    // Inclusive end of the requested byte range.
    end: u64,
    // Index of the next chunk to consider.
    current_chunk_idx: usize,
    // File offset of the start of that chunk.
    current_offset: u64,
}
2038
2039impl Iterator for FileRangeChunksOwned {
2040 type Item = Result<Vec<u8>>;
2041
2042 fn next(&mut self) -> Option<Self::Item> {
2043 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2044 return None;
2045 }
2046
2047 if self.current_offset > self.end {
2048 return None;
2049 }
2050
2051 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2052 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2053 let chunk_end = self.current_offset + chunk_size - 1;
2054
2055 self.current_chunk_idx += 1;
2056
2057 if chunk_end < self.start || self.current_offset > self.end {
2058 self.current_offset += chunk_size;
2059 return self.next();
2060 }
2061
2062 let chunk_content = match self.store.get_chunk(chunk_hash) {
2063 Ok(Some(content)) => content,
2064 Ok(None) => {
2065 return Some(Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash))));
2066 }
2067 Err(e) => {
2068 return Some(Err(e));
2069 }
2070 };
2071
2072 let chunk_read_start = if self.current_offset >= self.start {
2073 0
2074 } else {
2075 (self.start - self.current_offset) as usize
2076 };
2077
2078 let chunk_read_end = if chunk_end <= self.end {
2079 chunk_size as usize - 1
2080 } else {
2081 (self.end - self.current_offset) as usize
2082 };
2083
2084 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2085 self.current_offset += chunk_size;
2086
2087 Some(Ok(result))
2088 }
2089}
2090
/// Result of a `gc` pass.
#[derive(Debug)]
pub struct GcStats {
    /// Number of unpinned objects deleted.
    pub deleted_dags: usize,
    /// Total bytes reclaimed.
    pub freed_bytes: u64,
}
2096
/// A single entry inside a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within the directory.
    pub name: String,
    /// Hex-encoded content identifier of the entry.
    pub cid: String,
    /// True when the entry is itself a directory.
    pub is_directory: bool,
    /// Entry size in bytes.
    pub size: u64,
}
2104
/// A directory together with its resolved entries.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the directory being listed.
    pub dir_name: String,
    /// The directory's entries.
    pub entries: Vec<DirEntry>,
}
2110
/// A pinned root as reported by the pin listing.
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Hex-encoded content identifier of the pinned root.
    pub cid: String,
    /// Display name ("Unknown" when no name is recorded).
    pub name: String,
    /// True when the pinned root is a directory.
    pub is_directory: bool,
}
2117
/// Serializable metadata recorded for an uploaded blob.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 of the blob contents.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type reported at upload time.
    pub mime_type: String,
    /// Upload timestamp (numeric; presumably Unix seconds — TODO confirm).
    pub uploaded: u64,
}
2126
2127impl crate::webrtc::ContentStore for HashtreeStore {
2129 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2130 let hash = from_hex(hash_hex)
2131 .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2132 self.get_chunk(&hash)
2133 }
2134}