1use anyhow::{Context, Result};
2use heed::{Database, EnvOpenOptions};
3use heed::types::*;
4use hashtree_lmdb::LmdbBlobStore;
5use hashtree_core::{
6 HashTree, HashTreeConfig, Cid,
7 sha256, to_hex, from_hex, TreeNode, DirEntry as HashTreeDirEntry,
8 types::Hash,
9};
10use hashtree_core::store::{Store, StoreError};
11use serde::{Deserialize, Serialize};
12use std::path::Path;
13use std::collections::HashSet;
14use std::io::Read;
15use std::sync::Arc;
16use std::time::{SystemTime, UNIX_EPOCH};
17use futures::executor::block_on as sync_block_on;
18
/// Retention priority for trees synced on behalf of unrelated users (lowest tier kept).
pub const PRIORITY_OTHER: u8 = 64;
/// Retention priority for trees belonging to followed users.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Retention priority for the node owner's own trees (never evicted first).
pub const PRIORITY_OWN: u8 = 255;
23
/// Per-tree bookkeeping stored in the `tree_meta` database (msgpack-encoded,
/// keyed by the tree's root hash — see `index_tree`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    // Hex pubkey of the user this tree was indexed for.
    pub owner: String,
    // Optional human-readable label supplied at index time.
    pub name: Option<String>,
    // Unix timestamp (seconds) when the tree was last indexed.
    pub synced_at: u64,
    // Sum of the sizes of all leaf blobs reachable from the root.
    pub total_size: u64,
    // One of the PRIORITY_* constants above; higher survives eviction longer.
    pub priority: u8,
}
38
/// A cached tree-root record (stored in the `cached_roots` database).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    // Root hash, hex-encoded.
    pub hash: String,
    // Optional decryption key — presumably present only for private trees; TODO confirm.
    pub key: Option<String>,
    // Unix timestamp (seconds) of the last update.
    pub updated_at: u64,
    // Visibility label; NOTE(review): stringly-typed — values not visible in this file.
    pub visibility: String,
}
51
52#[cfg(feature = "s3")]
53use tokio::sync::mpsc;
54
55use crate::config::S3Config;
56
/// Work items sent to the background S3 replication task spawned in
/// `StorageRouter::with_s3`. Uploads carry the full blob payload.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    Upload { hash: Hash, data: Vec<u8> },
    Delete { hash: Hash },
}
63
/// Routes blob reads/writes between the local LMDB store and an optional
/// S3 tier. Writes always land locally; S3 replication is asynchronous via
/// `sync_tx`. Reads fall back to S3 when the blob is missing locally.
pub struct StorageRouter {
    // Always-present local blob store; the source of truth for writes.
    local: Arc<LmdbBlobStore>,
    // S3 client used for synchronous read-through (get/head).
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    // Target bucket; None means S3 is not configured.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    // Key prefix prepended to every object key ("" when unset).
    #[cfg(feature = "s3")]
    s3_prefix: String,
    // Channel into the background upload/delete task; None when S3 is off.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
82
impl StorageRouter {
    /// Creates a router backed only by the local LMDB blob store (no S3 tier).
    pub fn new(local: Arc<LmdbBlobStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Creates a router with an S3 replication tier.
    ///
    /// Spawns a detached tokio task that drains `S3SyncMessage`s and performs
    /// the actual put/delete calls, so writes never block on S3. Object keys
    /// are `{prefix}{hex_hash}.bin`.
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LmdbBlobStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader = aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        // force_path_style: required by most non-AWS S3-compatible endpoints (e.g. MinIO).
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Unbounded: backpressure is deliberately absent; a flood of uploads
        // will buffer in memory until the task catches up.
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        // Background replication loop: exits when all senders are dropped.
        // Failures are logged, not retried.
        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            while let Some(msg) = sync_rx.recv().await {
                match msg {
                    S3SyncMessage::Upload { hash, data } => {
                        let key = format!("{}{}.bin", sync_prefix, to_hex(&hash));
                        tracing::info!("S3 uploading {} ({} bytes)", &key, data.len());

                        match sync_client
                            .put_object()
                            .bucket(&sync_bucket)
                            .key(&key)
                            .body(ByteStream::from(data))
                            .send()
                            .await
                        {
                            Ok(_) => tracing::info!("S3 upload succeeded for {}", &key),
                            Err(e) => tracing::error!("S3 upload failed for {}: {}", &key, e),
                        }
                    }
                    S3SyncMessage::Delete { hash } => {
                        let key = format!("{}{}.bin", sync_prefix, to_hex(&hash));
                        tracing::debug!("S3 deleting {}", &key);

                        if let Err(e) = sync_client
                            .delete_object()
                            .bucket(&sync_bucket)
                            .key(&key)
                            .send()
                            .await
                        {
                            tracing::error!("S3 delete failed for {}: {}", &key[..16.min(key.len())], e);
                        }
                    }
                }
            }
        });

        tracing::info!("S3 storage initialized: bucket={}, prefix={}", bucket, prefix);

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Stores a blob locally and, when S3 is configured, queues an async
    /// upload. Returns whether the blob was new to the local store.
    /// Note: the upload is queued even when `is_new` is false.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        let is_new = self.local.put_sync(hash, data)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            tracing::info!("Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(), data.len(), is_new);
            if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data: data.to_vec() }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Fetches a blob, trying local storage first and then S3.
    /// An S3 hit is written back to the local store (best-effort).
    ///
    /// NOTE(review): the S3 path uses `futures::executor::block_on`, which
    /// blocks the calling thread and will misbehave if invoked from inside a
    /// tokio worker thread — confirm callers are synchronous/blocking contexts.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.get_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Best-effort local write-back; a failure here still
                        // serves the blob from the S3 copy.
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // NoSuchKey is the expected miss case; only log real errors.
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Checks blob existence locally, then via S3 HEAD.
    /// Same blocking caveat as `get_sync`.
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        if self.local.exists(hash)? {
            return Ok(true);
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Deletes a blob locally and queues an S3 delete. The return value
    /// reflects only the local deletion.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Deletes only the local copy, leaving any S3 replica intact
    /// (used for cache eviction rather than true deletion).
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Local-store statistics (S3 tier is not included).
    pub fn stats(&self) -> Result<hashtree_lmdb::LmdbStats, StoreError> {
        self.local.stats()
    }

    /// Lists hashes present in the local store only.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Clones a handle to the underlying local blob store.
    pub fn local_store(&self) -> Arc<LmdbBlobStore> {
        Arc::clone(&self.local)
    }
}
303
/// Top-level content store: a heed/LMDB environment holding index databases
/// plus a `StorageRouter` for the actual blob bytes.
pub struct HashtreeStore {
    env: heed::Env,
    // Pinned root hashes (hex string) -> ().
    pins: Database<Str, Unit>,
    // Plain-content sha256 hex -> hashtree root hex (see upload_file_internal).
    sha256_index: Database<Str, Str>,
    // 64-byte key: sha256(32) || pubkey(32) -> (); one entry per (blob, owner).
    blob_owners: Database<Bytes, Unit>,
    // Pubkey hex -> JSON-encoded Vec<BlobMetadata>.
    pubkey_blobs: Database<Str, Bytes>,
    // Tree root hash (raw 32 bytes) -> msgpack-encoded TreeMeta.
    tree_meta: Database<Bytes, Bytes>,
    // 64-byte key: blob hash(32) || tree root(32) -> (); reverse blob->tree index.
    blob_trees: Database<Bytes, Unit>,
    // Stable ref key (string) -> current tree root hash (raw bytes).
    tree_refs: Database<Str, Bytes>,
    // Cached tree roots; presumably serialized CachedRoot values — usage not
    // visible in this chunk, TODO confirm.
    cached_roots: Database<Str, Bytes>,
    // Blob storage backend (local + optional S3).
    router: StorageRouter,
    // Storage budget; stored here but not consulted in this chunk — presumably
    // used by an eviction routine elsewhere.
    max_size_bytes: u64,
}
327
328impl HashtreeStore {
    /// Opens (or creates) a store at `path` with local storage only and the
    /// default 10 GiB size budget.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::with_options(path, None, 10 * 1024 * 1024 * 1024)
    }
333
    /// Opens a store with an optional S3 config and the default 10 GiB budget.
    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
        Self::with_options(path, s3_config, 10 * 1024 * 1024 * 1024)
    }
338
    /// Opens the LMDB environment, creates all index databases, and builds
    /// the storage router (with S3 if configured and the feature is enabled).
    ///
    /// NOTE(review): the LMDB `map_size` is hard-coded to 10 GiB and does not
    /// use `max_size_bytes` — confirm that is intentional (the budget may only
    /// apply to the blob store). Also `max_dbs(9)` while only 8 named
    /// databases are created here — presumably one is reserved; verify.
    pub fn with_options<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>, max_size_bytes: u64) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY contract is heed's: the env file must not be opened twice in
        // the same process; callers must ensure a single HashtreeStore per path.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024)
                .max_dbs(9)
                .open(path)?
        };

        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let sha256_index = env.create_database(&mut wtxn, Some("sha256_index"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Blob bytes live in a separate LMDB environment under "blobs/".
        let lmdb_store = Arc::new(LmdbBlobStore::new(path.join("blobs"))
            .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?);

        #[cfg(feature = "s3")]
        let router = if let Some(s3_cfg) = s3_config {
            tracing::info!("Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket, s3_cfg.endpoint);

            // NOTE(review): block_on here assumes we are not already inside a
            // tokio runtime; tokio::spawn inside with_s3 additionally requires
            // a runtime to exist — confirm the call context.
            sync_block_on(async {
                StorageRouter::with_s3(lmdb_store, s3_cfg).await
            })?
        } else {
            StorageRouter::new(lmdb_store)
        };

        #[cfg(not(feature = "s3"))]
        let router = {
            if s3_config.is_some() {
                tracing::warn!("S3 config provided but S3 feature not enabled. Using local storage only.");
            }
            StorageRouter::new(lmdb_store)
        };

        Ok(Self {
            env,
            pins,
            sha256_index,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
401
    /// Borrows the storage router (local + optional S3 blob access).
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }
406
    /// Clones a handle to the local blob store (bypasses the S3 tier).
    pub fn blob_store_arc(&self) -> Arc<LmdbBlobStore> {
        self.router.local_store()
    }
411
    /// Uploads a file and pins its root. Returns the root hash as hex.
    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, true)
    }
416
    /// Uploads a file without pinning it (eligible for later eviction).
    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, false)
    }
421
422 fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
423 let file_path = file_path.as_ref();
424 let file_content = std::fs::read(file_path)?;
425
426 let content_sha256 = sha256(&file_content);
428 let sha256_hex = to_hex(&content_sha256);
429
430 let store = self.router.local_store();
432 let tree = HashTree::new(HashTreeConfig::new(store).public());
433
434 let cid = sync_block_on(async {
435 tree.put(&file_content).await
436 }).context("Failed to store file")?;
437
438 let root_hex = to_hex(&cid.hash);
439
440 let mut wtxn = self.env.write_txn()?;
441
442 self.sha256_index.put(&mut wtxn, &sha256_hex, &root_hex)?;
444
445 if pin {
447 self.pins.put(&mut wtxn, &root_hex, &())?;
448 }
449
450 wtxn.commit()?;
451
452 Ok(root_hex)
453 }
454
    /// Stores a file read from `reader` as a public hashtree, invoking
    /// `callback` with the root hex before the index/pin transaction commits.
    /// The root is always pinned. `_file_name` is currently unused.
    ///
    /// NOTE(review): the entire stream is buffered in memory before hashing,
    /// so this is not suitable for very large uploads.
    pub fn upload_file_stream<R: Read, F>(
        &self,
        mut reader: R,
        _file_name: impl Into<String>,
        mut callback: F,
    ) -> Result<String>
    where
        F: FnMut(&str),
    {
        let mut data = Vec::new();
        reader.read_to_end(&mut data)?;

        let content_sha256 = sha256(&data);
        let sha256_hex = to_hex(&content_sha256);

        let store = self.router.local_store();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let cid = sync_block_on(async {
            tree.put(&data).await
        }).context("Failed to store file")?;

        let root_hex = to_hex(&cid.hash);
        // Callback fires before the LMDB commit; a commit failure after this
        // point means the caller saw a root that was never indexed.
        callback(&root_hex);

        let mut wtxn = self.env.write_txn()?;

        self.sha256_index.put(&mut wtxn, &sha256_hex, &root_hex)?;

        self.pins.put(&mut wtxn, &root_hex, &())?;

        wtxn.commit()?;

        Ok(root_hex)
    }
496
    /// Uploads a directory tree (honoring .gitignore) and pins the root.
    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_with_options(dir_path, true)
    }
502
    /// Uploads a directory as a public hashtree and pins the resulting root.
    /// `respect_gitignore` toggles .gitignore/.git/info/exclude filtering.
    /// Returns the root hash as hex.
    pub fn upload_dir_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
        let dir_path = dir_path.as_ref();

        let store = self.router.local_store();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
        }).context("Failed to upload directory")?;

        let root_hex = to_hex(&root_cid.hash);

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, &root_hex, &())?;
        wtxn.commit()?;

        Ok(root_hex)
    }
522
    /// Walks `current_path`, uploads every file as its own hashtree, and
    /// groups the resulting CIDs by parent directory (relative path), then
    /// delegates to `build_directory_tree` to assemble directory nodes
    /// bottom-up. Despite the name, this is not recursive — the walker is flat.
    /// `_root_path` is currently unused.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Map: relative dir path ("" = root) -> (file name, file CID) pairs.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        // Ensure the root entry exists even for an empty directory.
        dir_contents.insert(String::new(), Vec::new());

        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false)
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // The walker yields the root itself first; skip it.
            if path == current_path {
                continue;
            }

            let relative = path.strip_prefix(current_path)
                .unwrap_or(path);

            if path.is_file() {
                let content = std::fs::read(path)?;
                let cid = tree.put(&content).await
                    .map_err(|e| anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e))?;

                let parent = relative.parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative.file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Register empty directories so they still get a node.
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        self.build_directory_tree(tree, &mut dir_contents).await
    }
580
    /// Assembles directory nodes from the flat `dir_contents` map produced by
    /// `upload_dir_recursive`, deepest directories first so each parent can
    /// link its already-built children. Returns the CID of the root ("") dir.
    async fn build_directory_tree<S: Store>(
        &self,
        tree: &HashTree<S>,
        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
    ) -> Result<Cid> {
        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
        // Sort by depth, deepest first. "" (the root) has depth 0; any other
        // path counts its separators plus one.
        dirs.sort_by(|a, b| {
            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
            depth_b.cmp(&depth_a)
        });

        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();

        for dir_path in dirs {
            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();

            let mut entries: Vec<HashTreeDirEntry> = files.into_iter()
                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
                .collect();

            // Link any already-built subdirectory whose parent is this dir.
            // O(dirs^2) overall, acceptable for typical directory counts.
            for (subdir_path, cid) in &dir_cids {
                let parent = std::path::Path::new(subdir_path)
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();

                if parent == dir_path {
                    let name = std::path::Path::new(subdir_path)
                        .file_name()
                        .map(|n| n.to_string_lossy().to_string())
                        .unwrap_or_default();
                    entries.push(HashTreeDirEntry::from_cid(name, cid));
                }
            }

            let cid = tree.put_directory(entries).await
                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;

            dir_cids.insert(dir_path, cid);
        }

        // The root is keyed by the empty string; its absence is a logic error.
        dir_cids.get("")
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("No root directory"))
    }
630
    /// Uploads a file as an encrypted hashtree (default, non-`.public()`
    /// config) and pins it. Returns the full CID string, which presumably
    /// embeds the decryption key — unlike the plain upload paths that return
    /// bare hex. The pin key here is the CID string; note the directory
    /// variant below pins the hash hex instead — TODO confirm which is intended.
    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        let file_path = file_path.as_ref();
        let file_content = std::fs::read(file_path)?;

        let store = self.router.local_store();
        let tree = HashTree::new(HashTreeConfig::new(store));

        let cid = sync_block_on(async {
            tree.put(&file_content).await
        }).map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;

        let cid_str = cid.to_string();

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, &cid_str, &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
652
    /// Uploads a directory as an encrypted hashtree (honoring .gitignore).
    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_encrypted_with_options(dir_path, true)
    }
658
    /// Uploads a directory as an encrypted hashtree and pins the root.
    /// Returns the full CID string (with embedded key material, presumably).
    ///
    /// NOTE(review): pins `to_hex(root_cid.hash)` while `upload_file_encrypted`
    /// pins the CID string — the two encrypted paths use different pin keys;
    /// confirm which one the unpin/eviction logic expects.
    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
        let dir_path = dir_path.as_ref();
        let store = self.router.local_store();

        let tree = HashTree::new(HashTreeConfig::new(store));

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
        }).context("Failed to upload encrypted directory")?;

        let cid_str = root_cid.to_string();

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, &to_hex(&root_cid.hash), &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
681
    /// Fetches the decoded tree node for a (public) root hash given as hex.
    /// Returns Ok(None) when the node does not exist.
    pub fn get_tree_node(&self, hash_hex: &str) -> Result<Option<TreeNode>> {
        let hash = from_hex(hash_hex)
            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;

        let store = self.router.local_store();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get_tree_node(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
        })
    }
695
696 pub fn get_cid_by_sha256(&self, sha256_hex: &str) -> Result<Option<String>> {
698 let rtxn = self.env.read_txn()?;
699 Ok(self.sha256_index.get(&rtxn, sha256_hex)?.map(|s| s.to_string()))
700 }
701
702 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
704 let hash = sha256(data);
705 self.router.put_sync(hash, data)
706 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
707 Ok(to_hex(&hash))
708 }
709
710 pub fn get_blob(&self, sha256_hex: &str) -> Result<Option<Vec<u8>>> {
712 let hash = from_hex(sha256_hex)
713 .map_err(|e| anyhow::anyhow!("invalid hex: {}", e))?;
714 self.router.get_sync(&hash)
715 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
716 }
717
718 pub fn blob_exists(&self, sha256_hex: &str) -> Result<bool> {
720 let hash = from_hex(sha256_hex)
721 .map_err(|e| anyhow::anyhow!("invalid hex: {}", e))?;
722 self.router.exists(&hash)
723 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
724 }
725
    /// Builds the 64-byte composite key for the `blob_owners` database:
    /// sha256 (32 bytes) followed by pubkey (32 bytes). Prefix scans over the
    /// first 32 bytes enumerate all owners of a blob.
    ///
    /// NOTE(review): copy_from_slice panics if `from_hex` can yield a value
    /// whose byte length is not 32 — assumed it always returns a 32-byte Hash;
    /// confirm against hashtree_core.
    fn blob_owner_key(sha256_hex: &str, pubkey_hex: &str) -> Result<[u8; 64]> {
        let sha256_bytes = from_hex(sha256_hex)
            .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
        let pubkey_bytes = from_hex(pubkey_hex)
            .map_err(|e| anyhow::anyhow!("invalid pubkey hex: {}", e))?;
        let mut key = [0u8; 64];
        key[..32].copy_from_slice(&sha256_bytes);
        key[32..].copy_from_slice(&pubkey_bytes);
        Ok(key)
    }
741
742 pub fn set_blob_owner(&self, sha256_hex: &str, pubkey: &str) -> Result<()> {
745 use std::time::{SystemTime, UNIX_EPOCH};
746
747 let key = Self::blob_owner_key(sha256_hex, pubkey)?;
748 let mut wtxn = self.env.write_txn()?;
749
750 self.blob_owners.put(&mut wtxn, &key[..], &())?;
752
753 let mut blobs: Vec<BlobMetadata> = self
755 .pubkey_blobs
756 .get(&wtxn, pubkey)?
757 .and_then(|b| serde_json::from_slice(b).ok())
758 .unwrap_or_default();
759
760 if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
762 let now = SystemTime::now()
763 .duration_since(UNIX_EPOCH)
764 .unwrap()
765 .as_secs();
766
767 let size = self
769 .get_cid_by_sha256(sha256_hex)?
770 .and_then(|cid| self.get_file_chunk_metadata(&cid).ok().flatten())
771 .map(|m| m.total_size)
772 .unwrap_or(0);
773
774 blobs.push(BlobMetadata {
775 sha256: sha256_hex.to_string(),
776 size,
777 mime_type: "application/octet-stream".to_string(),
778 uploaded: now,
779 });
780
781 let blobs_json = serde_json::to_vec(&blobs)?;
782 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
783 }
784
785 wtxn.commit()?;
786 Ok(())
787 }
788
789 pub fn is_blob_owner(&self, sha256_hex: &str, pubkey: &str) -> Result<bool> {
791 let key = Self::blob_owner_key(sha256_hex, pubkey)?;
792 let rtxn = self.env.read_txn()?;
793 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
794 }
795
    /// Lists all owner pubkeys (hex) of a blob by prefix-scanning the
    /// 64-byte composite keys on their leading 32-byte sha256.
    pub fn get_blob_owners(&self, sha256_hex: &str) -> Result<Vec<String>> {
        let sha256_bytes = from_hex(sha256_hex)
            .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
        let rtxn = self.env.read_txn()?;

        let mut owners = Vec::new();
        for item in self.blob_owners.prefix_iter(&rtxn, &sha256_bytes[..])? {
            let (key, _) = item?;
            // Keys of any other length are silently skipped as malformed.
            if key.len() == 64 {
                // try_into cannot fail: the slice is exactly 32 bytes
                // (len == 64 checked above).
                let pubkey_hex = to_hex(&key[32..64].try_into().unwrap());
                owners.push(pubkey_hex);
            }
        }
        Ok(owners)
    }
813
814 pub fn blob_has_owners(&self, sha256_hex: &str) -> Result<bool> {
816 let sha256_bytes = from_hex(sha256_hex)
817 .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
818 let rtxn = self.env.read_txn()?;
819
820 for item in self.blob_owners.prefix_iter(&rtxn, &sha256_bytes[..])? {
822 if item.is_ok() {
823 return Ok(true);
824 }
825 }
826 Ok(false)
827 }
828
829 pub fn get_blob_owner(&self, sha256_hex: &str) -> Result<Option<String>> {
831 Ok(self.get_blob_owners(sha256_hex)?.into_iter().next())
832 }
833
834 pub fn delete_blossom_blob(&self, sha256_hex: &str, pubkey: &str) -> Result<bool> {
838 let key = Self::blob_owner_key(sha256_hex, pubkey)?;
839 let mut wtxn = self.env.write_txn()?;
840
841 self.blob_owners.delete(&mut wtxn, &key[..])?;
843
844 if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
846 if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
847 blobs.retain(|b| b.sha256 != sha256_hex);
848 let blobs_json = serde_json::to_vec(&blobs)?;
849 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
850 }
851 }
852
853 let sha256_bytes = from_hex(sha256_hex)
855 .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
856 let mut has_other_owners = false;
857 for item in self.blob_owners.prefix_iter(&wtxn, &sha256_bytes[..])? {
858 if item.is_ok() {
859 has_other_owners = true;
860 break;
861 }
862 }
863
864 if has_other_owners {
865 wtxn.commit()?;
866 tracing::debug!(
867 "Removed {} from blob {} owners, other owners remain",
868 &pubkey[..8.min(pubkey.len())],
869 &sha256_hex[..8.min(sha256_hex.len())]
870 );
871 return Ok(false);
872 }
873
874 tracing::info!(
876 "All owners removed from blob {}, deleting",
877 &sha256_hex[..8.min(sha256_hex.len())]
878 );
879
880 let root_hex = self.sha256_index.get(&wtxn, sha256_hex)?.map(|s| s.to_string());
882 if let Some(ref root_hex) = root_hex {
883 self.pins.delete(&mut wtxn, root_hex)?;
885 }
886 self.sha256_index.delete(&mut wtxn, sha256_hex)?;
887
888 let hash = from_hex(sha256_hex)
890 .map_err(|e| anyhow::anyhow!("invalid hex: {}", e))?;
891 let _ = self.router.delete_sync(&hash);
892
893 wtxn.commit()?;
894 Ok(true)
895 }
896
897 pub fn list_blobs_by_pubkey(&self, pubkey: &str) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
899 let rtxn = self.env.read_txn()?;
900
901 let blobs: Vec<BlobMetadata> = self
902 .pubkey_blobs
903 .get(&rtxn, pubkey)?
904 .and_then(|b| serde_json::from_slice(b).ok())
905 .unwrap_or_default();
906
907 Ok(blobs
908 .into_iter()
909 .map(|b| crate::server::blossom::BlobDescriptor {
910 url: format!("/{}", b.sha256),
911 sha256: b.sha256,
912 size: b.size,
913 mime_type: b.mime_type,
914 uploaded: b.uploaded,
915 })
916 .collect())
917 }
918
919 pub fn get_chunk(&self, chunk_hex: &str) -> Result<Option<Vec<u8>>> {
921 let hash = from_hex(chunk_hex)
922 .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
923 self.router.get_sync(&hash)
924 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
925 }
926
    /// Reads and reassembles a complete (public) file by its root hash hex.
    /// Returns Ok(None) when the root does not exist.
    pub fn get_file(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
        let hash = from_hex(hash_hex)
            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;

        let store = self.router.local_store();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.read_file(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }
941
    /// Inspects a stored object and returns its chunk layout:
    /// - Ok(None) when the hash is unknown or refers to a directory;
    /// - `is_chunked == false` with empty chunk lists for single-blob files;
    /// - otherwise the per-chunk CIDs (hex) and sizes in file order.
    pub fn get_file_chunk_metadata(&self, hash_hex: &str) -> Result<Option<FileChunkMetadata>> {
        let hash = from_hex(hash_hex)
            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;

        let store = self.router.local_store();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            // Only consults the local store — an S3-only blob reports absent here.
            let exists = store.has(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            let total_size = tree.get_size(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            let is_tree_node = tree.is_tree(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            // A raw leaf blob: the whole file is a single chunk.
            if !is_tree_node {
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_cids: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            let node = match tree.get_tree_node(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))? {
                Some(n) => n,
                None => return Ok(None),
            };

            let is_directory = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if is_directory {
                // Directories have no byte-range semantics.
                return Ok(None);
            }

            let chunk_cids: Vec<String> = node.links.iter().map(|l| to_hex(&l.hash)).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_cids,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1005
1006 pub fn get_file_range(&self, hash_hex: &str, start: u64, end: Option<u64>) -> Result<Option<(Vec<u8>, u64)>> {
1008 let metadata = match self.get_file_chunk_metadata(hash_hex)? {
1009 Some(m) => m,
1010 None => return Ok(None),
1011 };
1012
1013 if metadata.total_size == 0 {
1014 return Ok(Some((Vec::new(), 0)));
1015 }
1016
1017 if start >= metadata.total_size {
1018 return Ok(None);
1019 }
1020
1021 let end = end.unwrap_or(metadata.total_size - 1).min(metadata.total_size - 1);
1022
1023 if !metadata.is_chunked {
1025 let content = self.get_file(hash_hex)?.unwrap_or_default();
1026 let range_content = if start < content.len() as u64 {
1027 content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1028 } else {
1029 Vec::new()
1030 };
1031 return Ok(Some((range_content, metadata.total_size)));
1032 }
1033
1034 let mut result = Vec::new();
1036 let mut current_offset = 0u64;
1037
1038 for (i, chunk_cid) in metadata.chunk_cids.iter().enumerate() {
1039 let chunk_size = metadata.chunk_sizes[i];
1040 let chunk_end = current_offset + chunk_size - 1;
1041
1042 if chunk_end >= start && current_offset <= end {
1044 let chunk_content = match self.get_chunk(chunk_cid)? {
1045 Some(content) => content,
1046 None => {
1047 return Err(anyhow::anyhow!("Chunk {} not found", chunk_cid));
1048 }
1049 };
1050
1051 let chunk_read_start = if current_offset >= start {
1052 0
1053 } else {
1054 (start - current_offset) as usize
1055 };
1056
1057 let chunk_read_end = if chunk_end <= end {
1058 chunk_size as usize - 1
1059 } else {
1060 (end - current_offset) as usize
1061 };
1062
1063 result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1064 }
1065
1066 current_offset += chunk_size;
1067
1068 if current_offset > end {
1069 break;
1070 }
1071 }
1072
1073 Ok(Some((result, metadata.total_size)))
1074 }
1075
    /// Builds an owned, lazily-iterating view over the chunks covering
    /// [start, end] of a file, for streaming responses without buffering the
    /// whole range. Returns Ok(None) when the file is unknown, empty, or
    /// `start` is past EOF. The iterator type holds an Arc to this store.
    pub fn stream_file_range_chunks_owned(
        self: Arc<Self>,
        hash_hex: &str,
        start: u64,
        end: u64,
    ) -> Result<Option<FileRangeChunksOwned>> {
        let metadata = match self.get_file_chunk_metadata(hash_hex)? {
            Some(m) => m,
            None => return Ok(None),
        };

        if metadata.total_size == 0 || start >= metadata.total_size {
            return Ok(None);
        }

        // Clamp the inclusive end to the last byte of the file.
        let end = end.min(metadata.total_size - 1);

        Ok(Some(FileRangeChunksOwned {
            store: self,
            metadata,
            start,
            end,
            current_chunk_idx: 0,
            current_offset: 0,
        }))
    }
1103
    /// Lists the entries of a (public) directory node by its hash hex.
    /// Returns Ok(None) when the hash does not refer to a directory.
    pub fn get_directory_listing(&self, hash_hex: &str) -> Result<Option<DirectoryListing>> {
        let hash = from_hex(hash_hex)
            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;

        let store = self.router.local_store();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            let is_dir = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if !is_dir {
                return Ok(None);
            }

            // Size 0 in the synthesized CID: list_directory presumably only
            // needs the hash — TODO confirm size is ignored.
            let cid = hashtree_core::Cid::public(hash, 0);
            let tree_entries = tree.list_directory(&cid).await
                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;

            let entries: Vec<DirEntry> = tree_entries.into_iter().map(|e| DirEntry {
                name: e.name,
                cid: to_hex(&e.hash),
                is_directory: e.link_type.is_tree(),
                size: e.size,
            }).collect();

            Ok(Some(DirectoryListing {
                // The directory's own name is not stored in the node itself.
                dir_name: String::new(),
                entries,
            }))
        })
    }
1139
1140 pub fn pin(&self, hash_hex: &str) -> Result<()> {
1142 let mut wtxn = self.env.write_txn()?;
1143 self.pins.put(&mut wtxn, hash_hex, &())?;
1144 wtxn.commit()?;
1145 Ok(())
1146 }
1147
1148 pub fn unpin(&self, hash_hex: &str) -> Result<()> {
1150 let mut wtxn = self.env.write_txn()?;
1151 self.pins.delete(&mut wtxn, hash_hex)?;
1152 wtxn.commit()?;
1153 Ok(())
1154 }
1155
1156 pub fn is_pinned(&self, hash_hex: &str) -> Result<bool> {
1158 let rtxn = self.env.read_txn()?;
1159 Ok(self.pins.get(&rtxn, hash_hex)?.is_some())
1160 }
1161
1162 pub fn list_pins(&self) -> Result<Vec<String>> {
1164 let rtxn = self.env.read_txn()?;
1165 let mut pins = Vec::new();
1166
1167 for item in self.pins.iter(&rtxn)? {
1168 let (hash_hex, _) = item?;
1169 pins.push(hash_hex.to_string());
1170 }
1171
1172 Ok(pins)
1173 }
1174
    /// Lists all pins with a directory flag for UI display.
    ///
    /// NOTE(review): `name` is always "Unknown" — pin names are not stored
    /// anywhere in this file; presumably resolved elsewhere or not yet
    /// implemented. Pins whose key is not valid hex (e.g. encrypted CID
    /// strings) are reported with is_directory = false.
    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
        let rtxn = self.env.read_txn()?;
        let store = self.router.local_store();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let mut pins = Vec::new();

        for item in self.pins.iter(&rtxn)? {
            let (hash_hex, _) = item?;
            let hash_hex_str = hash_hex.to_string();

            // Lookup errors are treated as "not a directory".
            let is_directory = if let Ok(hash) = from_hex(&hash_hex_str) {
                sync_block_on(async {
                    tree.is_directory(&hash).await.unwrap_or(false)
                })
            } else {
                false
            };

            pins.push(PinnedItem {
                cid: hash_hex_str,
                name: "Unknown".to_string(),
                is_directory,
            });
        }

        Ok(pins)
    }
1204
    /// Indexes a synced tree for retention accounting:
    /// - if `ref_key` previously pointed at a different root, the old tree is
    ///   unindexed first (best-effort);
    /// - records a (blob || root) entry in `blob_trees` for every reachable
    ///   leaf blob;
    /// - writes a `TreeMeta` (msgpack) record and updates `tree_refs`.
    ///
    /// The blob walk happens outside the write transaction, so blobs added or
    /// removed concurrently may be missed — acceptable for retention hints.
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // Replace semantics: a ref key points at exactly one root at a time.
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes.try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // Read txn must end before unindex_tree opens a write txn.
                    drop(rtxn);
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.router.local_store();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (blob_hashes, total_size) = sync_block_on(async {
            self.collect_tree_blobs(&tree, root_hash).await
        })?;

        let mut wtxn = self.env.write_txn()?;

        // Reverse index: blob(32) || root(32) -> ().
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta.put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1286
1287 async fn collect_tree_blobs<S: Store>(
1289 &self,
1290 tree: &HashTree<S>,
1291 root: &Hash,
1292 ) -> Result<(Vec<Hash>, u64)> {
1293 let mut blobs = Vec::new();
1294 let mut total_size = 0u64;
1295 let mut stack = vec![*root];
1296
1297 while let Some(hash) = stack.pop() {
1298 let is_tree = tree.is_tree(&hash).await
1300 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1301
1302 if is_tree {
1303 if let Some(node) = tree.get_tree_node(&hash).await
1305 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1306 {
1307 for link in &node.links {
1308 stack.push(link.hash);
1309 }
1310 }
1311 } else {
1312 if let Some(data) = self.router.get_sync(&hash)
1314 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1315 {
1316 total_size += data.len() as u64;
1317 blobs.push(hash);
1318 }
1319 }
1320 }
1321
1322 Ok((blobs, total_size))
1323 }
1324
1325 pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1328 let root_hex = to_hex(root_hash);
1329
1330 let store = self.router.local_store();
1331 let tree = HashTree::new(HashTreeConfig::new(store).public());
1332
1333 let (blob_hashes, _) = sync_block_on(async {
1335 self.collect_tree_blobs(&tree, root_hash).await
1336 })?;
1337
1338 let mut wtxn = self.env.write_txn()?;
1339 let mut freed = 0u64;
1340
1341 for blob_hash in &blob_hashes {
1343 let mut key = [0u8; 64];
1345 key[..32].copy_from_slice(blob_hash);
1346 key[32..].copy_from_slice(root_hash);
1347 self.blob_trees.delete(&mut wtxn, &key[..])?;
1348
1349 let rtxn = self.env.read_txn()?;
1351 let mut has_other_tree = false;
1352
1353 for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1354 if item.is_ok() {
1355 has_other_tree = true;
1356 break;
1357 }
1358 }
1359 drop(rtxn);
1360
1361 if !has_other_tree {
1363 if let Some(data) = self.router.get_sync(blob_hash)
1364 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1365 {
1366 freed += data.len() as u64;
1367 self.router.delete_local_only(blob_hash)
1369 .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1370 }
1371 }
1372 }
1373
1374 if let Some(data) = self.router.get_sync(root_hash)
1376 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1377 {
1378 freed += data.len() as u64;
1379 self.router.delete_local_only(root_hash)
1381 .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1382 }
1383
1384 self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1386
1387 wtxn.commit()?;
1388
1389 tracing::debug!(
1390 "Unindexed tree {} ({} bytes freed)",
1391 &root_hex[..8],
1392 freed
1393 );
1394
1395 Ok(freed)
1396 }
1397
1398 pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1400 let rtxn = self.env.read_txn()?;
1401 if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1402 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1403 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1404 Ok(Some(meta))
1405 } else {
1406 Ok(None)
1407 }
1408 }
1409
1410 pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1412 let rtxn = self.env.read_txn()?;
1413 let mut trees = Vec::new();
1414
1415 for item in self.tree_meta.iter(&rtxn)? {
1416 let (hash_bytes, meta_bytes) = item?;
1417 let hash: Hash = hash_bytes.try_into()
1418 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1419 let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1420 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1421 trees.push((hash, meta));
1422 }
1423
1424 Ok(trees)
1425 }
1426
1427 pub fn tracked_size(&self) -> Result<u64> {
1429 let rtxn = self.env.read_txn()?;
1430 let mut total = 0u64;
1431
1432 for item in self.tree_meta.iter(&rtxn)? {
1433 let (_, bytes) = item?;
1434 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1435 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1436 total += meta.total_size;
1437 }
1438
1439 Ok(total)
1440 }
1441
1442 fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1444 let mut trees = self.list_indexed_trees()?;
1445
1446 trees.sort_by(|a, b| {
1448 match a.1.priority.cmp(&b.1.priority) {
1449 std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1450 other => other,
1451 }
1452 });
1453
1454 Ok(trees)
1455 }
1456
    /// Enforce the storage cap: when current usage exceeds `max_size_bytes`,
    /// free space down to 90% of the cap. Returns total bytes freed.
    ///
    /// Eviction order:
    /// 1. orphaned blobs (referenced by no pin and no indexed tree),
    /// 2. whole indexed trees, lowest priority / oldest first, skipping any
    ///    whose root hash is pinned.
    pub fn evict_if_needed(&self) -> Result<u64> {
        let stats = self.router.stats()
            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
        let current = stats.total_bytes;

        // Under the cap: nothing to do.
        if current <= self.max_size_bytes {
            return Ok(0);
        }

        // Target 90% of the cap, not just below it, so the next small write
        // doesn't immediately re-trigger eviction.
        let target = self.max_size_bytes * 90 / 100;
        let mut freed = 0u64;
        let mut current_size = current;

        // Pass 1: cheap wins — blobs nothing references anymore.
        let orphan_freed = self.evict_orphaned_blobs()?;
        freed += orphan_freed;
        current_size = current_size.saturating_sub(orphan_freed);

        if orphan_freed > 0 {
            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
        }

        // Orphan cleanup may already be enough.
        if current_size <= target {
            if freed > 0 {
                tracing::info!("Eviction complete: {} bytes freed", freed);
            }
            return Ok(freed);
        }

        // Pass 2: evict whole trees in priority order until we hit the target.
        let evictable = self.get_evictable_trees()?;

        for (root_hash, meta) in evictable {
            if current_size <= target {
                break;
            }

            let root_hex = to_hex(&root_hash);

            // Never evict a tree whose root is explicitly pinned.
            if self.is_pinned(&root_hex)? {
                continue;
            }

            let tree_freed = self.unindex_tree(&root_hash)?;
            freed += tree_freed;
            current_size = current_size.saturating_sub(tree_freed);

            tracing::info!(
                "Evicted tree {} (owner={}, priority={}, {} bytes)",
                &root_hex[..8],
                &meta.owner[..8.min(meta.owner.len())],
                meta.priority,
                tree_freed
            );
        }

        if freed > 0 {
            tracing::info!("Eviction complete: {} bytes freed", freed);
        }

        Ok(freed)
    }
1530
    /// Delete every stored blob that is neither pinned nor recorded in
    /// `blob_trees` as belonging to an indexed tree. Returns bytes freed.
    ///
    /// NOTE(review): `blob_trees` is populated with leaf blobs only
    /// (`collect_tree_blobs` pushes non-tree hashes), so interior tree nodes
    /// of indexed trees do not appear in the protected set built here —
    /// presumably they are protected some other way (e.g. pinned roots);
    /// TODO confirm they cannot be deleted as "orphans".
    fn evict_orphaned_blobs(&self) -> Result<u64> {
        let mut freed = 0u64;

        let all_hashes = self.router.list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        // Snapshot both protected sets under a single read txn.
        let rtxn = self.env.read_txn()?;
        let pinned: HashSet<String> = self.pins.iter(&rtxn)?
            .filter_map(|item| item.ok())
            .map(|(hash_hex, _)| hash_hex.to_string())
            .collect();

        // The first 32 bytes of each blob_trees key is the blob hash; the
        // trailing 32 bytes (the owning root) are irrelevant here.
        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
        for item in self.blob_trees.iter(&rtxn)? {
            if let Ok((key_bytes, _)) = item {
                if key_bytes.len() >= 32 {
                    let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
                    blobs_in_trees.insert(blob_hash);
                }
            }
        }
        drop(rtxn);

        for hash in all_hashes {
            let hash_hex = to_hex(&hash);

            // Pinned content is never garbage-collected.
            if pinned.contains(&hash_hex) {
                continue;
            }

            // Blob still referenced by at least one indexed tree.
            if blobs_in_trees.contains(&hash) {
                continue;
            }

            // Read before delete so freed bytes can be reported; deletion is
            // best-effort (errors deliberately ignored).
            if let Ok(Some(data)) = self.router.get_sync(&hash) {
                freed += data.len() as u64;
                let _ = self.router.delete_local_only(&hash);
                tracing::debug!("Deleted orphaned blob {} ({} bytes)", &hash_hex[..8], data.len());
            }
        }

        Ok(freed)
    }
1583
    /// Configured storage cap in bytes (eviction frees down to 90% of this).
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1588
1589 pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1591 let rtxn = self.env.read_txn()?;
1592 let mut own = 0u64;
1593 let mut followed = 0u64;
1594 let mut other = 0u64;
1595
1596 for item in self.tree_meta.iter(&rtxn)? {
1597 let (_, bytes) = item?;
1598 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1599 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1600
1601 if meta.priority >= PRIORITY_OWN {
1602 own += meta.total_size;
1603 } else if meta.priority >= PRIORITY_FOLLOWED {
1604 followed += meta.total_size;
1605 } else {
1606 other += meta.total_size;
1607 }
1608 }
1609
1610 Ok(StorageByPriority { own, followed, other })
1611 }
1612
1613 pub fn get_storage_stats(&self) -> Result<StorageStats> {
1615 let rtxn = self.env.read_txn()?;
1616 let total_pins = self.pins.len(&rtxn)? as usize;
1617
1618 let stats = self.router.stats()
1619 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1620
1621 Ok(StorageStats {
1622 total_dags: stats.count,
1623 pinned_dags: total_pins,
1624 total_bytes: stats.total_bytes,
1625 })
1626 }
1627
1628 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1632 let key = format!("{}/{}", pubkey_hex, tree_name);
1633 let rtxn = self.env.read_txn()?;
1634 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1635 let root: CachedRoot = rmp_serde::from_slice(bytes)
1636 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1637 Ok(Some(root))
1638 } else {
1639 Ok(None)
1640 }
1641 }
1642
1643 pub fn set_cached_root(
1645 &self,
1646 pubkey_hex: &str,
1647 tree_name: &str,
1648 hash: &str,
1649 key: Option<&str>,
1650 visibility: &str,
1651 updated_at: u64,
1652 ) -> Result<()> {
1653 let db_key = format!("{}/{}", pubkey_hex, tree_name);
1654 let root = CachedRoot {
1655 hash: hash.to_string(),
1656 key: key.map(|k| k.to_string()),
1657 updated_at,
1658 visibility: visibility.to_string(),
1659 };
1660 let bytes = rmp_serde::to_vec(&root)
1661 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1662 let mut wtxn = self.env.write_txn()?;
1663 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1664 wtxn.commit()?;
1665 Ok(())
1666 }
1667
1668 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1670 let prefix = format!("{}/", pubkey_hex);
1671 let rtxn = self.env.read_txn()?;
1672 let mut results = Vec::new();
1673
1674 for item in self.cached_roots.iter(&rtxn)? {
1675 let (key, bytes) = item?;
1676 if key.starts_with(&prefix) {
1677 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1678 let root: CachedRoot = rmp_serde::from_slice(bytes)
1679 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1680 results.push((tree_name.to_string(), root));
1681 }
1682 }
1683
1684 Ok(results)
1685 }
1686
1687 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1689 let key = format!("{}/{}", pubkey_hex, tree_name);
1690 let mut wtxn = self.env.write_txn()?;
1691 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1692 wtxn.commit()?;
1693 Ok(deleted)
1694 }
1695
1696 pub fn gc(&self) -> Result<GcStats> {
1698 let rtxn = self.env.read_txn()?;
1699
1700 let pinned: HashSet<String> = self.pins.iter(&rtxn)?
1702 .filter_map(|item| item.ok())
1703 .map(|(hash_hex, _)| hash_hex.to_string())
1704 .collect();
1705
1706 drop(rtxn);
1707
1708 let all_hashes = self.router.list()
1710 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1711
1712 let mut deleted = 0;
1714 let mut freed_bytes = 0u64;
1715
1716 for hash in all_hashes {
1717 let hash_hex = to_hex(&hash);
1718 if !pinned.contains(&hash_hex) {
1719 if let Ok(Some(data)) = self.router.get_sync(&hash) {
1720 freed_bytes += data.len() as u64;
1721 let _ = self.router.delete_local_only(&hash);
1723 deleted += 1;
1724 }
1725 }
1726 }
1727
1728 Ok(GcStats {
1729 deleted_dags: deleted,
1730 freed_bytes,
1731 })
1732 }
1733}
1734
/// Aggregate storage counters returned by `get_storage_stats`.
#[derive(Debug)]
pub struct StorageStats {
    /// Number of blobs reported by the storage router.
    pub total_dags: usize,
    /// Number of entries in the pins database.
    pub pinned_dags: usize,
    /// Total stored bytes reported by the storage router.
    pub total_bytes: u64,
}
1741
/// Indexed-tree storage totals bucketed by eviction priority band
/// (see `PRIORITY_OWN` / `PRIORITY_FOLLOWED` / `PRIORITY_OTHER`).
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Bytes in trees with priority >= PRIORITY_OWN.
    pub own: u64,
    /// Bytes in trees with priority >= PRIORITY_FOLLOWED (but below OWN).
    pub followed: u64,
    /// Bytes in all remaining trees.
    pub other: u64,
}
1752
/// Chunk layout of a stored file, used to serve byte-range reads.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes.
    pub total_size: u64,
    /// CID (hex hash string) of each chunk, in file order.
    pub chunk_cids: Vec<String>,
    /// Byte size of each chunk, parallel to `chunk_cids`.
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is split into chunks; the range iterator yields
    /// nothing when this is false.
    pub is_chunked: bool,
}
1760
/// Owning iterator over the chunks of a file that overlap an inclusive byte
/// range `[start, end]`; each yielded item is the overlapping chunk trimmed
/// to that range (see the `Iterator` impl).
pub struct FileRangeChunksOwned {
    // Store used to fetch chunk contents by CID.
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    // First requested byte offset (inclusive).
    start: u64,
    // Last requested byte offset (inclusive).
    end: u64,
    // Index of the next chunk to consider.
    current_chunk_idx: usize,
    // File offset of the start of the current chunk.
    current_offset: u64,
}
1770
1771impl Iterator for FileRangeChunksOwned {
1772 type Item = Result<Vec<u8>>;
1773
1774 fn next(&mut self) -> Option<Self::Item> {
1775 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_cids.len() {
1776 return None;
1777 }
1778
1779 if self.current_offset > self.end {
1780 return None;
1781 }
1782
1783 let chunk_cid = &self.metadata.chunk_cids[self.current_chunk_idx];
1784 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
1785 let chunk_end = self.current_offset + chunk_size - 1;
1786
1787 self.current_chunk_idx += 1;
1788
1789 if chunk_end < self.start || self.current_offset > self.end {
1790 self.current_offset += chunk_size;
1791 return self.next();
1792 }
1793
1794 let chunk_content = match self.store.get_chunk(chunk_cid) {
1795 Ok(Some(content)) => content,
1796 Ok(None) => {
1797 return Some(Err(anyhow::anyhow!("Chunk {} not found", chunk_cid)));
1798 }
1799 Err(e) => {
1800 return Some(Err(e));
1801 }
1802 };
1803
1804 let chunk_read_start = if self.current_offset >= self.start {
1805 0
1806 } else {
1807 (self.start - self.current_offset) as usize
1808 };
1809
1810 let chunk_read_end = if chunk_end <= self.end {
1811 chunk_size as usize - 1
1812 } else {
1813 (self.end - self.current_offset) as usize
1814 };
1815
1816 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
1817 self.current_offset += chunk_size;
1818
1819 Some(Ok(result))
1820 }
1821}
1822
/// Result of a `gc()` pass.
#[derive(Debug)]
pub struct GcStats {
    /// Number of unpinned blobs deleted.
    pub deleted_dags: usize,
    /// Total bytes of the deleted blobs.
    pub freed_bytes: u64,
}
1828
/// One entry within a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within its parent directory.
    pub name: String,
    /// Content identifier of the entry.
    pub cid: String,
    /// True when the entry is itself a directory.
    pub is_directory: bool,
    /// Entry size in bytes.
    pub size: u64,
}
1836
/// A directory's name together with its resolved entries.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the listed directory.
    pub dir_name: String,
    /// The directory's entries.
    pub entries: Vec<DirEntry>,
}
1842
/// A pinned content item as reported to callers.
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Hex hash of the pinned content.
    pub cid: String,
    /// Display name (filled with "Unknown" when no name is recorded).
    pub name: String,
    /// True when the pinned content is a directory tree.
    pub is_directory: bool,
}
1849
/// Metadata recorded for an uploaded blob.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 of the blob contents.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type supplied at upload time.
    pub mime_type: String,
    /// Upload timestamp (presumably Unix seconds — TODO confirm at call site).
    pub uploaded: u64,
}
1858
/// Bridge `HashtreeStore` into the WebRTC layer's `ContentStore` trait by
/// delegating hex-hash lookups to `get_chunk`.
impl crate::webrtc::ContentStore for HashtreeStore {
    /// Fetch raw bytes for `hash_hex`; `Ok(None)` when the chunk is absent.
    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
        self.get_chunk(hash_hex)
    }
}
1864}