1use anyhow::{Context, Result};
2use async_trait::async_trait;
3use heed::{Database, EnvOpenOptions};
4use heed::types::*;
5use hashtree_lmdb::LmdbBlobStore;
6use hashtree_core::{
7 HashTree, HashTreeConfig, Cid,
8 sha256, to_hex, from_hex, TreeNode, DirEntry as HashTreeDirEntry,
9 types::Hash,
10};
11use hashtree_core::store::{Store, StoreError};
12use serde::{Deserialize, Serialize};
13use std::path::Path;
14use std::collections::HashSet;
15use std::io::Read;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18use futures::executor::block_on as sync_block_on;
19
/// Retention/sync priority recorded in `TreeMeta::priority`; higher values
/// mean "keep longer". NOTE(review): the consumer of these tiers (eviction
/// ordering?) is not visible in this chunk — confirm against call sites.
/// Trees synced for arbitrary third parties.
pub const PRIORITY_OTHER: u8 = 64;
/// Trees belonging to followed users.
pub const PRIORITY_FOLLOWED: u8 = 128;
/// The operator's own trees (highest priority).
pub const PRIORITY_OWN: u8 = 255;
24
/// Per-tree index metadata, stored msgpack-encoded in the `tree_meta`
/// database keyed by the tree's root hash (written by `index_tree`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    // Opaque owner identifier string.
    pub owner: String,
    // Optional human-readable tree name.
    pub name: Option<String>,
    // Unix timestamp (seconds) when the tree was last indexed.
    pub synced_at: u64,
    // Sum of sizes (bytes) of all leaf blobs reachable from the root.
    pub total_size: u64,
    // Retention priority; see the PRIORITY_* constants.
    pub priority: u8,
}
39
/// A cached tree-root record. NOTE(review): no reader/writer of this type is
/// visible in this chunk (presumably serialized into `cached_roots`); field
/// meanings below are inferred from names — confirm against call sites.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedRoot {
    // Hex-encoded root hash — presumably; TODO confirm encoding.
    pub hash: String,
    // Optional key material (for encrypted roots?) — TODO confirm.
    pub key: Option<String>,
    // Unix timestamp of the last update.
    pub updated_at: u64,
    // Visibility marker; valid values not visible here — TODO confirm.
    pub visibility: String,
}
52
53#[cfg(feature = "s3")]
54use tokio::sync::mpsc;
55
56use crate::config::S3Config;
57
#[cfg(feature = "s3")]
/// Work item for the background S3 replication task spawned in
/// `StorageRouter::with_s3`.
enum S3SyncMessage {
    // Upload `data` to S3 under a key derived from `hash`.
    Upload { hash: Hash, data: Vec<u8> },
    // Delete the S3 object for `hash`.
    Delete { hash: Hash },
}
64
/// Routes blob reads/writes to the local LMDB blob store and, when the `s3`
/// feature is enabled and configured, mirrors writes to S3 via a background
/// task and falls back to S3 on local read misses.
pub struct StorageRouter {
    // Always-present local LMDB-backed blob store.
    local: Arc<LmdbBlobStore>,
    // S3 client used for synchronous read-through (GET/HEAD); None when
    // S3 is compiled in but not configured.
    #[cfg(feature = "s3")]
    s3_client: Option<aws_sdk_s3::Client>,
    // Target bucket; None when S3 is not configured.
    #[cfg(feature = "s3")]
    s3_bucket: Option<String>,
    // Key prefix prepended to "<hex-hash>.bin" object keys.
    #[cfg(feature = "s3")]
    s3_prefix: String,
    // Sender feeding the background upload/delete task.
    #[cfg(feature = "s3")]
    sync_tx: Option<mpsc::UnboundedSender<S3SyncMessage>>,
}
83
impl StorageRouter {
    /// Create a router backed only by the local store; the S3 fields (when
    /// compiled in) stay unset so all S3 branches are no-ops.
    pub fn new(local: Arc<LmdbBlobStore>) -> Self {
        Self {
            local,
            #[cfg(feature = "s3")]
            s3_client: None,
            #[cfg(feature = "s3")]
            s3_bucket: None,
            #[cfg(feature = "s3")]
            s3_prefix: String::new(),
            #[cfg(feature = "s3")]
            sync_tx: None,
        }
    }

    /// Create a router that mirrors writes to S3.
    ///
    /// Spawns a detached background task that drains an unbounded channel of
    /// upload/delete jobs, running at most 32 S3 operations concurrently.
    /// Must be called from within a Tokio runtime (uses `tokio::spawn`).
    #[cfg(feature = "s3")]
    pub async fn with_s3(local: Arc<LmdbBlobStore>, config: &S3Config) -> Result<Self, anyhow::Error> {
        use aws_sdk_s3::Client as S3Client;

        // Base AWS config from the environment, region overridden from our config.
        let mut aws_config_loader = aws_config::from_env();
        aws_config_loader = aws_config_loader.region(aws_sdk_s3::config::Region::new(config.region.clone()));
        let aws_config = aws_config_loader.load().await;

        // Custom endpoint + path-style addressing (needed for MinIO-style services).
        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
        s3_config_builder = s3_config_builder
            .endpoint_url(&config.endpoint)
            .force_path_style(true);

        let s3_client = S3Client::from_conf(s3_config_builder.build());
        let bucket = config.bucket.clone();
        let prefix = config.prefix.clone().unwrap_or_default();

        // Unbounded: producers never block, but a slow S3 link can grow this
        // queue (each Upload carries a full copy of the blob bytes).
        let (sync_tx, mut sync_rx) = mpsc::unbounded_channel::<S3SyncMessage>();

        let sync_client = s3_client.clone();
        let sync_bucket = bucket.clone();
        let sync_prefix = prefix.clone();

        tokio::spawn(async move {
            use aws_sdk_s3::primitives::ByteStream;

            tracing::info!("S3 background sync task started");

            // Cap concurrent S3 operations at 32.
            let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(32));
            let client = std::sync::Arc::new(sync_client);
            let bucket = std::sync::Arc::new(sync_bucket);
            let prefix = std::sync::Arc::new(sync_prefix);

            while let Some(msg) = sync_rx.recv().await {
                let client = client.clone();
                let bucket = bucket.clone();
                let prefix = prefix.clone();
                let semaphore = semaphore.clone();

                // One task per message; the permit limits how many run at once.
                tokio::spawn(async move {
                    let _permit = semaphore.acquire().await;

                    match msg {
                        S3SyncMessage::Upload { hash, data } => {
                            // Object key layout: "<prefix><hex-hash>.bin".
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 uploading {} ({} bytes)", &key, data.len());

                            match client
                                .put_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .body(ByteStream::from(data))
                                .send()
                                .await
                            {
                                Ok(_) => tracing::debug!("S3 upload succeeded: {}", &key),
                                // Failures are logged and dropped — no retry.
                                Err(e) => tracing::error!("S3 upload failed {}: {}", &key, e),
                            }
                        }
                        S3SyncMessage::Delete { hash } => {
                            let key = format!("{}{}.bin", prefix, to_hex(&hash));
                            tracing::debug!("S3 deleting {}", &key);

                            if let Err(e) = client
                                .delete_object()
                                .bucket(bucket.as_str())
                                .key(&key)
                                .send()
                                .await
                            {
                                tracing::error!("S3 delete failed {}: {}", &key, e);
                            }
                        }
                    }
                });
            }
        });

        tracing::info!("S3 storage initialized: bucket={}, prefix={}", bucket, prefix);

        Ok(Self {
            local,
            s3_client: Some(s3_client),
            s3_bucket: Some(bucket),
            s3_prefix: prefix,
            sync_tx: Some(sync_tx),
        })
    }

    /// Write-through: store locally, then (if S3 is configured) queue an
    /// asynchronous upload. Returns whether the blob was new locally.
    /// Note: the upload is queued even when `is_new` is false.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        let is_new = self.local.put_sync(hash, data)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            tracing::info!("Queueing S3 upload for {} ({} bytes, is_new={})",
                crate::storage::to_hex(&hash)[..16].to_string(), data.len(), is_new);
            if let Err(e) = tx.send(S3SyncMessage::Upload { hash, data: data.to_vec() }) {
                tracing::error!("Failed to queue S3 upload: {}", e);
            }
        }

        Ok(is_new)
    }

    /// Read-through: local first; on a miss, fetch synchronously from S3 and
    /// backfill the local store (backfill errors are ignored).
    ///
    /// NOTE(review): this drives the async S3 GET with
    /// `futures::executor::block_on`; blocking on AWS SDK futures outside (or
    /// on) a Tokio runtime thread can stall or panic — verify call sites.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        if let Some(data) = self.local.get_sync(hash)? {
            return Ok(Some(data));
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.get_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(output) => {
                    if let Ok(body) = sync_block_on(output.body.collect()) {
                        let data = body.into_bytes().to_vec();
                        // Best-effort local cache of the S3 hit.
                        let _ = self.local.put_sync(*hash, &data);
                        return Ok(Some(data));
                    }
                }
                Err(e) => {
                    // "No such key" is the expected miss; anything else is logged.
                    let service_err = e.into_service_error();
                    if !service_err.is_no_such_key() {
                        tracing::warn!("S3 get failed: {}", service_err);
                    }
                }
            }
        }

        Ok(None)
    }

    /// Existence check: local first, then an S3 HEAD request (same blocking
    /// caveat as `get_sync`).
    pub fn exists(&self, hash: &Hash) -> Result<bool, StoreError> {
        if self.local.exists(hash)? {
            return Ok(true);
        }

        #[cfg(feature = "s3")]
        if let (Some(ref client), Some(ref bucket)) = (&self.s3_client, &self.s3_bucket) {
            let key = format!("{}{}.bin", self.s3_prefix, to_hex(hash));

            match sync_block_on(async {
                client.head_object()
                    .bucket(bucket)
                    .key(&key)
                    .send()
                    .await
            }) {
                Ok(_) => return Ok(true),
                Err(e) => {
                    let service_err = e.into_service_error();
                    if !service_err.is_not_found() {
                        tracing::warn!("S3 head failed: {}", service_err);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Delete locally and queue an asynchronous S3 delete. The returned bool
    /// reflects only the local deletion.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let deleted = self.local.delete_sync(hash)?;

        #[cfg(feature = "s3")]
        if let Some(ref tx) = self.sync_tx {
            let _ = tx.send(S3SyncMessage::Delete { hash: *hash });
        }

        Ok(deleted)
    }

    /// Evict only the local copy, leaving any S3 replica intact (used when
    /// freeing local space without losing the blob).
    pub fn delete_local_only(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.local.delete_sync(hash)
    }

    /// Local-store statistics only; S3 is not consulted.
    pub fn stats(&self) -> Result<hashtree_lmdb::LmdbStats, StoreError> {
        self.local.stats()
    }

    /// List hashes present in the local store only.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        self.local.list()
    }

    /// Clone the handle to the underlying local store.
    pub fn local_store(&self) -> Arc<LmdbBlobStore> {
        Arc::clone(&self.local)
    }
}
321
/// Async `Store` facade over the synchronous router methods so `HashTree`
/// can use the router directly. Each method delegates without any real
/// awaiting (the underlying calls may still block — see `get_sync`).
#[async_trait]
impl Store for StorageRouter {
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.exists(hash)
    }

    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        self.delete_sync(hash)
    }
}
342
/// High-level store combining an LMDB metadata environment with the blob
/// `StorageRouter`. Database layouts (as used by the methods below):
///   - `pins`:          32-byte root hash -> ()            (GC exemption)
///   - `blob_owners`:   sha256(32) || pubkey(32) -> ()     (Blossom ownership)
///   - `pubkey_blobs`:  pubkey(32) -> JSON Vec<BlobMetadata>
///   - `tree_meta`:     root hash(32) -> msgpack TreeMeta
///   - `blob_trees`:    blob hash(32) || root hash(32) -> ()  (refcounting)
///   - `tree_refs`:     string ref key -> root hash(32)
///   - `cached_roots`:  string key -> bytes (no accessor visible in this chunk)
pub struct HashtreeStore {
    // LMDB environment holding all the metadata databases above.
    env: heed::Env,
    pins: Database<Bytes, Unit>,
    blob_owners: Database<Bytes, Unit>,
    pubkey_blobs: Database<Bytes, Bytes>,
    tree_meta: Database<Bytes, Bytes>,
    blob_trees: Database<Bytes, Unit>,
    tree_refs: Database<Str, Bytes>,
    cached_roots: Database<Str, Bytes>,
    // Blob storage (local LMDB + optional S3 mirror).
    router: Arc<StorageRouter>,
    // Configured local size budget. NOTE(review): stored but no consumer is
    // visible in this chunk — presumably used by eviction; confirm.
    max_size_bytes: u64,
}
364
365impl HashtreeStore {
366 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
368 Self::with_options(path, None, 10 * 1024 * 1024 * 1024)
369 }
370
371 pub fn with_s3<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>) -> Result<Self> {
373 Self::with_options(path, s3_config, 10 * 1024 * 1024 * 1024)
374 }
375
    /// Open (or create) the full store: LMDB metadata environment, blob store
    /// under `<path>/blobs`, and an optional S3 mirror.
    ///
    /// NOTE(review): `max_size_bytes` is stored but the LMDB map size below is
    /// a fixed 10 GiB, independent of it — confirm this is intentional.
    pub fn with_options<P: AsRef<Path>>(path: P, s3_config: Option<&S3Config>, max_size_bytes: u64) -> Result<Self> {
        let path = path.as_ref();
        std::fs::create_dir_all(path)?;

        // SAFETY: heed marks environment opening unsafe; the caller must ensure
        // no other process opens this path with incompatible options.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(10 * 1024 * 1024 * 1024) // fixed 10 GiB map
                .max_dbs(8) // one slot per named database below (7 used)
                .open(path)?
        };

        // Create all named databases up front in a single write transaction.
        let mut wtxn = env.write_txn()?;
        let pins = env.create_database(&mut wtxn, Some("pins"))?;
        let blob_owners = env.create_database(&mut wtxn, Some("blob_owners"))?;
        let pubkey_blobs = env.create_database(&mut wtxn, Some("pubkey_blobs"))?;
        let tree_meta = env.create_database(&mut wtxn, Some("tree_meta"))?;
        let blob_trees = env.create_database(&mut wtxn, Some("blob_trees"))?;
        let tree_refs = env.create_database(&mut wtxn, Some("tree_refs"))?;
        let cached_roots = env.create_database(&mut wtxn, Some("cached_roots"))?;
        wtxn.commit()?;

        // Blob payloads live in a separate LMDB environment under "blobs".
        let lmdb_store = Arc::new(LmdbBlobStore::new(path.join("blobs"))
            .map_err(|e| anyhow::anyhow!("Failed to create blob store: {}", e))?);

        #[cfg(feature = "s3")]
        let router = Arc::new(if let Some(s3_cfg) = s3_config {
            tracing::info!("Initializing S3 storage backend: bucket={}, endpoint={}",
                s3_cfg.bucket, s3_cfg.endpoint);

            // NOTE(review): with_s3 calls tokio::spawn; blocking on it here
            // requires an ambient Tokio runtime — verify call sites.
            sync_block_on(async {
                StorageRouter::with_s3(lmdb_store, s3_cfg).await
            })?
        } else {
            StorageRouter::new(lmdb_store)
        });

        #[cfg(not(feature = "s3"))]
        let router = Arc::new({
            if s3_config.is_some() {
                tracing::warn!("S3 config provided but S3 feature not enabled. Using local storage only.");
            }
            StorageRouter::new(lmdb_store)
        });

        Ok(Self {
            env,
            pins,
            blob_owners,
            pubkey_blobs,
            tree_meta,
            blob_trees,
            tree_refs,
            cached_roots,
            router,
            max_size_bytes,
        })
    }
436
437 pub fn router(&self) -> &StorageRouter {
439 &self.router
440 }
441
442 pub fn store_arc(&self) -> Arc<StorageRouter> {
445 Arc::clone(&self.router)
446 }
447
448 pub fn upload_file<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
450 self.upload_file_internal(file_path, true)
451 }
452
453 pub fn upload_file_no_pin<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
455 self.upload_file_internal(file_path, false)
456 }
457
    /// Shared implementation behind `upload_file`/`upload_file_no_pin`: read
    /// the whole file into memory, store it as a public (unencrypted) hash
    /// tree, optionally pin the root, and return the root hash as hex.
    fn upload_file_internal<P: AsRef<Path>>(&self, file_path: P, pin: bool) -> Result<String> {
        let file_path = file_path.as_ref();
        // Whole file is buffered in memory; large files pay that cost here.
        let file_content = std::fs::read(file_path)?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // HashTree::put is async; drive it to completion on this thread.
        let (cid, _size) = sync_block_on(async {
            tree.put(&file_content).await
        }).context("Failed to store file")?;

        if pin {
            // Record the root hash in `pins` so its blobs are kept.
            let mut wtxn = self.env.write_txn()?;
            self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
            wtxn.commit()?;
        }

        Ok(to_hex(&cid.hash))
    }
479
    /// Store the contents of `reader` as a public hash tree, pin the root,
    /// and return the hex root hash.
    ///
    /// `_file_name` is currently unused. `callback` is invoked exactly once
    /// with the hex root hash, before the pin is committed.
    ///
    /// NOTE(review): despite the name this is not streaming — the reader is
    /// drained fully into memory before hashing.
    pub fn upload_file_stream<R: Read, F>(
        &self,
        mut reader: R,
        _file_name: impl Into<String>,
        mut callback: F,
    ) -> Result<String>
    where
        F: FnMut(&str),
    {
        let mut data = Vec::new();
        reader.read_to_end(&mut data)?;

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (cid, _size) = sync_block_on(async {
            tree.put(&data).await
        }).context("Failed to store file")?;

        let root_hex = to_hex(&cid.hash);
        // Callback fires before the pin is persisted.
        callback(&root_hex);

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(root_hex)
    }
511
512 pub fn upload_dir<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
515 self.upload_dir_with_options(dir_path, true)
516 }
517
    /// Upload a directory tree as a public hash tree, pin the root, and
    /// return its hex hash. `respect_gitignore` toggles .gitignore handling
    /// during the walk.
    pub fn upload_dir_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
        let dir_path = dir_path.as_ref();

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk + upload happens inside one blocking async drive.
        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
        }).context("Failed to upload directory")?;

        let root_hex = to_hex(&root_cid.hash);

        // Pin only the root; reachable blobs are protected transitively.
        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(root_hex)
    }
537
    /// Walk `current_path` (optionally honoring gitignore), upload every file
    /// into `tree`, and return the CID of the assembled root directory node.
    ///
    /// Despite the name this is not recursive: a flat walk groups file CIDs
    /// by relative parent directory, then `build_directory_tree` assembles
    /// the hierarchy bottom-up. `_root_path` is unused.
    async fn upload_dir_recursive<S: Store>(
        &self,
        tree: &HashTree<S>,
        _root_path: &Path,
        current_path: &Path,
        respect_gitignore: bool,
    ) -> Result<Cid> {
        use ignore::WalkBuilder;
        use std::collections::HashMap;

        // Map: relative directory path ("" = root) -> (entry name, CID) list.
        let mut dir_contents: HashMap<String, Vec<(String, Cid)>> = HashMap::new();
        // Ensure the root directory exists even if it turns out empty.
        dir_contents.insert(String::new(), Vec::new());

        let walker = WalkBuilder::new(current_path)
            .git_ignore(respect_gitignore)
            .git_global(respect_gitignore)
            .git_exclude(respect_gitignore)
            .hidden(false) // include dotfiles
            .build();

        for result in walker {
            let entry = result?;
            let path = entry.path();

            // The walker yields the root itself first; skip it.
            if path == current_path {
                continue;
            }

            let relative = path.strip_prefix(current_path)
                .unwrap_or(path);

            if path.is_file() {
                let content = std::fs::read(path)?;
                let (cid, _size) = tree.put(&content).await
                    .map_err(|e| anyhow::anyhow!("Failed to upload file {}: {}", path.display(), e))?;

                // Register the file under its parent directory's entry list.
                let parent = relative.parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();
                let name = relative.file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();

                dir_contents.entry(parent).or_default().push((name, cid));
            } else if path.is_dir() {
                // Make sure empty directories still get a node.
                let dir_path = relative.to_string_lossy().to_string();
                dir_contents.entry(dir_path).or_default();
            }
        }

        self.build_directory_tree(tree, &mut dir_contents).await
    }
595
    /// Assemble directory nodes bottom-up from the per-directory entry lists
    /// produced by `upload_dir_recursive`, returning the root ("") CID.
    async fn build_directory_tree<S: Store>(
        &self,
        tree: &HashTree<S>,
        dir_contents: &mut std::collections::HashMap<String, Vec<(String, Cid)>>,
    ) -> Result<Cid> {
        // Sort directories deepest-first so every child directory's CID
        // exists before its parent node is built.
        let mut dirs: Vec<String> = dir_contents.keys().cloned().collect();
        dirs.sort_by(|a, b| {
            // Depth = number of separators, +1 for non-root paths.
            let depth_a = a.matches('/').count() + if a.is_empty() { 0 } else { 1 };
            let depth_b = b.matches('/').count() + if b.is_empty() { 0 } else { 1 };
            depth_b.cmp(&depth_a) // descending: deepest first
        });

        let mut dir_cids: std::collections::HashMap<String, Cid> = std::collections::HashMap::new();

        for dir_path in dirs {
            // File entries collected for this directory.
            let files = dir_contents.get(&dir_path).cloned().unwrap_or_default();

            let mut entries: Vec<HashTreeDirEntry> = files.into_iter()
                .map(|(name, cid)| HashTreeDirEntry::from_cid(name, &cid))
                .collect();

            // Attach already-built subdirectories whose parent is this dir.
            // (O(dirs) scan per directory — fine for typical tree sizes.)
            for (subdir_path, cid) in &dir_cids {
                let parent = std::path::Path::new(subdir_path)
                    .parent()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_default();

                if parent == dir_path {
                    let name = std::path::Path::new(subdir_path)
                        .file_name()
                        .map(|n| n.to_string_lossy().to_string())
                        .unwrap_or_default();
                    entries.push(HashTreeDirEntry::from_cid(name, cid));
                }
            }

            let cid = tree.put_directory(entries).await
                .map_err(|e| anyhow::anyhow!("Failed to create directory node: {}", e))?;

            dir_cids.insert(dir_path, cid);
        }

        // "" is the root directory inserted up front by the caller.
        dir_cids.get("")
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("No root directory"))
    }
645
    /// Store a file using the default (non-`.public()`) HashTree config and
    /// pin the root. Returns the full CID string (not just the hex hash).
    ///
    /// NOTE(review): the default config is presumably the encrypted mode
    /// (contrast with `.public()` elsewhere) and `cid.to_string()` presumably
    /// embeds the key material — confirm against hashtree_core.
    pub fn upload_file_encrypted<P: AsRef<Path>>(&self, file_path: P) -> Result<String> {
        let file_path = file_path.as_ref();
        let file_content = std::fs::read(file_path)?;

        let store = self.store_arc();
        // No `.public()`: default HashTreeConfig.
        let tree = HashTree::new(HashTreeConfig::new(store));

        let (cid, _size) = sync_block_on(async {
            tree.put(&file_content).await
        }).map_err(|e| anyhow::anyhow!("Failed to encrypt file: {}", e))?;

        let cid_str = cid.to_string();

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
667
668 pub fn upload_dir_encrypted<P: AsRef<Path>>(&self, dir_path: P) -> Result<String> {
671 self.upload_dir_encrypted_with_options(dir_path, true)
672 }
673
    /// Upload a directory with the default (non-`.public()`) HashTree config,
    /// pin the root, and return the full CID string (see the note on
    /// `upload_file_encrypted` about what the default config implies).
    pub fn upload_dir_encrypted_with_options<P: AsRef<Path>>(&self, dir_path: P, respect_gitignore: bool) -> Result<String> {
        let dir_path = dir_path.as_ref();
        let store = self.store_arc();

        // No `.public()`: default HashTreeConfig.
        let tree = HashTree::new(HashTreeConfig::new(store));

        let root_cid = sync_block_on(async {
            self.upload_dir_recursive(&tree, dir_path, dir_path, respect_gitignore).await
        }).context("Failed to upload encrypted directory")?;

        let cid_str = root_cid.to_string();

        let mut wtxn = self.env.write_txn()?;
        self.pins.put(&mut wtxn, root_cid.hash.as_slice(), &())?;
        wtxn.commit()?;

        Ok(cid_str)
    }
696
697 pub fn get_tree_node(&self, hash: &[u8; 32]) -> Result<Option<TreeNode>> {
699 let store = self.store_arc();
700 let tree = HashTree::new(HashTreeConfig::new(store).public());
701
702 sync_block_on(async {
703 tree.get_tree_node(hash).await
704 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))
705 })
706 }
707
708 pub fn put_blob(&self, data: &[u8]) -> Result<String> {
710 let hash = sha256(data);
711 self.router.put_sync(hash, data)
712 .map_err(|e| anyhow::anyhow!("Failed to store blob: {}", e))?;
713 Ok(to_hex(&hash))
714 }
715
716 pub fn get_blob(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
718 self.router.get_sync(hash)
719 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))
720 }
721
722 pub fn blob_exists(&self, hash: &[u8; 32]) -> Result<bool> {
724 self.router.exists(hash)
725 .map_err(|e| anyhow::anyhow!("Failed to check blob: {}", e))
726 }
727
728 fn blob_owner_key(sha256: &[u8; 32], pubkey: &[u8; 32]) -> [u8; 64] {
734 let mut key = [0u8; 64];
735 key[..32].copy_from_slice(sha256);
736 key[32..].copy_from_slice(pubkey);
737 key
738 }
739
    /// Record `pubkey` as an owner of blob `sha256` and add the blob to the
    /// pubkey's JSON blob list (if not already present), in one transaction.
    pub fn set_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<()> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Ownership row: (sha256 || pubkey) -> ().
        self.blob_owners.put(&mut wtxn, &key[..], &())?;

        let sha256_hex = to_hex(sha256);

        // Existing per-pubkey blob list (JSON); unparseable/missing -> empty.
        let mut blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&wtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        if !blobs.iter().any(|b| b.sha256 == sha256_hex) {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            // Size is learned by fetching the whole blob; 0 if unavailable.
            let size = self
                .get_blob(sha256)?
                .map(|data| data.len() as u64)
                .unwrap_or(0);

            blobs.push(BlobMetadata {
                sha256: sha256_hex,
                size,
                mime_type: "application/octet-stream".to_string(),
                uploaded: now,
            });

            let blobs_json = serde_json::to_vec(&blobs)?;
            self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
        }

        wtxn.commit()?;
        Ok(())
    }
786
787 pub fn is_blob_owner(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
789 let key = Self::blob_owner_key(sha256, pubkey);
790 let rtxn = self.env.read_txn()?;
791 Ok(self.blob_owners.get(&rtxn, &key[..])?.is_some())
792 }
793
794 pub fn get_blob_owners(&self, sha256: &[u8; 32]) -> Result<Vec<[u8; 32]>> {
796 let rtxn = self.env.read_txn()?;
797
798 let mut owners = Vec::new();
799 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
800 let (key, _) = item?;
801 if key.len() == 64 {
802 let mut pubkey = [0u8; 32];
804 pubkey.copy_from_slice(&key[32..64]);
805 owners.push(pubkey);
806 }
807 }
808 Ok(owners)
809 }
810
811 pub fn blob_has_owners(&self, sha256: &[u8; 32]) -> Result<bool> {
813 let rtxn = self.env.read_txn()?;
814
815 for item in self.blob_owners.prefix_iter(&rtxn, &sha256[..])? {
817 if item.is_ok() {
818 return Ok(true);
819 }
820 }
821 Ok(false)
822 }
823
824 pub fn get_blob_owner(&self, sha256: &[u8; 32]) -> Result<Option<[u8; 32]>> {
826 Ok(self.get_blob_owners(sha256)?.into_iter().next())
827 }
828
    /// Remove `pubkey`'s ownership of blob `sha256` and prune it from the
    /// pubkey's blob list. If no other owners remain, delete the blob bytes
    /// too. Returns true iff the blob data was deleted.
    ///
    /// NOTE(review): when the last owner is removed, the blob bytes are
    /// deleted via the router *before* `wtxn.commit()`; if the commit then
    /// fails, the data is gone while the index rows remain — confirm this
    /// ordering is acceptable.
    pub fn delete_blossom_blob(&self, sha256: &[u8; 32], pubkey: &[u8; 32]) -> Result<bool> {
        let key = Self::blob_owner_key(sha256, pubkey);
        let mut wtxn = self.env.write_txn()?;

        // Drop the ownership row for this (blob, pubkey) pair.
        self.blob_owners.delete(&mut wtxn, &key[..])?;

        let sha256_hex = to_hex(sha256);

        // Remove the blob from the pubkey's JSON blob list, if parseable.
        if let Some(blobs_bytes) = self.pubkey_blobs.get(&wtxn, pubkey)? {
            if let Ok(mut blobs) = serde_json::from_slice::<Vec<BlobMetadata>>(blobs_bytes) {
                blobs.retain(|b| b.sha256 != sha256_hex);
                let blobs_json = serde_json::to_vec(&blobs)?;
                self.pubkey_blobs.put(&mut wtxn, pubkey, &blobs_json)?;
            }
        }

        // Scan for remaining owners within the same (uncommitted) txn, so the
        // row deleted above is already excluded.
        let mut has_other_owners = false;
        for item in self.blob_owners.prefix_iter(&wtxn, &sha256[..])? {
            if item.is_ok() {
                has_other_owners = true;
                break;
            }
        }

        if has_other_owners {
            wtxn.commit()?;
            tracing::debug!(
                "Removed {} from blob {} owners, other owners remain",
                &to_hex(pubkey)[..8],
                &sha256_hex[..8]
            );
            return Ok(false);
        }

        tracing::info!(
            "All owners removed from blob {}, deleting",
            &sha256_hex[..8]
        );

        // Best-effort delete of the blob bytes (local + queued S3 delete).
        let _ = self.router.delete_sync(sha256);

        wtxn.commit()?;
        Ok(true)
    }
882
    /// List all blobs registered to `pubkey` as Blossom blob descriptors.
    /// Missing or unparseable JSON yields an empty list rather than an error.
    pub fn list_blobs_by_pubkey(&self, pubkey: &[u8; 32]) -> Result<Vec<crate::server::blossom::BlobDescriptor>> {
        let rtxn = self.env.read_txn()?;

        let blobs: Vec<BlobMetadata> = self
            .pubkey_blobs
            .get(&rtxn, pubkey)?
            .and_then(|b| serde_json::from_slice(b).ok())
            .unwrap_or_default();

        // Convert stored metadata into wire-format descriptors; the URL is a
        // root-relative path to the blob by hex hash.
        Ok(blobs
            .into_iter()
            .map(|b| crate::server::blossom::BlobDescriptor {
                url: format!("/{}", b.sha256),
                sha256: b.sha256,
                size: b.size,
                mime_type: b.mime_type,
                uploaded: b.uploaded,
            })
            .collect())
    }
904
905 pub fn get_chunk(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
907 self.router.get_sync(hash)
908 .map_err(|e| anyhow::anyhow!("Failed to get chunk: {}", e))
909 }
910
911 pub fn get_file(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
914 let store = self.store_arc();
915 let tree = HashTree::new(HashTreeConfig::new(store).public());
916
917 sync_block_on(async {
918 tree.read_file(hash).await
919 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
920 })
921 }
922
923 pub fn get_file_by_cid(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
926 let store = self.store_arc();
927 let tree = HashTree::new(HashTreeConfig::new(store).public());
928
929 sync_block_on(async {
930 tree.get(cid).await
931 .map_err(|e| anyhow::anyhow!("Failed to read file: {}", e))
932 })
933 }
934
    /// Describe how a file is stored: total size plus, for chunked files, the
    /// per-chunk hashes and sizes from the tree node's links.
    ///
    /// Returns `None` when the hash is absent, the tree node is missing, or
    /// the hash refers to a directory (directories have no byte range).
    pub fn get_file_chunk_metadata(&self, hash: &[u8; 32]) -> Result<Option<FileChunkMetadata>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());

        sync_block_on(async {
            let exists = store.has(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check existence: {}", e))?;

            if !exists {
                return Ok(None);
            }

            let total_size = tree.get_size(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get size: {}", e))?;

            let is_tree_node = tree.is_tree(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            // Plain blob: the whole file is a single unchunked blob.
            if !is_tree_node {
                return Ok(Some(FileChunkMetadata {
                    total_size,
                    chunk_hashes: vec![],
                    chunk_sizes: vec![],
                    is_chunked: false,
                }));
            }

            let node = match tree.get_tree_node(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))? {
                Some(n) => n,
                None => return Ok(None),
            };

            let is_directory = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            // Directories are not byte-addressable files.
            if is_directory {
                return Ok(None);
            }

            // One link per chunk, in file order.
            let chunk_hashes: Vec<Hash> = node.links.iter().map(|l| l.hash).collect();
            let chunk_sizes: Vec<u64> = node.links.iter().map(|l| l.size).collect();

            Ok(Some(FileChunkMetadata {
                total_size,
                chunk_hashes,
                chunk_sizes,
                is_chunked: !node.links.is_empty(),
            }))
        })
    }
995
    /// Read the inclusive byte range [start, end] of a file, fetching only
    /// the chunks that overlap it. `end = None` means "to end of file"; `end`
    /// is clamped to the last byte. Returns `(bytes, total_file_size)`, or
    /// `None` when the file is missing or `start` is past EOF.
    pub fn get_file_range(&self, hash: &[u8; 32], start: u64, end: Option<u64>) -> Result<Option<(Vec<u8>, u64)>> {
        let metadata = match self.get_file_chunk_metadata(hash)? {
            Some(m) => m,
            None => return Ok(None),
        };

        // Empty file: any range request yields empty bytes.
        if metadata.total_size == 0 {
            return Ok(Some((Vec::new(), 0)));
        }

        if start >= metadata.total_size {
            return Ok(None);
        }

        // Clamp end to the last valid byte offset.
        let end = end.unwrap_or(metadata.total_size - 1).min(metadata.total_size - 1);

        // Unchunked file: slice the fully materialized content.
        if !metadata.is_chunked {
            let content = self.get_file(hash)?.unwrap_or_default();
            let range_content = if start < content.len() as u64 {
                content[start as usize..=(end as usize).min(content.len() - 1)].to_vec()
            } else {
                Vec::new()
            };
            return Ok(Some((range_content, metadata.total_size)));
        }

        let mut result = Vec::new();
        let mut current_offset = 0u64;

        // Walk chunks in order, copying only the overlapping portion of each.
        // NOTE(review): `chunk_size - 1` underflows if a chunk ever reports
        // size 0 — confirm zero-size chunks cannot occur.
        for (i, chunk_hash) in metadata.chunk_hashes.iter().enumerate() {
            let chunk_size = metadata.chunk_sizes[i];
            let chunk_end = current_offset + chunk_size - 1;

            // Does [current_offset, chunk_end] overlap [start, end]?
            if chunk_end >= start && current_offset <= end {
                let chunk_content = match self.get_chunk(chunk_hash)? {
                    Some(content) => content,
                    None => {
                        return Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash)));
                    }
                };

                // Offset within this chunk where copying begins.
                let chunk_read_start = if current_offset >= start {
                    0
                } else {
                    (start - current_offset) as usize
                };

                // Inclusive offset within this chunk where copying ends.
                let chunk_read_end = if chunk_end <= end {
                    chunk_size as usize - 1
                } else {
                    (end - current_offset) as usize
                };

                result.extend_from_slice(&chunk_content[chunk_read_start..=chunk_read_end]);
            }

            current_offset += chunk_size;

            // Past the requested range: no later chunk can contribute.
            if current_offset > end {
                break;
            }
        }

        Ok(Some((result, metadata.total_size)))
    }
1065
    /// Build an owned, lazily-iterating view over the chunks covering the
    /// inclusive byte range [start, end] (end clamped to the last byte).
    /// Takes `Arc<Self>` so the returned value can outlive the caller's
    /// borrow. Returns `None` for a missing file, empty file, or `start`
    /// past EOF.
    pub fn stream_file_range_chunks_owned(
        self: Arc<Self>,
        hash: &[u8; 32],
        start: u64,
        end: u64,
    ) -> Result<Option<FileRangeChunksOwned>> {
        let metadata = match self.get_file_chunk_metadata(hash)? {
            Some(m) => m,
            None => return Ok(None),
        };

        if metadata.total_size == 0 || start >= metadata.total_size {
            return Ok(None);
        }

        let end = end.min(metadata.total_size - 1);

        // Cursor state starts at chunk 0 / offset 0; the iterator type
        // (defined elsewhere) advances it.
        Ok(Some(FileRangeChunksOwned {
            store: self,
            metadata,
            start,
            end,
            current_chunk_idx: 0,
            current_offset: 0,
        }))
    }
1093
    /// List the entries of the directory node at `hash`, or `None` when the
    /// hash does not refer to a directory. `dir_name` is always left empty.
    pub fn get_directory_listing(&self, hash: &[u8; 32]) -> Result<Option<DirectoryListing>> {
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        sync_block_on(async {
            let is_dir = tree.is_directory(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check directory: {}", e))?;

            if !is_dir {
                return Ok(None);
            }

            // Directories here are always public CIDs.
            let cid = hashtree_core::Cid::public(*hash);
            let tree_entries = tree.list_directory(&cid).await
                .map_err(|e| anyhow::anyhow!("Failed to list directory: {}", e))?;

            // Convert hashtree entries into this module's DirEntry shape.
            let entries: Vec<DirEntry> = tree_entries.into_iter().map(|e| DirEntry {
                name: e.name,
                cid: to_hex(&e.hash),
                is_directory: e.link_type.is_tree(),
                size: e.size,
            }).collect();

            Ok(Some(DirectoryListing {
                dir_name: String::new(),
                entries,
            }))
        })
    }
1126
1127 pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
1129 let mut wtxn = self.env.write_txn()?;
1130 self.pins.put(&mut wtxn, hash.as_slice(), &())?;
1131 wtxn.commit()?;
1132 Ok(())
1133 }
1134
1135 pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
1137 let mut wtxn = self.env.write_txn()?;
1138 self.pins.delete(&mut wtxn, hash.as_slice())?;
1139 wtxn.commit()?;
1140 Ok(())
1141 }
1142
1143 pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
1145 let rtxn = self.env.read_txn()?;
1146 Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
1147 }
1148
1149 pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
1151 let rtxn = self.env.read_txn()?;
1152 let mut pins = Vec::new();
1153
1154 for item in self.pins.iter(&rtxn)? {
1155 let (hash_bytes, _) = item?;
1156 if hash_bytes.len() == 32 {
1157 let mut hash = [0u8; 32];
1158 hash.copy_from_slice(hash_bytes);
1159 pins.push(hash);
1160 }
1161 }
1162
1163 Ok(pins)
1164 }
1165
    /// List all pins as display items with a directory flag.
    ///
    /// NOTE(review): `name` is hard-coded to "Unknown" for every pin — name
    /// resolution is apparently not implemented yet.
    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
        let rtxn = self.env.read_txn()?;
        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let mut pins = Vec::new();

        for item in self.pins.iter(&rtxn)? {
            let (hash_bytes, _) = item?;
            if hash_bytes.len() != 32 {
                continue; // skip malformed keys
            }
            let mut hash = [0u8; 32];
            hash.copy_from_slice(hash_bytes);

            // Lookup errors are treated as "not a directory".
            let is_directory = sync_block_on(async {
                tree.is_directory(&hash).await.unwrap_or(false)
            });

            pins.push(PinnedItem {
                cid: to_hex(&hash),
                name: "Unknown".to_string(),
                is_directory,
            });
        }

        Ok(pins)
    }
1195
    /// Index a synced tree: record blob->tree membership rows (for reference
    /// counting), write `TreeMeta`, and optionally bind a string `ref_key` to
    /// this root — unindexing whatever tree that key pointed to before.
    pub fn index_tree(
        &self,
        root_hash: &Hash,
        owner: &str,
        name: Option<&str>,
        priority: u8,
        ref_key: Option<&str>,
    ) -> Result<()> {
        let root_hex = to_hex(root_hash);

        // If this ref previously pointed at a different root, unindex the old
        // tree first (best-effort; its error is ignored).
        if let Some(key) = ref_key {
            let rtxn = self.env.read_txn()?;
            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
                if old_hash_bytes != root_hash.as_slice() {
                    let old_hash: Hash = old_hash_bytes.try_into()
                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
                    // Drop the read txn before unindex_tree opens a write txn.
                    drop(rtxn);
                    let _ = self.unindex_tree(&old_hash);
                    tracing::debug!("Replaced old tree for ref {}", key);
                }
            }
        }

        let store = self.store_arc();
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        // Walk the tree to enumerate leaf blobs and their total size.
        let (blob_hashes, total_size) = sync_block_on(async {
            self.collect_tree_blobs(&tree, root_hash).await
        })?;

        let mut wtxn = self.env.write_txn()?;

        // One membership row per blob: (blob hash || root hash) -> ().
        for blob_hash in &blob_hashes {
            let mut key = [0u8; 64];
            key[..32].copy_from_slice(blob_hash);
            key[32..].copy_from_slice(root_hash);
            self.blob_trees.put(&mut wtxn, &key[..], &())?;
        }

        let meta = TreeMeta {
            owner: owner.to_string(),
            name: name.map(|s| s.to_string()),
            synced_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            total_size,
            priority,
        };
        // TreeMeta is stored msgpack-encoded under the root hash.
        let meta_bytes = rmp_serde::to_vec(&meta)
            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
        self.tree_meta.put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;

        if let Some(key) = ref_key {
            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
        }

        wtxn.commit()?;

        tracing::debug!(
            "Indexed tree {} ({} blobs, {} bytes, priority {})",
            &root_hex[..8],
            blob_hashes.len(),
            total_size,
            priority
        );

        Ok(())
    }
1277
    /// Depth-first walk from `root`, collecting the hashes of all leaf blobs
    /// and summing their sizes (by fetching each blob's bytes). Interior tree
    /// nodes are traversed but not counted; blobs missing from the store are
    /// silently skipped.
    async fn collect_tree_blobs<S: Store>(
        &self,
        tree: &HashTree<S>,
        root: &Hash,
    ) -> Result<(Vec<Hash>, u64)> {
        let mut blobs = Vec::new();
        let mut total_size = 0u64;
        let mut stack = vec![*root];

        while let Some(hash) = stack.pop() {
            let is_tree = tree.is_tree(&hash).await
                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;

            if is_tree {
                // Interior node: push children and keep walking.
                if let Some(node) = tree.get_tree_node(&hash).await
                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
                {
                    for link in &node.links {
                        stack.push(link.hash);
                    }
                }
            } else {
                // Leaf blob: size is learned by fetching the full bytes.
                if let Some(data) = self.router.get_sync(&hash)
                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
                {
                    total_size += data.len() as u64;
                    blobs.push(hash);
                }
            }
        }

        Ok((blobs, total_size))
    }
1315
1316 pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
1319 let root_hex = to_hex(root_hash);
1320
1321 let store = self.store_arc();
1322 let tree = HashTree::new(HashTreeConfig::new(store).public());
1323
1324 let (blob_hashes, _) = sync_block_on(async {
1326 self.collect_tree_blobs(&tree, root_hash).await
1327 })?;
1328
1329 let mut wtxn = self.env.write_txn()?;
1330 let mut freed = 0u64;
1331
1332 for blob_hash in &blob_hashes {
1334 let mut key = [0u8; 64];
1336 key[..32].copy_from_slice(blob_hash);
1337 key[32..].copy_from_slice(root_hash);
1338 self.blob_trees.delete(&mut wtxn, &key[..])?;
1339
1340 let rtxn = self.env.read_txn()?;
1342 let mut has_other_tree = false;
1343
1344 for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
1345 if item.is_ok() {
1346 has_other_tree = true;
1347 break;
1348 }
1349 }
1350 drop(rtxn);
1351
1352 if !has_other_tree {
1354 if let Some(data) = self.router.get_sync(blob_hash)
1355 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
1356 {
1357 freed += data.len() as u64;
1358 self.router.delete_local_only(blob_hash)
1360 .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
1361 }
1362 }
1363 }
1364
1365 if let Some(data) = self.router.get_sync(root_hash)
1367 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
1368 {
1369 freed += data.len() as u64;
1370 self.router.delete_local_only(root_hash)
1372 .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
1373 }
1374
1375 self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
1377
1378 wtxn.commit()?;
1379
1380 tracing::debug!(
1381 "Unindexed tree {} ({} bytes freed)",
1382 &root_hex[..8],
1383 freed
1384 );
1385
1386 Ok(freed)
1387 }
1388
1389 pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
1391 let rtxn = self.env.read_txn()?;
1392 if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
1393 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1394 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1395 Ok(Some(meta))
1396 } else {
1397 Ok(None)
1398 }
1399 }
1400
1401 pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1403 let rtxn = self.env.read_txn()?;
1404 let mut trees = Vec::new();
1405
1406 for item in self.tree_meta.iter(&rtxn)? {
1407 let (hash_bytes, meta_bytes) = item?;
1408 let hash: Hash = hash_bytes.try_into()
1409 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
1410 let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
1411 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1412 trees.push((hash, meta));
1413 }
1414
1415 Ok(trees)
1416 }
1417
1418 pub fn tracked_size(&self) -> Result<u64> {
1420 let rtxn = self.env.read_txn()?;
1421 let mut total = 0u64;
1422
1423 for item in self.tree_meta.iter(&rtxn)? {
1424 let (_, bytes) = item?;
1425 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1426 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1427 total += meta.total_size;
1428 }
1429
1430 Ok(total)
1431 }
1432
1433 fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
1435 let mut trees = self.list_indexed_trees()?;
1436
1437 trees.sort_by(|a, b| {
1439 match a.1.priority.cmp(&b.1.priority) {
1440 std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
1441 other => other,
1442 }
1443 });
1444
1445 Ok(trees)
1446 }
1447
    /// Enforce the configured storage cap.
    ///
    /// If current usage exceeds `max_size_bytes`, free space down to a 90%
    /// target: first by deleting orphaned blobs, then by evicting whole trees
    /// in ascending (priority, synced_at) order. Pinned trees are skipped.
    ///
    /// Returns the total number of bytes freed (0 when already under the cap).
    pub fn evict_if_needed(&self) -> Result<u64> {
        let stats = self.router.stats()
            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
        let current = stats.total_bytes;

        // Under the cap — nothing to do.
        if current <= self.max_size_bytes {
            return Ok(0);
        }

        // Evict down to 90% of the cap to leave headroom and avoid
        // re-triggering eviction on the very next write.
        let target = self.max_size_bytes * 90 / 100;
        let mut freed = 0u64;
        let mut current_size = current;

        // Phase 1: reclaim blobs that belong to no indexed tree and are unpinned.
        let orphan_freed = self.evict_orphaned_blobs()?;
        freed += orphan_freed;
        current_size = current_size.saturating_sub(orphan_freed);

        if orphan_freed > 0 {
            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
        }

        // Orphan cleanup may already have brought us under target.
        if current_size <= target {
            if freed > 0 {
                tracing::info!("Eviction complete: {} bytes freed", freed);
            }
            return Ok(freed);
        }

        // Phase 2: evict whole trees, lowest priority / oldest sync first.
        let evictable = self.get_evictable_trees()?;

        for (root_hash, meta) in evictable {
            if current_size <= target {
                break;
            }

            let root_hex = to_hex(&root_hash);

            // Pinned trees are never evicted automatically.
            if self.is_pinned(&root_hash)? {
                continue;
            }

            let tree_freed = self.unindex_tree(&root_hash)?;
            freed += tree_freed;
            current_size = current_size.saturating_sub(tree_freed);

            tracing::info!(
                "Evicted tree {} (owner={}, priority={}, {} bytes)",
                &root_hex[..8],
                &meta.owner[..8.min(meta.owner.len())],
                meta.priority,
                tree_freed
            );
        }

        if freed > 0 {
            tracing::info!("Eviction complete: {} bytes freed", freed);
        }

        Ok(freed)
    }
1521
    /// Delete blobs that are neither pinned nor referenced by any indexed tree.
    ///
    /// Returns the number of bytes freed. Deletion failures are ignored
    /// (best-effort); the bytes are still counted toward `freed`.
    fn evict_orphaned_blobs(&self) -> Result<u64> {
        let mut freed = 0u64;

        let all_hashes = self.router.list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        let rtxn = self.env.read_txn()?;
        // Snapshot the pin set; only well-formed 32-byte keys are considered.
        let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
            .filter_map(|item| item.ok())
            .filter_map(|(hash_bytes, _)| {
                if hash_bytes.len() == 32 {
                    let mut hash = [0u8; 32];
                    hash.copy_from_slice(hash_bytes);
                    Some(hash)
                } else {
                    None
                }
            })
            .collect();

        // Collect every blob referenced by at least one indexed tree.
        // blob_trees keys are blob_hash (32 bytes) || root_hash (32 bytes);
        // only the leading blob-hash half matters here.
        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
        for item in self.blob_trees.iter(&rtxn)? {
            if let Ok((key_bytes, _)) = item {
                if key_bytes.len() >= 32 {
                    // Length checked above, so the conversion cannot fail.
                    let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
                    blobs_in_trees.insert(blob_hash);
                }
            }
        }
        drop(rtxn);

        for hash in all_hashes {
            // Pinned content is never evicted.
            if pinned.contains(&hash) {
                continue;
            }

            // Still referenced by an indexed tree — not an orphan.
            if blobs_in_trees.contains(&hash) {
                continue;
            }

            // Read first so freed bytes can be accounted; delete best-effort.
            if let Ok(Some(data)) = self.router.get_sync(&hash) {
                freed += data.len() as u64;
                let _ = self.router.delete_local_only(&hash);
                tracing::debug!("Deleted orphaned blob {} ({} bytes)", &to_hex(&hash)[..8], data.len());
            }
        }

        Ok(freed)
    }
1580
    /// The configured local-storage cap, in bytes.
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
1585
1586 pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
1588 let rtxn = self.env.read_txn()?;
1589 let mut own = 0u64;
1590 let mut followed = 0u64;
1591 let mut other = 0u64;
1592
1593 for item in self.tree_meta.iter(&rtxn)? {
1594 let (_, bytes) = item?;
1595 let meta: TreeMeta = rmp_serde::from_slice(bytes)
1596 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
1597
1598 if meta.priority >= PRIORITY_OWN {
1599 own += meta.total_size;
1600 } else if meta.priority >= PRIORITY_FOLLOWED {
1601 followed += meta.total_size;
1602 } else {
1603 other += meta.total_size;
1604 }
1605 }
1606
1607 Ok(StorageByPriority { own, followed, other })
1608 }
1609
1610 pub fn get_storage_stats(&self) -> Result<StorageStats> {
1612 let rtxn = self.env.read_txn()?;
1613 let total_pins = self.pins.len(&rtxn)? as usize;
1614
1615 let stats = self.router.stats()
1616 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
1617
1618 Ok(StorageStats {
1619 total_dags: stats.count,
1620 pinned_dags: total_pins,
1621 total_bytes: stats.total_bytes,
1622 })
1623 }
1624
1625 pub fn get_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<Option<CachedRoot>> {
1629 let key = format!("{}/{}", pubkey_hex, tree_name);
1630 let rtxn = self.env.read_txn()?;
1631 if let Some(bytes) = self.cached_roots.get(&rtxn, &key)? {
1632 let root: CachedRoot = rmp_serde::from_slice(bytes)
1633 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1634 Ok(Some(root))
1635 } else {
1636 Ok(None)
1637 }
1638 }
1639
1640 pub fn set_cached_root(
1642 &self,
1643 pubkey_hex: &str,
1644 tree_name: &str,
1645 hash: &str,
1646 key: Option<&str>,
1647 visibility: &str,
1648 updated_at: u64,
1649 ) -> Result<()> {
1650 let db_key = format!("{}/{}", pubkey_hex, tree_name);
1651 let root = CachedRoot {
1652 hash: hash.to_string(),
1653 key: key.map(|k| k.to_string()),
1654 updated_at,
1655 visibility: visibility.to_string(),
1656 };
1657 let bytes = rmp_serde::to_vec(&root)
1658 .map_err(|e| anyhow::anyhow!("Failed to serialize CachedRoot: {}", e))?;
1659 let mut wtxn = self.env.write_txn()?;
1660 self.cached_roots.put(&mut wtxn, &db_key, &bytes)?;
1661 wtxn.commit()?;
1662 Ok(())
1663 }
1664
1665 pub fn list_cached_roots(&self, pubkey_hex: &str) -> Result<Vec<(String, CachedRoot)>> {
1667 let prefix = format!("{}/", pubkey_hex);
1668 let rtxn = self.env.read_txn()?;
1669 let mut results = Vec::new();
1670
1671 for item in self.cached_roots.iter(&rtxn)? {
1672 let (key, bytes) = item?;
1673 if key.starts_with(&prefix) {
1674 let tree_name = key.strip_prefix(&prefix).unwrap_or(key);
1675 let root: CachedRoot = rmp_serde::from_slice(bytes)
1676 .map_err(|e| anyhow::anyhow!("Failed to deserialize CachedRoot: {}", e))?;
1677 results.push((tree_name.to_string(), root));
1678 }
1679 }
1680
1681 Ok(results)
1682 }
1683
1684 pub fn delete_cached_root(&self, pubkey_hex: &str, tree_name: &str) -> Result<bool> {
1686 let key = format!("{}/{}", pubkey_hex, tree_name);
1687 let mut wtxn = self.env.write_txn()?;
1688 let deleted = self.cached_roots.delete(&mut wtxn, &key)?;
1689 wtxn.commit()?;
1690 Ok(deleted)
1691 }
1692
    /// Garbage-collect: delete every locally stored object that is not pinned.
    ///
    /// NOTE(review): unlike `evict_orphaned_blobs`, this does NOT consult
    /// `blob_trees`/`tree_meta`, so blobs belonging to indexed-but-unpinned
    /// trees are deleted too — confirm this aggressiveness is intended.
    pub fn gc(&self) -> Result<GcStats> {
        let rtxn = self.env.read_txn()?;

        // Snapshot the pin set; only well-formed 32-byte keys are considered.
        let pinned: HashSet<Hash> = self.pins.iter(&rtxn)?
            .filter_map(|item| item.ok())
            .filter_map(|(hash_bytes, _)| {
                if hash_bytes.len() == 32 {
                    let mut hash = [0u8; 32];
                    hash.copy_from_slice(hash_bytes);
                    Some(hash)
                } else {
                    None
                }
            })
            .collect();

        drop(rtxn);

        let all_hashes = self.router.list()
            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;

        let mut deleted = 0;
        let mut freed_bytes = 0u64;

        for hash in all_hashes {
            if !pinned.contains(&hash) {
                // Read first so freed bytes can be accounted; deletion
                // failures are deliberately ignored (best-effort GC).
                if let Ok(Some(data)) = self.router.get_sync(&hash) {
                    freed_bytes += data.len() as u64;
                    let _ = self.router.delete_local_only(&hash);
                    deleted += 1;
                }
            }
        }

        Ok(GcStats {
            deleted_dags: deleted,
            freed_bytes,
        })
    }
1737
1738 pub fn verify_lmdb_integrity(&self, delete: bool) -> Result<VerifyResult> {
1741 let all_hashes = self.router.list()
1742 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
1743
1744 let total = all_hashes.len();
1745 let mut valid = 0;
1746 let mut corrupted = 0;
1747 let mut deleted = 0;
1748 let mut corrupted_hashes = Vec::new();
1749
1750 for hash in &all_hashes {
1751 let hash_hex = to_hex(hash);
1752
1753 match self.router.get_sync(hash) {
1754 Ok(Some(data)) => {
1755 let actual_hash = sha256(&data);
1757
1758 if actual_hash == *hash {
1759 valid += 1;
1760 } else {
1761 corrupted += 1;
1762 let actual_hex = to_hex(&actual_hash);
1763 println!(" CORRUPTED: key={} actual={} size={}",
1764 &hash_hex[..16], &actual_hex[..16], data.len());
1765 corrupted_hashes.push(*hash);
1766 }
1767 }
1768 Ok(None) => {
1769 corrupted += 1;
1771 println!(" MISSING: key={}", &hash_hex[..16]);
1772 corrupted_hashes.push(*hash);
1773 }
1774 Err(e) => {
1775 corrupted += 1;
1776 println!(" ERROR: key={} err={}", &hash_hex[..16], e);
1777 corrupted_hashes.push(*hash);
1778 }
1779 }
1780 }
1781
1782 if delete {
1784 for hash in &corrupted_hashes {
1785 match self.router.delete_sync(hash) {
1786 Ok(true) => deleted += 1,
1787 Ok(false) => {} Err(e) => {
1789 let hash_hex = to_hex(hash);
1790 println!(" Failed to delete {}: {}", &hash_hex[..16], e);
1791 }
1792 }
1793 }
1794 }
1795
1796 Ok(VerifyResult {
1797 total,
1798 valid,
1799 corrupted,
1800 deleted,
1801 })
1802 }
1803
    /// Verify the integrity of every object in the configured S3/R2 bucket by
    /// downloading it and re-hashing its contents.
    ///
    /// Objects are expected to be named `<prefix><64-hex-hash>.bin`; anything
    /// else under the prefix is skipped (non-`.bin`) or flagged as invalid.
    /// When `delete` is set, flagged objects are deleted after the scan.
    #[cfg(feature = "s3")]
    pub async fn verify_r2_integrity(&self, delete: bool) -> Result<VerifyResult> {
        use aws_sdk_s3::Client as S3Client;

        // Build a fresh S3 client from the on-disk config rather than reusing
        // the router's client.
        // NOTE(review): presumably so this also works when the router was
        // constructed without S3 — confirm.
        let config = crate::config::Config::load()?;
        let s3_config = config.storage.s3
            .ok_or_else(|| anyhow::anyhow!("S3 not configured"))?;

        let aws_config = aws_config::from_env()
            .region(aws_sdk_s3::config::Region::new(s3_config.region.clone()))
            .load()
            .await;

        // Path-style addressing for S3-compatible endpoints (e.g. R2, MinIO).
        let s3_client = S3Client::from_conf(
            aws_sdk_s3::config::Builder::from(&aws_config)
                .endpoint_url(&s3_config.endpoint)
                .force_path_style(true)
                .build()
        );

        let bucket = &s3_config.bucket;
        let prefix = s3_config.prefix.as_deref().unwrap_or("");

        let mut total = 0;
        let mut valid = 0;
        let mut corrupted = 0;
        let mut deleted = 0;
        let mut corrupted_keys = Vec::new();

        // Paginate through the bucket listing via continuation tokens.
        let mut continuation_token: Option<String> = None;

        loop {
            let mut list_req = s3_client.list_objects_v2()
                .bucket(bucket)
                .prefix(prefix);

            if let Some(ref token) = continuation_token {
                list_req = list_req.continuation_token(token);
            }

            let list_resp = list_req.send().await
                .map_err(|e| anyhow::anyhow!("Failed to list S3 objects: {}", e))?;

            for object in list_resp.contents() {
                let key = object.key().unwrap_or("");

                // Only blob objects (".bin") are integrity-checked.
                if !key.ends_with(".bin") {
                    continue;
                }

                total += 1;

                // The expected content hash is encoded in the object name.
                let filename = key.strip_prefix(prefix).unwrap_or(key);
                let expected_hash_hex = filename.strip_suffix(".bin").unwrap_or(filename);

                // A valid name carries exactly 64 hex characters (SHA-256).
                if expected_hash_hex.len() != 64 {
                    corrupted += 1;
                    println!(" INVALID KEY: {}", key);
                    corrupted_keys.push(key.to_string());
                    continue;
                }

                let expected_hash = match from_hex(expected_hash_hex) {
                    Ok(h) => h,
                    Err(_) => {
                        corrupted += 1;
                        println!(" INVALID HEX: {}", key);
                        corrupted_keys.push(key.to_string());
                        continue;
                    }
                };

                // Download the object and compare its SHA-256 to its name.
                match s3_client.get_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(resp) => {
                        match resp.body.collect().await {
                            Ok(bytes) => {
                                let data = bytes.into_bytes();
                                let actual_hash = sha256(&data);

                                if actual_hash == expected_hash {
                                    valid += 1;
                                } else {
                                    corrupted += 1;
                                    let actual_hex = to_hex(&actual_hash);
                                    println!(" CORRUPTED: key={} actual={} size={}",
                                        &expected_hash_hex[..16], &actual_hex[..16], data.len());
                                    corrupted_keys.push(key.to_string());
                                }
                            }
                            Err(e) => {
                                // Streaming the body failed mid-download.
                                corrupted += 1;
                                println!(" READ ERROR: {} - {}", key, e);
                                corrupted_keys.push(key.to_string());
                            }
                        }
                    }
                    Err(e) => {
                        // The GET request itself failed.
                        corrupted += 1;
                        println!(" FETCH ERROR: {} - {}", key, e);
                        corrupted_keys.push(key.to_string());
                    }
                }

                // Periodic progress output for large buckets.
                if total % 100 == 0 {
                    println!(" Progress: {} objects checked, {} corrupted so far", total, corrupted);
                }
            }

            // Follow pagination until the listing is exhausted.
            if list_resp.is_truncated() == Some(true) {
                continuation_token = list_resp.next_continuation_token().map(|s| s.to_string());
            } else {
                break;
            }
        }

        // Optional cleanup pass over everything flagged during the scan.
        if delete {
            for key in &corrupted_keys {
                match s3_client.delete_object()
                    .bucket(bucket)
                    .key(key)
                    .send()
                    .await
                {
                    Ok(_) => deleted += 1,
                    Err(e) => {
                        println!(" Failed to delete {}: {}", key, e);
                    }
                }
            }
        }

        Ok(VerifyResult {
            total,
            valid,
            corrupted,
            deleted,
        })
    }
1960
1961 #[cfg(not(feature = "s3"))]
1963 pub async fn verify_r2_integrity(&self, _delete: bool) -> Result<VerifyResult> {
1964 Err(anyhow::anyhow!("S3 feature not enabled"))
1965 }
1966}
1967
/// Outcome of an integrity scan (`verify_lmdb_integrity` / `verify_r2_integrity`).
#[derive(Debug, Clone)]
pub struct VerifyResult {
    /// Number of objects examined.
    pub total: usize,
    /// Objects whose content hash matched their key.
    pub valid: usize,
    /// Objects that were corrupted, missing, or unreadable.
    pub corrupted: usize,
    /// Flagged objects actually deleted (non-zero only when deletion was requested).
    pub deleted: usize,
}
1976
/// High-level local storage counters (see `get_storage_stats`).
#[derive(Debug)]
pub struct StorageStats {
    /// Total number of stored objects (from the router's stats).
    pub total_dags: usize,
    /// Number of entries in the pin database.
    pub pinned_dags: usize,
    /// Total bytes used by local storage.
    pub total_bytes: u64,
}
1983
/// Storage usage broken down by tree priority class (see `storage_by_priority`).
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Bytes in trees with priority >= PRIORITY_OWN.
    pub own: u64,
    /// Bytes in trees with priority >= PRIORITY_FOLLOWED (but below PRIORITY_OWN).
    pub followed: u64,
    /// Bytes in all remaining trees.
    pub other: u64,
}
1994
/// Chunk layout of a stored file.
#[derive(Debug, Clone)]
pub struct FileChunkMetadata {
    /// Total file size in bytes.
    pub total_size: u64,
    /// Hash of each chunk, in file order.
    pub chunk_hashes: Vec<Hash>,
    /// Size in bytes of each chunk (parallel to `chunk_hashes`).
    pub chunk_sizes: Vec<u64>,
    /// Whether the file is split into chunks at all.
    pub is_chunked: bool,
}
2002
/// Owning iterator over the chunks of a file that overlap a byte range
/// [`start`, `end`] (both inclusive); each item is the in-range slice of one chunk.
pub struct FileRangeChunksOwned {
    // Store used to fetch chunk contents by hash.
    store: Arc<HashtreeStore>,
    // Chunk layout of the file being read.
    metadata: FileChunkMetadata,
    // First byte of the requested range (inclusive).
    start: u64,
    // Last byte of the requested range (inclusive).
    end: u64,
    // Index of the next chunk to consider.
    current_chunk_idx: usize,
    // Absolute file offset of the start of that chunk.
    current_offset: u64,
}
2012
2013impl Iterator for FileRangeChunksOwned {
2014 type Item = Result<Vec<u8>>;
2015
2016 fn next(&mut self) -> Option<Self::Item> {
2017 if !self.metadata.is_chunked || self.current_chunk_idx >= self.metadata.chunk_hashes.len() {
2018 return None;
2019 }
2020
2021 if self.current_offset > self.end {
2022 return None;
2023 }
2024
2025 let chunk_hash = &self.metadata.chunk_hashes[self.current_chunk_idx];
2026 let chunk_size = self.metadata.chunk_sizes[self.current_chunk_idx];
2027 let chunk_end = self.current_offset + chunk_size - 1;
2028
2029 self.current_chunk_idx += 1;
2030
2031 if chunk_end < self.start || self.current_offset > self.end {
2032 self.current_offset += chunk_size;
2033 return self.next();
2034 }
2035
2036 let chunk_content = match self.store.get_chunk(chunk_hash) {
2037 Ok(Some(content)) => content,
2038 Ok(None) => {
2039 return Some(Err(anyhow::anyhow!("Chunk {} not found", to_hex(chunk_hash))));
2040 }
2041 Err(e) => {
2042 return Some(Err(e));
2043 }
2044 };
2045
2046 let chunk_read_start = if self.current_offset >= self.start {
2047 0
2048 } else {
2049 (self.start - self.current_offset) as usize
2050 };
2051
2052 let chunk_read_end = if chunk_end <= self.end {
2053 chunk_size as usize - 1
2054 } else {
2055 (self.end - self.current_offset) as usize
2056 };
2057
2058 let result = chunk_content[chunk_read_start..=chunk_read_end].to_vec();
2059 self.current_offset += chunk_size;
2060
2061 Some(Ok(result))
2062 }
2063}
2064
/// Result of a `gc` run.
#[derive(Debug)]
pub struct GcStats {
    /// Number of objects deleted.
    pub deleted_dags: usize,
    /// Total bytes reclaimed.
    pub freed_bytes: u64,
}
2070
/// One entry of a directory listing.
#[derive(Debug, Clone)]
pub struct DirEntry {
    /// Entry name within the directory.
    pub name: String,
    /// Content identifier of the entry, as a string.
    pub cid: String,
    /// True when the entry is itself a directory.
    pub is_directory: bool,
    /// Entry size in bytes.
    pub size: u64,
}
2078
/// A directory plus its entries.
#[derive(Debug, Clone)]
pub struct DirectoryListing {
    /// Name of the listed directory.
    pub dir_name: String,
    /// Entries contained in the directory.
    pub entries: Vec<DirEntry>,
}
2084
/// A pinned content item, as exposed to listing callers.
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Content identifier of the pinned item, as a string.
    pub cid: String,
    /// Display name of the item.
    pub name: String,
    /// True when the pinned item is a directory.
    pub is_directory: bool,
}
2091
/// Serializable metadata describing an uploaded blob.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    /// Hex-encoded SHA-256 of the blob's content.
    pub sha256: String,
    /// Blob size in bytes.
    pub size: u64,
    /// MIME type reported for the blob.
    pub mime_type: String,
    /// Upload timestamp.
    // NOTE(review): presumably Unix seconds like `TreeMeta::synced_at` — confirm.
    pub uploaded: u64,
}
2100
2101impl crate::webrtc::ContentStore for HashtreeStore {
2103 fn get(&self, hash_hex: &str) -> Result<Option<Vec<u8>>> {
2104 let hash = from_hex(hash_hex)
2105 .map_err(|e| anyhow::anyhow!("Invalid hash: {}", e))?;
2106 self.get_chunk(&hash)
2107 }
2108}