1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use heed::{Database, EnvOpenOptions};
4use heed::types::*;
5use hashtree_lmdb::LmdbBlobStore;
6use hashtree_core::{
7 HashTree, HashTreeConfig, Cid,
8 sha256, to_hex, from_hex, TreeNode, DirEntry as HashTreeDirEntry,
9 types::Hash,
10};
11use hashtree_core::store::{Store, StoreError};
12use serde::{Deserialize, Serialize};
13use std::path::Path;
14use std::collections::HashSet;
15use std::io::Read;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18use futures::executor::block_on as sync_block_on;
19
/// Priority recorded in `TreeMeta` for trees owned by unrelated users
/// (lowest of the three tiers; presumably used for eviction ordering —
/// confirm against the GC/eviction code).
pub const PRIORITY_OTHER: u8 = 64;
/// Priority recorded in `TreeMeta` for trees owned by followed users.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// Priority recorded in `TreeMeta` for the local user's own trees (highest).
pub const PRIORITY_OWN: u8 = 255;
24
/// Per-tree metadata, serialized with MessagePack into the `tree_meta`
/// database by `HashtreeStore::index_tree`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    // Owner identifier (the `owner` string passed to `index_tree`).
    pub owner: String,
    // Optional human-readable tree name.
    pub name: Option<String>,
    // Unix timestamp (seconds) when the tree was last indexed.
    pub synced_at: u64,
    // Total bytes of all locally present leaf blobs in the tree.
    pub total_size: u64,
    // Retention priority; see the PRIORITY_* constants above.
    pub priority: u8,
}
39
/// A cached tree-root record (stored in the `cached_roots` database;
/// readers/writers are not visible in this chunk).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    // Root hash as hex.
    pub hash: String,
    // Optional key string — presumably key material for encrypted roots;
    // confirm against the code that populates `cached_roots`.
    pub key: Option<String>,
    // Unix timestamp (seconds) of the last update.
    pub updated_at: u64,
    // Visibility label (stringly-typed; valid values not visible here).
    pub visibility: String,
}
52
53#[cfg(feature = "s3")]
54use tokio::sync::mpsc;
55
56use crate::config::S3Config;
57
/// Work items consumed by the background S3 replication task spawned in
/// `StorageRouter::with_s3`.
#[cfg(feature = "s3")]
enum S3SyncMessage {
    // Replicate a blob to S3 (data is an owned copy of the bytes).
    Upload { hash: Hash, data: Vec<u8> },
    // Remove the blob's object from S3.
    Delete { hash: Hash },
}
64
/// Routes blob reads/writes to the local LMDB store, with optional
/// asynchronous write-through replication to S3 (behind the `s3` feature).
pub struct StorageRouter {
    // Primary, always-present local blob store.
    local: Arc<LmdbBlobStore>,
    // S3 client/bucket used for synchronous read fallback; `None` when
    // the router was built without S3.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    // Key prefix prepended to every S3 object key.
    #[cfg(feature = "s3")]
    s3_prefix: String,
    // Sender into the background task that performs S3 uploads/deletes.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
83
impl StorageRouter {
    /// Creates a router that uses only the local LMDB blob store.
    pub fn new(local: Arc<LmdbBlobStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Creates a router that mirrors writes to S3.
    ///
    /// Spawns a detached background task draining an unbounded channel of
    /// upload/delete jobs; S3 failures are logged, never propagated.
    /// Requires an ambient tokio runtime (uses `tokio::spawn`).
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LmdbBlobStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Base AWS config from the environment, region overridden from ours.
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader = aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Custom endpoint + path-style addressing (S3-compatible services).
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Unbounded: jobs are never dropped, but memory can grow if S3 lags.
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        // Background replication loop; runs until all senders are dropped.
        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            while let Some(msg) = sync_rx.recv().await {
                match msg {
                    S3SyncMessage::Upload { hash, data } => {
                        // Object key layout: <prefix><hex-hash>.bin
                        let key = format!("{}{}.bin", sync_prefix, to_hex(&hash));
                        tracing::info!("S3 uploading {} ({} bytes)", &key, data.len());

                        match sync_client
                            .put_object()
                            .bucket(&sync_bucket)
                            .key(&key)
                            .body(ByteStream::from(data))
                            .send()
                            .await
                        {
                            Ok(_) => tracing::info!("S3 upload succeeded for {}", &key),
                            Err(e) => tracing::error!("S3 upload failed for {}: {}", &key, e),
                        }
                    }
                    S3SyncMessage::Delete { hash } => {
                        let key = format!("{}{}.bin", sync_prefix, to_hex(&hash));
                        tracing::debug!("S3 deleting {}", &key);

                        if let Err(e) = sync_client
                            .delete_object()
                            .bucket(&sync_bucket)
                            .key(&key)
                            .send()
                            .await
                        {
                            tracing::error!("S3 delete failed for {}: {}", &key[..16.min(key.len())], e);
                        }
                    }
                }
            }
        });

        tracing::info!("S3 storage initialized: bucket={}, prefix={}", bucket, prefix);

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Writes a blob locally, then queues an asynchronous S3 upload.
    /// Returns whether the blob was new to the local store.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        let is_new = self.local.put_sync(hash, data)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            tracing::info!("Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(), data.len(), is_new);
            if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data: data.to_vec() }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Reads a blob: local store first, then S3, caching S3 hits locally.
    ///
    /// NOTE(review): the S3 fallback drives the request with
    /// `futures::executor::block_on`; calling this from inside an async
    /// (tokio) worker may block/deadlock that worker — confirm call sites.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.get_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Write-back cache; a failed local write is non-fatal.
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // A missing key is expected; only log unexpected errors.
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Checks existence locally, falling back to an S3 HEAD request.
    /// (Same `block_on` caveat as `get_sync`.)
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        if self.local.exists(hash)? {
            return Ok(true);
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Deletes the blob locally and queues an S3 delete.
    /// Returns whether a local copy was actually removed.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Deletes only the local copy, leaving any S3 replica in place.
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Local-store statistics (S3 is not consulted).
    pub fn stats(&self) -> Result<hashtree_lmdb::LmdbStats, StoreError> {
        self.local.stats()
    }

    /// Lists locally stored blob hashes (S3-only blobs are not included).
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// A cloned handle to the underlying local blob store.
    pub fn local_store(&self) -> Arc<LmdbBlobStore> {
        Arc::clone(&self.local)
    }
}
304
// Async `Store` implementation: thin wrappers over the synchronous router
// methods so `HashTree` can use the router directly as its backing store.
#[async_trait]
impl Store for StorageRouter {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
325
/// High-level content store: LMDB metadata databases plus a
/// `StorageRouter` that holds the actual blob bytes.
pub struct HashtreeStore {
    env: heed::Env,
    // Pinned root hashes (hex) -> (); pinned content is kept.
    pins: Database<Str, Unit>,
    // Raw-content sha256 hex -> hashtree root hex.
    sha256_index: Database<Str, Str>,
    // 64-byte composite key (sha256 || pubkey) -> (); blob ownership.
    blob_owners: Database<Bytes, Unit>,
    // Pubkey -> JSON-encoded Vec<BlobMetadata> (Blossom listing).
    pubkey_blobs: Database<Str, Bytes>,
    // Tree root hash (32 bytes) -> MessagePack-encoded TreeMeta.
    tree_meta: Database<Bytes, Bytes>,
    // 64-byte composite key (blob hash || tree root) -> ().
    blob_trees: Database<Bytes, Unit>,
    // Named ref -> tree root hash bytes.
    tree_refs: Database<Str, Bytes>,
    // Cache key -> serialized CachedRoot (writers not visible in this file).
    cached_roots: Database<Str, Bytes>,
    // Blob storage (local LMDB + optional S3).
    router: Arc<StorageRouter>,
    // Configured storage budget in bytes (enforcement not visible here).
    max_size_bytes: u64,
}
349
350impl HashtreeStore {
    /// Opens a store at `path` with no S3 backend and the default
    /// 10 GiB size budget.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::with_options(path, None, 10 * 1024 * 1024 * 1024)
    }
355
    /// Opens a store at `path`, optionally mirroring blobs to S3, with the
    /// default 10 GiB size budget.
    pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
        Self::with_options(path, s3_config, 10 * 1024 * 1024 * 1024)
    }
360
    /// Opens (or creates) the store at `path`: the LMDB environment with all
    /// metadata databases, the blob store under `<path>/blobs`, and — when
    /// the `s3` feature is on and a config is given — the S3 router.
    ///
    /// NOTE(review): the LMDB map size is fixed at 10 GiB and does not use
    /// `max_size_bytes` — confirm that is intentional.
    pub fn with_options<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>, max_size_bytes: u64) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY: `EnvOpenOptions::open` is unsafe per heed's contract (the
        // environment must not be opened in conflicting ways); we just
        // created `path` and open it once here.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024)
                .max_dbs(9)
                .open(path)?
        };

        // Create all named databases up front in a single write transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let sha256_index = env.create_database(&mut wtxn, Some("sha256_index"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        let lmdb_store = Arc::new(LmdbBlobStore::new(path.join("blobs"))
            .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?);

        // With S3 enabled: build the mirroring router. NOTE(review):
        // `with_s3` calls `tokio::spawn`, so even though we block here an
        // ambient tokio runtime is required — confirm callers provide one.
        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!("Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket, s3_cfg.endpoint);

            sync_block_on(async {
                StorageRouter::with_s3(lmdb_store, s3_cfg).await
            })?
        } else {
            StorageRouter::new(lmdb_store)
        });

        // Without the feature: warn if a config was supplied and ignore it.
        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!("S3 config provided but S3 feature not enabled. Using local storage only.");
            }
            StorageRouter::new(lmdb_store)
        });

        Ok(Self {
            env,
            pins,
            sha256_index,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
423
    /// Borrows the blob storage router.
    pub fn router(&self) -> &StorageRouter {
        &self.router
    }
428
    /// A cloned `Arc` handle to the router, suitable for `HashTreeConfig`.
    pub fn store_arc(&self) -> Arc<StorageRouter> {
        Arc::clone(&self.router)
    }
434
    /// Uploads a file as a public hashtree and pins the root; returns the
    /// root hash as hex.
    pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, true)
    }
439
    /// Like `upload_file`, but does not pin the resulting root.
    pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        self.upload_file_internal(file_path, false)
    }
444
445 fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
446 let file_path = file_path.as_ref();
447 let file_content = std::fs::read(file_path)?;
448
449 let content_sha256 = sha256(&file_content);
451 let sha256_hex = to_hex(&content_sha256);
452
453 let store = self.store_arc();
455 let tree = HashTree::new(HashTreeConfig::new(store).public());
456
457 let cid = sync_block_on(async {
458 tree.put(&file_content).await
459 }).context("Failed to store file")?;
460
461 let root_hex = to_hex(&cid.hash);
462
463 let mut wtxn = self.env.write_txn()?;
464
465 self.sha256_index.put(&mut wtxn, &sha256_hex, &root_hex)?;
467
468 if pin {
470 self.pins.put(&mut wtxn, &root_hex, &())?;
471 }
472
473 wtxn.commit()?;
474
475 Ok(root_hex)
476 }
477
    /// Uploads file content from a reader; invokes `callback` with the root
    /// hex once the content is stored, then indexes and pins it.
    ///
    /// NOTE(review): despite the name, this buffers the entire stream in
    /// memory (`read_to_end`) before hashing/storing. `_file_name` is unused.
    pub fn upload_file_stream<R: Read, F>(
        &self,
        mut reader: R,
        _file_name: impl Into<String>,
        mut callback: F,
    ) -> Result<String>
    where
        F: FnMut(&str),
    {
        let mut data = Vec::new();
        reader.read_to_end(&mut data)?;

        let content_sha256 = sha256(&data);
        let sha256_hex = to_hex(&content_sha256);

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let cid = sync_block_on(async {
            tree.put(&data).await
        }).context("Failed to store file")?;

        let root_hex = to_hex(&cid.hash);
        // Notify the caller before the metadata transaction commits.
        callback(&root_hex);

        let mut wtxn = self.env.write_txn()?;

        self.sha256_index.put(&mut wtxn, &sha256_hex, &root_hex)?;

        // This path always pins (unlike `upload_file_internal`).
        self.pins.put(&mut wtxn, &root_hex, &())?;

        wtxn.commit()?;

        Ok(root_hex)
    }
519
    /// Uploads a directory as a public hashtree (honoring gitignore rules)
    /// and pins the root; returns the root hash as hex.
    pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_with_options(dir_path, true)
    }
525
526 pub fn upload_dir_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
528 let dir_path = dir_path.as_ref();
529
530 let store = self.store_arc();
531 let tree = HashTree::new(HashTreeConfig::new(store).public());
532
533 let root_cid = sync_block_on(async {
534 self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
535 }).context("Failed to upload directory")?;
536
537 let root_hex = to_hex(&root_cid.hash);
538
539 let mut wtxn = self.env.write_txn()?;
540 self.pins.put(&mut wtxn, &root_hex, &())?;
541 wtxn.commit()?;
542
543 Ok(root_hex)
544 }
545
    /// Walks `current_path` (optionally honoring gitignore rules), uploads
    /// every file, and groups the resulting CIDs by parent directory before
    /// delegating to `build_directory_tree` to assemble directory nodes.
    ///
    /// `_root_path` is currently unused. Note: despite the name this is not
    /// recursive — the `ignore` walker visits the whole subtree.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Map: relative dir path ("" = root) -> (file name, CID) children.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        dir_contents.insert(String::new(), Vec::new());

        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false)
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // The walker yields the root itself first; skip it.
            if path == current_path {
                continue;
            }

            let relative = path.strip_prefix(current_path)
                .unwrap_or(path);

            if path.is_file() {
                let content = std::fs::read(path)?;
                let cid = tree.put(&content).await
                    .map_err(|e| anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e))?;

                let parent = relative.parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative.file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Register the directory even if it turns out to be empty.
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        self.build_directory_tree(tree, &mut dir_contents).await
    }
603
    /// Assembles directory nodes bottom-up from the flat map produced by
    /// `upload_dir_recursive` and returns the CID of the root ("") directory.
    async fn build_directory_tree<S: Store>(
        &self,
        tree: &HashTree<S>,
        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
    ) -> Result<Cid> {
        // Sort deepest directories first so every subdirectory CID exists
        // before its parent directory node is assembled.
        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
        dirs.sort_by(|a, b| {
            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
            depth_b.cmp(&depth_a)
        });

        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();

        for dir_path in dirs {
            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();

            // File entries first...
            let mut entries: Vec<HashTreeDirEntry> = files.into_iter()
                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
                .collect();

            // ...then already-built subdirectories whose parent is this dir.
            // (Linear scan over all built dirs per directory — O(dirs²),
            // fine for typical trees.)
            for (subdir_path, cid) in &dir_cids {
                let parent = std::path::Path::new(subdir_path)
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();

                if parent == dir_path {
                    let name = std::path::Path::new(subdir_path)
                        .file_name()
                        .map(|n| n.to_string_lossy().to_string())
                        .unwrap_or_default();
                    entries.push(HashTreeDirEntry::from_cid(name, cid));
                }
            }

            let cid = tree.put_directory(entries).await
                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;

            dir_cids.insert(dir_path, cid);
        }

        // The empty key is the root directory.
        dir_cids.get("")
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("No root directory"))
    }
653
654 pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
656 let file_path = file_path.as_ref();
657 let file_content = std::fs::read(file_path)?;
658
659 let store = self.store_arc();
661 let tree = HashTree::new(HashTreeConfig::new(store));
662
663 let cid = sync_block_on(async {
664 tree.put(&file_content).await
665 }).map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;
666
667 let cid_str = cid.to_string();
668
669 let mut wtxn = self.env.write_txn()?;
670 self.pins.put(&mut wtxn, &cid_str, &())?;
671 wtxn.commit()?;
672
673 Ok(cid_str)
674 }
675
    /// Uploads a directory as an encrypted hashtree (honoring gitignore
    /// rules) and returns the full CID string.
    pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
        self.upload_dir_encrypted_with_options(dir_path, true)
    }
681
    /// Uploads a directory as an *encrypted* hashtree and returns the full
    /// CID string (hash plus key material).
    ///
    /// NOTE(review): this pins `to_hex(root_cid.hash)` while
    /// `upload_file_encrypted` pins the full CID string — confirm which key
    /// form `is_pinned`/GC expect for encrypted content.
    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
        let dir_path = dir_path.as_ref();
        let store = self.store_arc();

        // Default (non-public) config => encrypted tree.
        let tree = HashTree::new(HashTreeConfig::new(store));

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
        }).context("Failed to upload encrypted directory")?;

        let cid_str = root_cid.to_string();

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, &to_hex(&root_cid.hash), &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
704
    /// Fetches and decodes the tree node for a hex-encoded hash, or `None`
    /// when no such node exists.
    pub fn get_tree_node(&self, hash_hex: &str) -> Result<Option<TreeNode>> {
        let hash = from_hex(hash_hex)
            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.get_tree_node(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
        })
    }
718
719 pub fn get_cid_by_sha256(&self, sha256_hex: &str) -> Result<Option<String>> {
721 let rtxn = self.env.read_txn()?;
722 Ok(self.sha256_index.get(&rtxn, sha256_hex)?.map(|s| s.to_string()))
723 }
724
725 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
727 let hash = sha256(data);
728 self.router.put_sync(hash, data)
729 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
730 Ok(to_hex(&hash))
731 }
732
    /// Fetches a raw blob by its sha256 hex (local first, then S3 if
    /// configured), or `None` if not found anywhere.
    pub fn get_blob(&self, sha256_hex: &str) -> Result<Option<Vec<u8>>> {
        let hash = from_hex(sha256_hex)
            .map_err(|e| anyhow::anyhow!("invalid hex: {}", e))?;
        self.router.get_sync(&hash)
            .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
    }
740
    /// Returns whether a blob with the given sha256 hex exists in the
    /// router (local store or S3).
    pub fn blob_exists(&self, sha256_hex: &str) -> Result<bool> {
        let hash = from_hex(sha256_hex)
            .map_err(|e| anyhow::anyhow!("invalid hex: {}", e))?;
        self.router.exists(&hash)
            .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
    }
748
749 fn blob_owner_key(sha256_hex: &str, pubkey_hex: &str) -> Result<[u8; 64]> {
755 let sha256_bytes = from_hex(sha256_hex)
756 .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
757 let pubkey_bytes = from_hex(pubkey_hex)
758 .map_err(|e| anyhow::anyhow!("invalid pubkey hex: {}", e))?;
759 let mut key = [0u8; 64];
760 key[..32].copy_from_slice(&sha256_bytes);
761 key[32..].copy_from_slice(&pubkey_bytes);
762 Ok(key)
763 }
764
765 pub fn set_blob_owner(&self, sha256_hex: &str, pubkey: &str) -> Result<()> {
768 use std::time::{SystemTime, UNIX_EPOCH};
769
770 let key = Self::blob_owner_key(sha256_hex, pubkey)?;
771 let mut wtxn = self.env.write_txn()?;
772
773 self.blob_owners.put(&mut wtxn, &key[..], &())?;
775
776 let mut blobs: Vec<BlobMetadata> = self
778 .pubkey_blobs
779 .get(&wtxn, pubkey)?
780 .and_then(|b| serde_json::from_slice(b).ok())
781 .unwrap_or_default();
782
783 if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
785 let now = SystemTime::now()
786 .duration_since(UNIX_EPOCH)
787 .unwrap()
788 .as_secs();
789
790 let size = self
792 .get_cid_by_sha256(sha256_hex)?
793 .and_then(|cid| self.get_file_chunk_metadata(&cid).ok().flatten())
794 .map(|m| m.total_size)
795 .unwrap_or(0);
796
797 blobs.push(BlobMetadata {
798 sha256: sha256_hex.to_string(),
799 size,
800 mime_type: "application/octet-stream".to_string(),
801 uploaded: now,
802 });
803
804 let blobs_json = serde_json::to_vec(&blobs)?;
805 self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
806 }
807
808 wtxn.commit()?;
809 Ok(())
810 }
811
    /// Returns whether `pubkey` is recorded as an owner of the blob.
    pub fn is_blob_owner(&self, sha256_hex: &str, pubkey: &str) -> Result<bool> {
        let key = Self::blob_owner_key(sha256_hex, pubkey)?;
        let rtxn = self.env.read_txn()?;
        Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
    }
818
819 pub fn get_blob_owners(&self, sha256_hex: &str) -> Result<Vec<String>> {
821 let sha256_bytes = from_hex(sha256_hex)
822 .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
823 let rtxn = self.env.read_txn()?;
824
825 let mut owners = Vec::new();
826 for item in self.blob_owners.prefix_iter(&rtxn, &sha256_bytes[..])? {
827 let (key, _) = item?;
828 if key.len() == 64 {
829 let pubkey_hex = to_hex(&key[32..64].try_into().unwrap());
831 owners.push(pubkey_hex);
832 }
833 }
834 Ok(owners)
835 }
836
837 pub fn blob_has_owners(&self, sha256_hex: &str) -> Result<bool> {
839 let sha256_bytes = from_hex(sha256_hex)
840 .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
841 let rtxn = self.env.read_txn()?;
842
843 for item in self.blob_owners.prefix_iter(&rtxn, &sha256_bytes[..])? {
845 if item.is_ok() {
846 return Ok(true);
847 }
848 }
849 Ok(false)
850 }
851
852 pub fn get_blob_owner(&self, sha256_hex: &str) -> Result<Option<String>> {
854 Ok(self.get_blob_owners(sha256_hex)?.into_iter().next())
855 }
856
    /// Removes `pubkey`'s ownership of a blob; when that was the last owner,
    /// also unpins, unindexes, and deletes the blob data itself.
    ///
    /// Returns `true` when the blob data was deleted, `false` when other
    /// owners remain.
    pub fn delete_blossom_blob(&self, sha256_hex: &str, pubkey: &str) -> Result<bool> {
        let key = Self::blob_owner_key(sha256_hex, pubkey)?;
        let mut wtxn = self.env.write_txn()?;

        self.blob_owners.delete(&mut wtxn, &key[..])?;

        // Drop the blob from this pubkey's Blossom listing (best effort —
        // an unreadable listing is left untouched).
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Check for remaining owners inside the same write txn so the check
        // sees the deletion above.
        let sha256_bytes = from_hex(sha256_hex)
            .map_err(|e| anyhow::anyhow!("invalid sha256 hex: {}", e))?;
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256_bytes[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &pubkey[..8.min(pubkey.len())],
                &sha256_hex[..8.min(sha256_hex.len())]
            );
            return Ok(false);
        }

        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8.min(sha256_hex.len())]
        );

        // Last owner gone: unpin the associated root (if indexed) and drop
        // the sha256 index entry.
        let root_hex = self.sha256_index.get(&wtxn, sha256_hex)?.map(|s| s.to_string());
        if let Some(ref root_hex) = root_hex {
            self.pins.delete(&mut wtxn, root_hex)?;
        }
        self.sha256_index.delete(&mut wtxn, sha256_hex)?;

        // Blob-store delete happens before commit and is best-effort; a
        // commit failure after this point would leave metadata pointing at
        // already-deleted data.
        let hash = from_hex(sha256_hex)
            .map_err(|e| anyhow::anyhow!("invalid hex: {}", e))?;
        let _ = self.router.delete_sync(&hash);

        wtxn.commit()?;
        Ok(true)
    }
919
    /// Returns the Blossom blob descriptors recorded for `pubkey`
    /// (empty when the pubkey is unknown or its listing is unreadable).
    pub fn list_blobs_by_pubkey(&self, pubkey: &str) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
        let rtxn = self.env.read_txn()?;

        let blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&rtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        Ok(blobs
            .into_iter()
            .map(|b| crate::server::blossom::BlobDescriptor {
                // Relative URL; the server presumably prefixes its base URL.
                url: format!("/{}", b.sha256),
                sha256: b.sha256,
                size: b.size,
                mime_type: b.mime_type,
                uploaded: b.uploaded,
            })
            .collect())
    }
941
    /// Fetches a single raw chunk by its hex hash (no tree decoding).
    pub fn get_chunk(&self, chunk_hex: &str) -> Result<Option<Vec<u8>>> {
        let hash = from_hex(chunk_hex)
            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
        self.router.get_sync(&hash)
            .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
    }
949
    /// Reassembles an entire file (all chunks) for the given root hash hex.
    /// Loads the whole file into memory; for ranges use `get_file_range`.
    pub fn get_file(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
        let hash = from_hex(hash_hex)
            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            tree.read_file(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
        })
    }
964
    /// Returns size/chunking metadata for a stored file, or `None` when the
    /// hash is unknown or refers to a directory.
    pub fn get_file_chunk_metadata(&self, hash_hex: &str) -> Result<Option<FileChunkMetadata>> {
        let hash = from_hex(hash_hex)
            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            let exists = store.has(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            let total_size = tree.get_size(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            let is_tree_node = tree.is_tree(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            // A plain blob (not a tree node) is a single, unchunked file.
            if !is_tree_node {
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_cids: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            let node = match tree.get_tree_node(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))? {
                Some(n) => n,
                None => return Ok(None),
            };

            let is_directory = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            // Directories have no file-chunk metadata.
            if is_directory {
                return Ok(None);
            }

            // Chunked file: one link per chunk, in file order.
            let chunk_cids: Vec<String> = node.links.iter().map(|l| to_hex(&l.hash)).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_cids,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
1028
1029 pub fn get_file_range(&self, hash_hex: &str, start: u64, end: Option<u64>) -> Result<Option<(Vec<u8>, u64)>> {
1031 let metadata = match self.get_file_chunk_metadata(hash_hex)? {
1032 Some(m) => m,
1033 None => return Ok(None),
1034 };
1035
1036 if metadata.total_size == 0 {
1037 return Ok(Some((Vec::new(), 0)));
1038 }
1039
1040 if start >= metadata.total_size {
1041 return Ok(None);
1042 }
1043
1044 let end = end.unwrap_or(metadata.total_size - 1).min(metadata.total_size - 1);
1045
1046 if !metadata.is_chunked {
1048 let content = self.get_file(hash_hex)?.unwrap_or_default();
1049 let range_content = if start < content.len() as u64 {
1050 content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
1051 } else {
1052 Vec::new()
1053 };
1054 return Ok(Some((range_content, metadata.total_size)));
1055 }
1056
1057 let mut result = Vec::new();
1059 let mut current_offset = 0u64;
1060
1061 for (i, chunk_cid) in metadata.chunk_cids.iter().enumerate() {
1062 let chunk_size = metadata.chunk_sizes[i];
1063 let chunk_end = current_offset + chunk_size - 1;
1064
1065 if chunk_end >= start && current_offset <= end {
1067 let chunk_content = match self.get_chunk(chunk_cid)? {
1068 Some(content) => content,
1069 None => {
1070 return Err(anyhow::anyhow!("Chunk {} not found", chunk_cid));
1071 }
1072 };
1073
1074 let chunk_read_start = if current_offset >= start {
1075 0
1076 } else {
1077 (start - current_offset) as usize
1078 };
1079
1080 let chunk_read_end = if chunk_end <= end {
1081 chunk_size as usize - 1
1082 } else {
1083 (end - current_offset) as usize
1084 };
1085
1086 result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
1087 }
1088
1089 current_offset += chunk_size;
1090
1091 if current_offset > end {
1092 break;
1093 }
1094 }
1095
1096 Ok(Some((result, metadata.total_size)))
1097 }
1098
    /// Creates an owning cursor over the chunks covering the inclusive byte
    /// range [start, end] (`end` past EOF is clamped); see
    /// `FileRangeChunksOwned` for iteration.
    ///
    /// Returns `Ok(None)` for unknown files, empty files, or `start` >= EOF.
    pub fn stream_file_range_chunks_owned(
        self: Arc<Self>,
        hash_hex: &str,
        start: u64,
        end: u64,
    ) -> Result<Option<FileRangeChunksOwned>> {
        let metadata = match self.get_file_chunk_metadata(hash_hex)? {
            Some(m) => m,
            None => return Ok(None),
        };

        if metadata.total_size == 0 || start >= metadata.total_size {
            return Ok(None);
        }

        let end = end.min(metadata.total_size - 1);

        Ok(Some(FileRangeChunksOwned {
            store: self,
            metadata,
            start,
            end,
            current_chunk_idx: 0,
            current_offset: 0,
        }))
    }
1126
    /// Lists the entries of a directory node, or returns `None` when the
    /// hash does not refer to a directory.
    pub fn get_directory_listing(&self, hash_hex: &str) -> Result<Option<DirectoryListing>> {
        let hash = from_hex(hash_hex)
            .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            let is_dir = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if !is_dir {
                return Ok(None);
            }

            // Wrap the bare hash in a public CID (0 passed as the size arg).
            let cid = hashtree_core::Cid::public(hash, 0);
            let tree_entries = tree.list_directory(&cid).await
                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;

            let entries: Vec<DirEntry> = tree_entries.into_iter().map(|e| DirEntry {
                name: e.name,
                cid: to_hex(&e.hash),
                is_directory: e.link_type.is_tree(),
                size: e.size,
            }).collect();

            Ok(Some(DirectoryListing {
                // The directory's own name is not recoverable from its hash.
                dir_name: String::new(),
                entries,
            }))
        })
    }
1162
1163 pub fn pin(&self, hash_hex: &str) -> Result<()> {
1165 let mut wtxn = self.env.write_txn()?;
1166 self.pins.put(&mut wtxn, hash_hex, &())?;
1167 wtxn.commit()?;
1168 Ok(())
1169 }
1170
1171 pub fn unpin(&self, hash_hex: &str) -> Result<()> {
1173 let mut wtxn = self.env.write_txn()?;
1174 self.pins.delete(&mut wtxn, hash_hex)?;
1175 wtxn.commit()?;
1176 Ok(())
1177 }
1178
1179 pub fn is_pinned(&self, hash_hex: &str) -> Result<bool> {
1181 let rtxn = self.env.read_txn()?;
1182 Ok(self.pins.get(&rtxn, hash_hex)?.is_some())
1183 }
1184
1185 pub fn list_pins(&self) -> Result<Vec<String>> {
1187 let rtxn = self.env.read_txn()?;
1188 let mut pins = Vec::new();
1189
1190 for item in self.pins.iter(&rtxn)? {
1191 let (hash_hex, _) = item?;
1192 pins.push(hash_hex.to_string());
1193 }
1194
1195 Ok(pins)
1196 }
1197
    /// Lists all pins with a directory flag (for UI display).
    ///
    /// NOTE(review): `name` is hard-coded to "Unknown" — there is no name
    /// index for pins in this file. Each pin costs a blocking tree lookup.
    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
        let rtxn = self.env.read_txn()?;
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let mut pins = Vec::new();

        for item in self.pins.iter(&rtxn)? {
            let (hash_hex, _) = item?;
            let hash_hex_str = hash_hex.to_string();

            // Non-hex pin keys (e.g. encrypted CID strings) fall through
            // to `false`, as do lookup errors.
            let is_directory = if let Ok(hash) = from_hex(&hash_hex_str) {
                sync_block_on(async {
                    tree.is_directory(&hash).await.unwrap_or(false)
                })
            } else {
                false
            };

            pins.push(PinnedItem {
                cid: hash_hex_str,
                name: "Unknown".to_string(),
                is_directory,
            });
        }

        Ok(pins)
    }
1227
    /// Indexes a synced tree for retention accounting: records every leaf
    /// blob -> tree-root association in `blob_trees`, writes a `TreeMeta`
    /// record, and (optionally) updates the named ref in `tree_refs`.
    ///
    /// If `ref_key` previously pointed at a different root, that old tree is
    /// unindexed first (best effort — unindex errors are ignored).
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes.try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // The read txn must end before unindex_tree writes.
                    drop(rtxn);
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk the whole tree up front, outside the write transaction.
        let (blob_hashes, total_size) = sync_block_on(async {
            self.collect_tree_blobs(&tree, root_hash).await
        })?;

        let mut wtxn = self.env.write_txn()?;

        // One (blob hash || root hash) composite entry per leaf blob.
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta.put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1309
1310 async fn collect_tree_blobs<S: Store>(
1312 &self,
1313 tree: &HashTree<S>,
1314 root: &Hash,
1315 ) -> Result<(Vec<Hash>, u64)> {
1316 let mut blobs = Vec::new();
1317 let mut total_size = 0u64;
1318 let mut stack = vec![*root];
1319
1320 while let Some(hash) = stack.pop() {
1321 let is_tree = tree.is_tree(&hash).await
1323 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
1324
1325 if is_tree {
1326 if let Some(node) = tree.get_tree_node(&hash).await
1328 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1329 {
1330 for link in &node.links {
1331 stack.push(link.hash);
1332 }
1333 }
1334 } else {
1335 if let Some(data) = self.router.get_sync(&hash)
1337 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1338 {
1339 total_size += data.len() as u64;
1340 blobs.push(hash);
1341 }
1342 }
1343 }
1344
1345 Ok((blobs, total_size))
1346 }
1347
1348 pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1351 let root_hex = to_hex(root_hash);
1352
1353 let store = self.store_arc();
1354 let tree = HashTree::new(HashTreeConfig::new(store).public());
1355
1356 let (blob_hashes, _) = sync_block_on(async {
1358 self.collect_tree_blobs(&tree, root_hash).await
1359 })?;
1360
1361 let mut wtxn = self.env.write_txn()?;
1362 let mut freed = 0u64;
1363
1364 for blob_hash in &blob_hashes {
1366 let mut key = [0u8; 64];
1368 key[..32].copy_from_slice(blob_hash);
1369 key[32..].copy_from_slice(root_hash);
1370 self.blob_trees.delete(&mut wtxn, &key[..])?;
1371
1372 let rtxn = self.env.read_txn()?;
1374 let mut has_other_tree = false;
1375
1376 for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1377 if item.is_ok() {
1378 has_other_tree = true;
1379 break;
1380 }
1381 }
1382 drop(rtxn);
1383
1384 if !has_other_tree {
1386 if let Some(data) = self.router.get_sync(blob_hash)
1387 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1388 {
1389 freed += data.len() as u64;
1390 self.router.delete_local_only(blob_hash)
1392 .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1393 }
1394 }
1395 }
1396
1397 if let Some(data) = self.router.get_sync(root_hash)
1399 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1400 {
1401 freed += data.len() as u64;
1402 self.router.delete_local_only(root_hash)
1404 .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1405 }
1406
1407 self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1409
1410 wtxn.commit()?;
1411
1412 tracing::debug!(
1413 "Unindexed tree {} ({} bytes freed)",
1414 &root_hex[..8],
1415 freed
1416 );
1417
1418 Ok(freed)
1419 }
1420
1421 pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1423 let rtxn = self.env.read_txn()?;
1424 if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1425 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1426 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1427 Ok(Some(meta))
1428 } else {
1429 Ok(None)
1430 }
1431 }
1432
1433 pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1435 let rtxn = self.env.read_txn()?;
1436 let mut trees = Vec::new();
1437
1438 for item in self.tree_meta.iter(&rtxn)? {
1439 let (hash_bytes, meta_bytes) = item?;
1440 let hash: Hash = hash_bytes.try_into()
1441 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1442 let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1443 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1444 trees.push((hash, meta));
1445 }
1446
1447 Ok(trees)
1448 }
1449
1450 pub fn tracked_size(&self) -> Result<u64> {
1452 let rtxn = self.env.read_txn()?;
1453 let mut total = 0u64;
1454
1455 for item in self.tree_meta.iter(&rtxn)? {
1456 let (_, bytes) = item?;
1457 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1458 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1459 total += meta.total_size;
1460 }
1461
1462 Ok(total)
1463 }
1464
1465 fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1467 let mut trees = self.list_indexed_trees()?;
1468
1469 trees.sort_by(|a, b| {
1471 match a.1.priority.cmp(&b.1.priority) {
1472 std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1473 other => other,
1474 }
1475 });
1476
1477 Ok(trees)
1478 }
1479
    /// Enforces the storage budget. When usage exceeds `max_size_bytes`,
    /// frees space down to a 90% target: first by deleting orphaned blobs,
    /// then by evicting whole trees in priority order (lowest priority and
    /// oldest sync first), skipping pinned roots. Returns bytes freed.
    pub fn evict_if_needed(&self) -> Result<u64> {
        let stats = self.router.stats()
            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
        let current = stats.total_bytes;

        // Under budget: nothing to do.
        if current <= self.max_size_bytes {
            return Ok(0);
        }

        // Aim 10% below the limit so the very next write does not
        // immediately re-trigger eviction.
        let target = self.max_size_bytes * 90 / 100;
        let mut freed = 0u64;
        let mut current_size = current;

        // Cheapest pass first: blobs referenced by no tree and no pin.
        let orphan_freed = self.evict_orphaned_blobs()?;
        freed += orphan_freed;
        current_size = current_size.saturating_sub(orphan_freed);

        if orphan_freed > 0 {
            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
        }

        if current_size <= target {
            if freed > 0 {
                tracing::info!("Eviction complete: {} bytes freed", freed);
            }
            return Ok(freed);
        }

        // Whole-tree eviction in ascending (priority, synced_at) order.
        let evictable = self.get_evictable_trees()?;

        for (root_hash, meta) in evictable {
            if current_size <= target {
                break;
            }

            let root_hex = to_hex(&root_hash);

            // Pinned roots are never evicted.
            if self.is_pinned(&root_hex)? {
                continue;
            }

            let tree_freed = self.unindex_tree(&root_hash)?;
            freed += tree_freed;
            current_size = current_size.saturating_sub(tree_freed);

            tracing::info!(
                "Evicted tree {} (owner={}, priority={}, {} bytes)",
                &root_hex[..8],
                &meta.owner[..8.min(meta.owner.len())],
                meta.priority,
                tree_freed
            );
        }

        if freed > 0 {
            tracing::info!("Eviction complete: {} bytes freed", freed);
        }

        Ok(freed)
    }
1553
1554 fn evict_orphaned_blobs(&self) -> Result<u64> {
1556 let mut freed = 0u64;
1557
1558 let all_hashes = self.router.list()
1560 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1561
1562 let rtxn = self.env.read_txn()?;
1564 let pinned: HashSet<String> = self.pins.iter(&rtxn)?
1565 .filter_map(|item| item.ok())
1566 .map(|(hash_hex, _)| hash_hex.to_string())
1567 .collect();
1568
1569 let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
1572 for item in self.blob_trees.iter(&rtxn)? {
1573 if let Ok((key_bytes, _)) = item {
1574 if key_bytes.len() >= 32 {
1575 let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
1576 blobs_in_trees.insert(blob_hash);
1577 }
1578 }
1579 }
1580 drop(rtxn);
1581
1582 for hash in all_hashes {
1584 let hash_hex = to_hex(&hash);
1585
1586 if pinned.contains(&hash_hex) {
1588 continue;
1589 }
1590
1591 if blobs_in_trees.contains(&hash) {
1593 continue;
1594 }
1595
1596 if let Ok(Some(data)) = self.router.get_sync(&hash) {
1598 freed += data.len() as u64;
1599 let _ = self.router.delete_local_only(&hash);
1600 tracing::debug!("Deleted orphaned blob {} ({} bytes)", &hash_hex[..8], data.len());
1601 }
1602 }
1603
1604 Ok(freed)
1605 }
1606
    /// Returns the configured local storage budget in bytes.
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1611
1612 pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1614 let rtxn = self.env.read_txn()?;
1615 let mut own = 0u64;
1616 let mut followed = 0u64;
1617 let mut other = 0u64;
1618
1619 for item in self.tree_meta.iter(&rtxn)? {
1620 let (_, bytes) = item?;
1621 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1622 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1623
1624 if meta.priority >= PRIORITY_OWN {
1625 own += meta.total_size;
1626 } else if meta.priority >= PRIORITY_FOLLOWED {
1627 followed += meta.total_size;
1628 } else {
1629 other += meta.total_size;
1630 }
1631 }
1632
1633 Ok(StorageByPriority { own, followed, other })
1634 }
1635
1636 pub fn get_storage_stats(&self) -> Result<StorageStats> {
1638 let rtxn = self.env.read_txn()?;
1639 let total_pins = self.pins.len(&rtxn)? as usize;
1640
1641 let stats = self.router.stats()
1642 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1643
1644 Ok(StorageStats {
1645 total_dags: stats.count,
1646 pinned_dags: total_pins,
1647 total_bytes: stats.total_bytes,
1648 })
1649 }
1650
1651 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1655 let key = format!("{}/{}", pubkey_hex, tree_name);
1656 let rtxn = self.env.read_txn()?;
1657 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1658 let root: CachedRoot = rmp_serde::from_slice(bytes)
1659 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1660 Ok(Some(root))
1661 } else {
1662 Ok(None)
1663 }
1664 }
1665
1666 pub fn set_cached_root(
1668 &self,
1669 pubkey_hex: &str,
1670 tree_name: &str,
1671 hash: &str,
1672 key: Option<&str>,
1673 visibility: &str,
1674 updated_at: u64,
1675 ) -> Result<()> {
1676 let db_key = format!("{}/{}", pubkey_hex, tree_name);
1677 let root = CachedRoot {
1678 hash: hash.to_string(),
1679 key: key.map(|k| k.to_string()),
1680 updated_at,
1681 visibility: visibility.to_string(),
1682 };
1683 let bytes = rmp_serde::to_vec(&root)
1684 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1685 let mut wtxn = self.env.write_txn()?;
1686 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1687 wtxn.commit()?;
1688 Ok(())
1689 }
1690
1691 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1693 let prefix = format!("{}/", pubkey_hex);
1694 let rtxn = self.env.read_txn()?;
1695 let mut results = Vec::new();
1696
1697 for item in self.cached_roots.iter(&rtxn)? {
1698 let (key, bytes) = item?;
1699 if key.starts_with(&prefix) {
1700 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1701 let root: CachedRoot = rmp_serde::from_slice(bytes)
1702 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1703 results.push((tree_name.to_string(), root));
1704 }
1705 }
1706
1707 Ok(results)
1708 }
1709
1710 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1712 let key = format!("{}/{}", pubkey_hex, tree_name);
1713 let mut wtxn = self.env.write_txn()?;
1714 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1715 wtxn.commit()?;
1716 Ok(deleted)
1717 }
1718
1719 pub fn gc(&self) -> Result<GcStats> {
1721 let rtxn = self.env.read_txn()?;
1722
1723 let pinned: HashSet<String> = self.pins.iter(&rtxn)?
1725 .filter_map(|item| item.ok())
1726 .map(|(hash_hex, _)| hash_hex.to_string())
1727 .collect();
1728
1729 drop(rtxn);
1730
1731 let all_hashes = self.router.list()
1733 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1734
1735 let mut deleted = 0;
1737 let mut freed_bytes = 0u64;
1738
1739 for hash in all_hashes {
1740 let hash_hex = to_hex(&hash);
1741 if !pinned.contains(&hash_hex) {
1742 if let Ok(Some(data)) = self.router.get_sync(&hash) {
1743 freed_bytes += data.len() as u64;
1744 let _ = self.router.delete_local_only(&hash);
1746 deleted += 1;
1747 }
1748 }
1749 }
1750
1751 Ok(GcStats {
1752 deleted_dags: deleted,
1753 freed_bytes,
1754 })
1755 }
1756
    /// Verifies that every locally stored blob's content hashes back to its
    /// key (the store is content-addressed: key == SHA-256 of value). Prints
    /// one line per problem; when `delete` is true, corrupted or missing
    /// entries are removed after the scan.
    pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
        let all_hashes = self.router.list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        let total = all_hashes.len();
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_hashes = Vec::new();

        for hash in &all_hashes {
            let hash_hex = to_hex(hash);

            match self.router.get_sync(hash) {
                Ok(Some(data)) => {
                    // Recompute the content hash and compare against the key.
                    let actual_hash = sha256(&data);

                    if actual_hash == *hash {
                        valid += 1;
                    } else {
                        corrupted += 1;
                        let actual_hex = to_hex(&actual_hash);
                        println!(" CORRUPTED: key={} actual={} size={}",
                            &hash_hex[..16], &actual_hex[..16], data.len());
                        corrupted_hashes.push(*hash);
                    }
                }
                // Listed but unreadable: counts as corrupted too.
                Ok(None) => {
                    corrupted += 1;
                    println!(" MISSING: key={}", &hash_hex[..16]);
                    corrupted_hashes.push(*hash);
                }
                Err(e) => {
                    corrupted += 1;
                    println!(" ERROR: key={} err={}", &hash_hex[..16], e);
                    corrupted_hashes.push(*hash);
                }
            }
        }

        // Optional cleanup pass over everything flagged above.
        if delete {
            for hash in &corrupted_hashes {
                match self.router.delete_sync(hash) {
                    Ok(true) => deleted += 1,
                    Ok(false) => {}
                    Err(e) => {
                        let hash_hex = to_hex(hash);
                        println!(" Failed to delete {}: {}", &hash_hex[..16], e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
1822
    /// Verifies the integrity of every `.bin` object in the configured S3/R2
    /// bucket: the object key must be `<prefix><sha256-hex>.bin` and the
    /// downloaded body must hash to that value. Prints one line per problem;
    /// when `delete` is true, flagged objects are removed after the scan.
    ///
    /// Builds a fresh S3 client from the application config rather than
    /// reusing the router's client.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        let config = crate::config::Config::load()?;
        let s3_config = config.storage.s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        // Path-style addressing is needed for R2/minio-style endpoints.
        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                .force_path_style(true)
                .build()
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // ListObjectsV2 pagination state.
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2()
                .bucket(bucket)
                .prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req.send().await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Only blob objects are verified; skip anything else.
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // The expected hash is encoded in the object key itself.
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                // 64 hex chars = 32-byte SHA-256.
                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!(" INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!(" INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download the body and re-hash it.
                match s3_client.get_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(resp) => {
                        match resp.body.collect().await {
                            Ok(bytes) => {
                                let data = bytes.into_bytes();
                                let actual_hash = sha256(&data);

                                if actual_hash == expected_hash {
                                    valid += 1;
                                } else {
                                    corrupted += 1;
                                    let actual_hex = to_hex(&actual_hash);
                                    println!(" CORRUPTED: key={} actual={} size={}",
                                        &expected_hash_hex[..16], &actual_hex[..16], data.len());
                                    corrupted_keys.push(key.to_string());
                                }
                            }
                            Err(e) => {
                                corrupted += 1;
                                println!(" READ ERROR: {} - {}", key, e);
                                corrupted_keys.push(key.to_string());
                            }
                        }
                    }
                    Err(e) => {
                        corrupted += 1;
                        println!(" FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Periodic progress output for large buckets.
                if total % 100 == 0 {
                    println!(" Progress: {} objects checked, {} corrupted so far", total, corrupted);
                }
            }

            // Continue with the next page until the listing is exhausted.
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        // Optional cleanup pass over everything flagged above.
        if delete {
            for key in &corrupted_keys {
                match s3_client.delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        println!(" Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
1979
    /// Stub used when the crate is built without the `s3` feature: always
    /// errors so callers get an explicit message instead of a silent no-op.
    #[cfg(not(feature = "s3"))]
    pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
        Err(anyhow::anyhow!("S3 feature not enabled"))
    }
1985}
1986
/// Outcome of an integrity verification pass (LMDB or S3).
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of objects examined.
    pub total: usize,
    /// Objects whose content hashed to their key.
    pub valid: usize,
    /// Objects that were missing, unreadable, or mis-hashed.
    pub corrupted: usize,
    /// Objects removed (non-zero only when deletion was requested).
    pub deleted: usize,
}
1995
/// Aggregate local storage counters reported by `get_storage_stats`.
#[derive(Debug)]
pub struct StorageStats {
    /// Total number of stored objects.
    pub total_dags: usize,
    /// Number of pin entries.
    pub pinned_dags: usize,
    /// Total stored bytes.
    pub total_bytes: u64,
}
2002
/// Indexed-tree storage (in bytes) broken down by priority class.
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Bytes in trees at `PRIORITY_OWN` or above.
    pub own: u64,
    /// Bytes in trees at `PRIORITY_FOLLOWED` up to (but below) `PRIORITY_OWN`.
    pub followed: u64,
    /// Bytes in trees below `PRIORITY_FOLLOWED`.
    pub other: u64,
}
2013
/// Describes how a stored file is split into content-addressed chunks.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes.
    pub total_size: u64,
    /// CIDs of the chunks, in file order.
    pub chunk_cids: Vec<String>,
    /// Size of each chunk in bytes, parallel to `chunk_cids`.
    pub chunk_sizes: Vec<u64>,
    /// False when the file is stored as a single unsplit blob.
    pub is_chunked: bool,
}
2021
/// Owning iterator over the chunk slices that cover a byte range of a
/// chunked file. Yields one `Vec<u8>` per chunk overlapping the inclusive
/// range [start, end].
pub struct FileRangeChunksOwned {
    // Store used to fetch chunk contents by CID.
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    // Inclusive byte range requested, in whole-file offsets.
    start: u64,
    end: u64,
    // Index of the next chunk to inspect.
    current_chunk_idx: usize,
    // Whole-file offset of that chunk's first byte.
    current_offset: u64,
}
2031
2032impl Iterator for FileRangeChunksOwned {
2033 type Item = Result<Vec<u8>>;
2034
2035 fn next(&mut self) -> Option<Self::Item> {
2036 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_cids.len() {
2037 return None;
2038 }
2039
2040 if self.current_offset > self.end {
2041 return None;
2042 }
2043
2044 let chunk_cid = &self.metadata.chunk_cids[self.current_chunk_idx];
2045 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2046 let chunk_end = self.current_offset + chunk_size - 1;
2047
2048 self.current_chunk_idx += 1;
2049
2050 if chunk_end < self.start || self.current_offset > self.end {
2051 self.current_offset += chunk_size;
2052 return self.next();
2053 }
2054
2055 let chunk_content = match self.store.get_chunk(chunk_cid) {
2056 Ok(Some(content)) => content,
2057 Ok(None) => {
2058 return Some(Err(anyhow::anyhow!("Chunk {} not found", chunk_cid)));
2059 }
2060 Err(e) => {
2061 return Some(Err(e));
2062 }
2063 };
2064
2065 let chunk_read_start = if self.current_offset >= self.start {
2066 0
2067 } else {
2068 (self.start - self.current_offset) as usize
2069 };
2070
2071 let chunk_read_end = if chunk_end <= self.end {
2072 chunk_size as usize - 1
2073 } else {
2074 (self.end - self.current_offset) as usize
2075 };
2076
2077 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2078 self.current_offset += chunk_size;
2079
2080 Some(Ok(result))
2081 }
2082}
2083
/// Outcome of a `gc` pass.
#[derive(Debug)]
pub struct GcStats {
    /// Number of objects deleted.
    pub deleted_dags: usize,
    /// Bytes reclaimed.
    pub freed_bytes: u64,
}
2089
/// A single entry in a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within the directory.
    pub name: String,
    /// Hex-encoded content identifier of the entry.
    pub cid: String,
    /// True when the entry is itself a directory.
    pub is_directory: bool,
    /// Entry size in bytes.
    pub size: u64,
}
2097
/// The resolved contents of one directory.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the directory being listed.
    pub dir_name: String,
    /// The directory's entries.
    pub entries: Vec<DirEntry>,
}
2103
/// A pin entry as returned by `list_pins_with_names`.
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Hex-encoded content identifier of the pinned object.
    pub cid: String,
    /// Display name ("Unknown" when not resolved).
    pub name: String,
    /// True when the pinned object is a directory.
    pub is_directory: bool,
}
2110
/// Serialized metadata describing an uploaded blob.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 of the blob's content.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type recorded at upload time.
    pub mime_type: String,
    /// Upload timestamp (Unix seconds, presumably — TODO confirm units).
    pub uploaded: u64,
}
2119
// Bridges HashtreeStore into the WebRTC content-serving layer by delegating
// lookups to get_chunk.
impl crate::webrtc::ContentStore for HashtreeStore {
    /// Fetches raw content by hex-encoded hash; `None` when not stored.
    fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
        self.get_chunk(hash_hex)
    }
}