1use anyhow::Result;
2use futures::executor::block_on as sync_block_on;
3use hashtree_core::store::Store;
4use hashtree_core::{to_hex, types::Hash, HashTree, HashTreeConfig};
5use serde::de::{self, IgnoredAny, MapAccess, SeqAccess, Visitor};
6use serde::{Deserialize, Serialize};
7use std::collections::HashSet;
8use std::path::{Path, PathBuf};
9use std::time::{SystemTime, UNIX_EPOCH};
10
11use super::{BlobMetadata, HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
12
/// Metadata stored for every indexed tree root.
///
/// Values are encoded with `rmp_serde` and stored in the `tree_meta` table under the
/// tree's root hash. Decoding is hand-written (see the `Deserialize` impl for this type)
/// so that records carrying an extra `last_accessed_at` field can still be read.
#[derive(Debug, Clone, Serialize)]
pub struct TreeMeta {
    /// Owner label passed to `index_tree` (presumably a pubkey/npub string — confirm with callers).
    pub owner: String,
    /// Optional tree name passed to `index_tree`.
    pub name: Option<String>,
    /// Unix timestamp in seconds taken when the tree was indexed.
    pub synced_at: u64,
    /// Total size in bytes of the tree's blobs, as measured at index time.
    pub total_size: u64,
    /// Eviction priority; trees with lower values are evicted first.
    pub priority: u8,
}
27
28impl<'de> Deserialize<'de> for TreeMeta {
29 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
30 where
31 D: serde::Deserializer<'de>,
32 {
33 const FIELDS: &[&str] = &[
34 "owner",
35 "name",
36 "synced_at",
37 "last_accessed_at",
38 "total_size",
39 "priority",
40 ];
41
42 struct TreeMetaVisitor;
43
44 impl<'de> Visitor<'de> for TreeMetaVisitor {
45 type Value = TreeMeta;
46
47 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
48 formatter.write_str("TreeMeta as current or legacy metadata")
49 }
50
51 fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
52 where
53 A: SeqAccess<'de>,
54 {
55 let has_accidental_access_field = matches!(seq.size_hint(), Some(6));
56 let owner = seq
57 .next_element()?
58 .ok_or_else(|| de::Error::invalid_length(0, &self))?;
59 let name = seq
60 .next_element()?
61 .ok_or_else(|| de::Error::invalid_length(1, &self))?;
62 let synced_at = seq
63 .next_element()?
64 .ok_or_else(|| de::Error::invalid_length(2, &self))?;
65
66 if has_accidental_access_field {
67 let _: IgnoredAny = seq
68 .next_element()?
69 .ok_or_else(|| de::Error::invalid_length(3, &self))?;
70 }
71
72 let total_size = seq
73 .next_element()?
74 .ok_or_else(|| de::Error::invalid_length(3, &self))?;
75 let priority = seq
76 .next_element()?
77 .ok_or_else(|| de::Error::invalid_length(4, &self))?;
78
79 Ok(TreeMeta {
80 owner,
81 name,
82 synced_at,
83 total_size,
84 priority,
85 })
86 }
87
88 fn visit_map<A>(self, mut map: A) -> std::result::Result<Self::Value, A::Error>
89 where
90 A: MapAccess<'de>,
91 {
92 let mut owner = None;
93 let mut name = None;
94 let mut synced_at = None;
95 let mut total_size = None;
96 let mut priority = None;
97
98 while let Some(key) = map.next_key::<String>()? {
99 match key.as_str() {
100 "owner" => owner = Some(map.next_value()?),
101 "name" => name = Some(map.next_value()?),
102 "synced_at" => synced_at = Some(map.next_value()?),
103 "last_accessed_at" => {
104 let _: IgnoredAny = map.next_value()?;
105 }
106 "total_size" => total_size = Some(map.next_value()?),
107 "priority" => priority = Some(map.next_value()?),
108 _ => {
109 let _: IgnoredAny = map.next_value()?;
110 }
111 }
112 }
113
114 Ok(TreeMeta {
115 owner: owner.ok_or_else(|| de::Error::missing_field("owner"))?,
116 name: name.unwrap_or(None),
117 synced_at: synced_at.ok_or_else(|| de::Error::missing_field("synced_at"))?,
118 total_size: total_size.ok_or_else(|| de::Error::missing_field("total_size"))?,
119 priority: priority.ok_or_else(|| de::Error::missing_field("priority"))?,
120 })
121 }
122 }
123
124 deserializer.deserialize_struct("TreeMeta", FIELDS, TreeMetaVisitor)
125 }
126}
127
/// Summary returned by `HashtreeStore::get_storage_stats`.
#[derive(Debug)]
pub struct StorageStats {
    /// Item count reported by the router's stats.
    pub total_dags: usize,
    /// Number of entries in the `pins` table.
    pub pinned_dags: usize,
    /// Total stored bytes reported by the router's stats.
    pub total_bytes: u64,
}
134
/// Indexed-tree bytes grouped by priority tier, as computed by
/// `HashtreeStore::storage_by_priority` from each tree's recorded `total_size`.
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Bytes of trees whose priority equals `PRIORITY_OWN`.
    pub own: u64,
    /// Bytes of the remaining trees whose priority is at least `PRIORITY_FOLLOWED`.
    pub followed: u64,
    /// Bytes of all other trees.
    pub other: u64,
}
145
/// One pinned hash as listed by `HashtreeStore::list_pins_with_names`.
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Hex encoding of the pinned hash.
    pub cid: String,
    /// Display name derived from the tree metadata, or the hex hash when none applies.
    pub name: String,
    /// Whether the hash resolves to a directory; `false` when that check fails.
    pub is_directory: bool,
    /// Recorded tree size when metadata exists, otherwise the length of the blob itself.
    pub size_bytes: u64,
}
153
/// Per-owner blob totals as returned by `HashtreeStore::owned_blob_stats`.
#[derive(Debug, Clone)]
pub struct OwnedBlobStats {
    /// 32-byte owner key taken from the `pubkey_blobs` table.
    pub owner: [u8; 32],
    /// Number of blob metadata entries recorded for this owner.
    pub count: usize,
    /// Saturating sum of the recorded sizes of those blobs.
    pub total_bytes: u64,
}
160
161fn pinned_item_name(hash: &Hash, meta: Option<&TreeMeta>) -> String {
162 let Some(meta) = meta else {
163 return to_hex(hash);
164 };
165
166 match (meta.owner.as_str(), meta.name.as_deref()) {
167 ("pinned", Some(name)) => name.to_string(),
168 ("", Some(name)) => name.to_string(),
169 (owner, Some(name)) if !owner.is_empty() => format!("{owner}/{name}"),
170 (owner, None) if !owner.is_empty() && owner != "pinned" => owner.to_string(),
171 _ => to_hex(hash),
172 }
173}
174
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn unix_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
181
182impl HashtreeStore {
183 fn socialgraph_root_files(&self) -> [PathBuf; 4] {
184 let socialgraph = self.base_path().join("socialgraph");
185 [
186 socialgraph.join("events-root.msgpack"),
187 socialgraph.join("events-root-ambient.msgpack"),
188 socialgraph.join("profile-search-root.msgpack"),
189 socialgraph.join("profiles-by-pubkey-root.msgpack"),
190 ]
191 }
192
193 fn read_stored_cid(path: &Path) -> Result<Option<Hash>> {
194 #[derive(Deserialize)]
195 struct StoredCid {
196 hash: [u8; 32],
197 #[allow(dead_code)]
198 key: Option<[u8; 32]>,
199 }
200
201 let Ok(bytes) = std::fs::read(path) else {
202 return Ok(None);
203 };
204 let stored: StoredCid = rmp_serde::from_slice(&bytes)
205 .map_err(|e| anyhow::anyhow!("Failed to decode root file {}: {}", path.display(), e))?;
206 Ok(Some(stored.hash))
207 }
208
209 async fn collect_tree_hashes<S: Store>(
210 &self,
211 tree: &HashTree<S>,
212 root: &Hash,
213 ) -> Result<HashSet<Hash>> {
214 let mut hashes = HashSet::new();
215 let mut stack = vec![*root];
216
217 while let Some(hash) = stack.pop() {
218 if !hashes.insert(hash) {
219 continue;
220 }
221
222 let is_tree = tree
223 .is_tree(&hash)
224 .await
225 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
226
227 if !is_tree {
228 continue;
229 }
230
231 if let Some(node) = tree
232 .get_tree_node(&hash)
233 .await
234 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
235 {
236 for link in &node.links {
237 stack.push(link.hash);
238 }
239 }
240 }
241
242 Ok(hashes)
243 }
244
245 fn protected_hashes(&self) -> Result<HashSet<Hash>> {
246 let mut protected = HashSet::new();
247
248 let rtxn = self.env.read_txn()?;
249 for (key_bytes, _) in self.blob_trees.iter(&rtxn)?.flatten() {
250 if key_bytes.len() >= 32 {
251 let hash: Hash = key_bytes[..32].try_into().unwrap();
252 protected.insert(hash);
253 }
254 }
255 drop(rtxn);
256
257 let tree = HashTree::new(HashTreeConfig::new(self.store_arc()).public());
258 for path in self.socialgraph_root_files() {
259 let Some(root_hash) = Self::read_stored_cid(&path)? else {
260 continue;
261 };
262 protected.extend(sync_block_on(self.collect_tree_hashes(&tree, &root_hash))?);
263 }
264
265 Ok(protected)
266 }
267
268 fn evict_disposable_orphans_to_target(&self, target_bytes: u64) -> Result<u64> {
269 let stats = self
270 .router
271 .stats()
272 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
273 let mut current_size = stats.total_bytes;
274 if current_size <= target_bytes {
275 return Ok(0);
276 }
277
278 let rtxn = self.env.read_txn()?;
279 let pinned: HashSet<Hash> = self
280 .pins
281 .iter(&rtxn)?
282 .filter_map(|item| item.ok())
283 .filter_map(|(hash_bytes, _)| {
284 if hash_bytes.len() == 32 {
285 let mut hash = [0u8; 32];
286 hash.copy_from_slice(hash_bytes);
287 Some(hash)
288 } else {
289 None
290 }
291 })
292 .collect();
293 drop(rtxn);
294
295 let protected_hashes = self.protected_hashes()?;
296 let all_hashes = self
297 .router
298 .list()
299 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
300
301 let mut freed = 0u64;
302 for hash in all_hashes {
303 if current_size <= target_bytes {
304 break;
305 }
306
307 if pinned.contains(&hash) || protected_hashes.contains(&hash) {
308 continue;
309 }
310
311 if self.blob_has_owners(&hash)? {
312 continue;
313 }
314
315 let Some(data) = self
316 .router
317 .get_sync(&hash)
318 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
319 else {
320 continue;
321 };
322
323 let size = data.len() as u64;
324 if self
325 .router
326 .delete_local_only(&hash)
327 .map_err(|e| anyhow::anyhow!("Failed to delete orphaned blob: {}", e))?
328 {
329 freed = freed.saturating_add(size);
330 current_size = current_size.saturating_sub(size);
331 tracing::debug!(
332 "Deleted disposable orphaned blob {} ({} bytes)",
333 &to_hex(&hash)[..8],
334 size
335 );
336 }
337 }
338
339 Ok(freed)
340 }
341
342 pub fn make_room_for_cached_blob(&self, incoming_bytes: u64) -> Result<u64> {
343 if self.max_size_bytes == 0 {
344 return Ok(0);
345 }
346
347 let stats = self
348 .router
349 .stats()
350 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
351 if stats.total_bytes.saturating_add(incoming_bytes) <= self.max_size_bytes {
352 return Ok(0);
353 }
354
355 let target = if incoming_bytes >= self.max_size_bytes {
356 0
357 } else {
358 (self.max_size_bytes.saturating_mul(9) / 10)
359 .min(self.max_size_bytes.saturating_sub(incoming_bytes))
360 };
361 self.evict_disposable_orphans_to_target(target)
362 }
363
364 pub fn make_room_for_durable_blob(&self, incoming_bytes: u64) -> Result<u64> {
365 if self.max_size_bytes == 0 || incoming_bytes == 0 {
366 return Ok(0);
367 }
368
369 if incoming_bytes > self.max_size_bytes {
370 anyhow::bail!(
371 "storage limit exceeded: incoming blob is {} bytes but limit is {} bytes",
372 incoming_bytes,
373 self.max_size_bytes
374 );
375 }
376
377 let stats = self
378 .router
379 .stats()
380 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
381 if stats.total_bytes.saturating_add(incoming_bytes) <= self.max_size_bytes {
382 return Ok(0);
383 }
384
385 let target = (self.max_size_bytes.saturating_mul(9) / 10)
386 .min(self.max_size_bytes.saturating_sub(incoming_bytes));
387 let freed = self.evict_with_policy_to_target(stats.total_bytes, target)?;
388
389 let next_stats = self
390 .router
391 .stats()
392 .map_err(|e| anyhow::anyhow!("Failed to get stats after eviction: {}", e))?;
393 if next_stats.total_bytes.saturating_add(incoming_bytes) > self.max_size_bytes {
394 anyhow::bail!(
395 "storage limit exceeded: {} bytes used, {} byte incoming blob, {} byte limit",
396 next_stats.total_bytes,
397 incoming_bytes,
398 self.max_size_bytes
399 );
400 }
401
402 Ok(freed)
403 }
404
405 pub fn relieve_cached_blob_write_pressure(&self, incoming_bytes: u64) -> Result<u64> {
406 let stats = self
407 .router
408 .stats()
409 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
410 if stats.total_bytes == 0 {
411 return Ok(0);
412 }
413
414 let headroom = incoming_bytes.max(stats.total_bytes / 10).max(1);
415 let target = stats.total_bytes.saturating_sub(headroom);
416 self.evict_disposable_orphans_to_target(target)
417 }
418
419 pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
421 let mut wtxn = self.env.write_txn()?;
422 self.pins.put(&mut wtxn, hash.as_slice(), &())?;
423 wtxn.commit()?;
424 Ok(())
425 }
426
427 pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
429 let mut wtxn = self.env.write_txn()?;
430 self.pins.delete(&mut wtxn, hash.as_slice())?;
431 wtxn.commit()?;
432 Ok(())
433 }
434
435 pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
437 let rtxn = self.env.read_txn()?;
438 Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
439 }
440
441 pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
443 let rtxn = self.env.read_txn()?;
444 let mut pins = Vec::new();
445
446 for item in self.pins.iter(&rtxn)? {
447 let (hash_bytes, _) = item?;
448 if hash_bytes.len() == 32 {
449 let mut hash = [0u8; 32];
450 hash.copy_from_slice(hash_bytes);
451 pins.push(hash);
452 }
453 }
454
455 Ok(pins)
456 }
457
458 pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
460 let rtxn = self.env.read_txn()?;
461 let store = self.store_arc();
462 let tree = HashTree::new(HashTreeConfig::new(store).public());
463 let mut pins = Vec::new();
464
465 for item in self.pins.iter(&rtxn)? {
466 let (hash_bytes, _) = item?;
467 if hash_bytes.len() != 32 {
468 continue;
469 }
470 let mut hash = [0u8; 32];
471 hash.copy_from_slice(hash_bytes);
472
473 let is_directory =
475 sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
476
477 let meta = self
478 .tree_meta
479 .get(&rtxn, hash.as_slice())?
480 .map(|bytes| {
481 rmp_serde::from_slice::<TreeMeta>(bytes)
482 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))
483 })
484 .transpose()?;
485 let size_bytes = if let Some(meta) = meta.as_ref() {
486 meta.total_size
487 } else {
488 self.router
489 .get_sync(&hash)
490 .map_err(|e| anyhow::anyhow!("Failed to get pinned blob: {}", e))?
491 .map(|data| data.len() as u64)
492 .unwrap_or(0)
493 };
494
495 pins.push(PinnedItem {
496 cid: to_hex(&hash),
497 name: pinned_item_name(&hash, meta.as_ref()),
498 is_directory,
499 size_bytes,
500 });
501 }
502
503 Ok(pins)
504 }
505
506 pub fn owned_blob_stats(&self) -> Result<Vec<OwnedBlobStats>> {
507 let rtxn = self.env.read_txn()?;
508 let mut owners = Vec::new();
509
510 for item in self.pubkey_blobs.iter(&rtxn)? {
511 let (owner_bytes, blobs_bytes) = item?;
512 if owner_bytes.len() != 32 {
513 continue;
514 }
515
516 let blobs: Vec<BlobMetadata> = serde_json::from_slice(blobs_bytes)
517 .map_err(|e| anyhow::anyhow!("Failed to deserialize blob metadata: {}", e))?;
518 let mut owner = [0u8; 32];
519 owner.copy_from_slice(owner_bytes);
520 let total_bytes = blobs
521 .iter()
522 .fold(0u64, |total, blob| total.saturating_add(blob.size));
523 owners.push(OwnedBlobStats {
524 owner,
525 count: blobs.len(),
526 total_bytes,
527 });
528 }
529
530 owners.sort_by(|a, b| a.owner.cmp(&b.owner));
531 Ok(owners)
532 }
533
534 pub fn index_tree(
541 &self,
542 root_hash: &Hash,
543 owner: &str,
544 name: Option<&str>,
545 priority: u8,
546 ref_key: Option<&str>,
547 ) -> Result<()> {
548 let root_hex = to_hex(root_hash);
549
550 if let Some(key) = ref_key {
552 let rtxn = self.env.read_txn()?;
553 if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
554 if old_hash_bytes != root_hash.as_slice() {
555 let old_hash: Hash = old_hash_bytes
556 .try_into()
557 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
558 drop(rtxn);
559 let _ = self.unpin(&old_hash);
560 let _ = self.unindex_tree(&old_hash);
562 tracing::debug!("Replaced old tree for ref {}", key);
563 }
564 }
565 }
566
567 let store = self.store_arc();
568 let tree = HashTree::new(HashTreeConfig::new(store).public());
569
570 let (_blob_hashes, total_size) =
572 sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
573 let tracked_hashes = sync_block_on(self.collect_tree_hashes(&tree, root_hash))?;
574
575 let mut wtxn = self.env.write_txn()?;
576
577 for tracked_hash in &tracked_hashes {
579 let mut key = [0u8; 64];
580 key[..32].copy_from_slice(tracked_hash);
581 key[32..].copy_from_slice(root_hash);
582 self.blob_trees.put(&mut wtxn, &key[..], &())?;
583 }
584
585 let now = unix_timestamp_now();
587 let meta = TreeMeta {
588 owner: owner.to_string(),
589 name: name.map(|s| s.to_string()),
590 synced_at: now,
591 total_size,
592 priority,
593 };
594 let meta_bytes = rmp_serde::to_vec(&meta)
595 .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
596 self.tree_meta
597 .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;
598
599 if let Some(key) = ref_key {
601 self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
602 }
603
604 wtxn.commit()?;
605
606 tracing::debug!(
607 "Indexed tree {} ({} blobs, {} bytes, priority {})",
608 &root_hex[..8],
609 tracked_hashes.len(),
610 total_size,
611 priority
612 );
613
614 Ok(())
615 }
616
617 async fn collect_tree_blobs<S: Store>(
619 &self,
620 tree: &HashTree<S>,
621 root: &Hash,
622 ) -> Result<(Vec<Hash>, u64)> {
623 let mut blobs = Vec::new();
624 let mut total_size = 0u64;
625 let mut stack = vec![*root];
626
627 while let Some(hash) = stack.pop() {
628 let is_tree = tree
630 .is_tree(&hash)
631 .await
632 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
633
634 if is_tree {
635 if let Some(node) = tree
637 .get_tree_node(&hash)
638 .await
639 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
640 {
641 for link in &node.links {
642 stack.push(link.hash);
643 }
644 }
645 } else {
646 if let Some(data) = self
648 .router
649 .get_sync(&hash)
650 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
651 {
652 total_size += data.len() as u64;
653 blobs.push(hash);
654 }
655 }
656 }
657
658 Ok((blobs, total_size))
659 }
660
661 pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
664 let root_hex = to_hex(root_hash);
665
666 let store = self.store_arc();
667 let tree = HashTree::new(HashTreeConfig::new(store).public());
668
669 let tracked_hashes = sync_block_on(self.collect_tree_hashes(&tree, root_hash))?;
671
672 let mut wtxn = self.env.write_txn()?;
673 let mut freed = 0u64;
674
675 for tracked_hash in &tracked_hashes {
677 let mut key = [0u8; 64];
679 key[..32].copy_from_slice(tracked_hash);
680 key[32..].copy_from_slice(root_hash);
681 self.blob_trees.delete(&mut wtxn, &key[..])?;
682
683 let mut has_other_tree = false;
685 for item in self.blob_trees.prefix_iter(&wtxn, &tracked_hash[..])? {
686 if item.is_ok() {
687 has_other_tree = true;
688 break;
689 }
690 }
691
692 if !has_other_tree {
694 if let Some(data) = self
695 .router
696 .get_sync(tracked_hash)
697 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
698 {
699 freed += data.len() as u64;
700 self.router
702 .delete_local_only(tracked_hash)
703 .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
704 }
705 }
706 }
707
708 self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
710
711 wtxn.commit()?;
712
713 tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
714
715 Ok(freed)
716 }
717
718 pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
720 let rtxn = self.env.read_txn()?;
721 if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
722 let meta: TreeMeta = rmp_serde::from_slice(bytes)
723 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
724 Ok(Some(meta))
725 } else {
726 Ok(None)
727 }
728 }
729
730 pub fn get_tree_ref(&self, key: &str) -> Result<Option<Hash>> {
731 let rtxn = self.env.read_txn()?;
732 let Some(bytes) = self.tree_refs.get(&rtxn, key)? else {
733 return Ok(None);
734 };
735
736 let hash: Hash = bytes
737 .try_into()
738 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
739 Ok(Some(hash))
740 }
741
742 pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
744 let rtxn = self.env.read_txn()?;
745 let mut trees = Vec::new();
746
747 for item in self.tree_meta.iter(&rtxn)? {
748 let (hash_bytes, meta_bytes) = item?;
749 let hash: Hash = hash_bytes
750 .try_into()
751 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
752 let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
753 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
754 trees.push((hash, meta));
755 }
756
757 Ok(trees)
758 }
759
760 pub fn tracked_size(&self) -> Result<u64> {
762 let rtxn = self.env.read_txn()?;
763 let mut total = 0u64;
764
765 for item in self.tree_meta.iter(&rtxn)? {
766 let (_, bytes) = item?;
767 let meta: TreeMeta = rmp_serde::from_slice(bytes)
768 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
769 total += meta.total_size;
770 }
771
772 Ok(total)
773 }
774
775 fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
781 let mut trees = self.list_indexed_trees()?;
782
783 trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
785 std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
786 other => other,
787 });
788
789 Ok(trees)
790 }
791
792 pub fn evict_if_needed(&self) -> Result<u64> {
799 let stats = self
801 .router
802 .stats()
803 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
804 let current = stats.total_bytes;
805
806 if current <= self.max_size_bytes {
807 return Ok(0);
808 }
809
810 let target = self.max_size_bytes * 90 / 100;
812 self.evict_with_policy_to_target(current, target)
813 }
814
815 fn evict_with_policy_to_target(&self, current: u64, target: u64) -> Result<u64> {
816 let mut freed = 0u64;
817 let mut current_size = current;
818
819 if self.evict_orphans {
821 let orphan_freed = self.evict_disposable_orphans_to_target(target)?;
822 freed += orphan_freed;
823 current_size = current_size.saturating_sub(orphan_freed);
824
825 if orphan_freed > 0 {
826 tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
827 }
828 } else {
829 tracing::debug!("Skipping orphan blob eviction; storage.evict_orphans=false");
830 }
831
832 if current_size <= target {
834 if freed > 0 {
835 tracing::info!("Eviction complete: {} bytes freed", freed);
836 }
837 return Ok(freed);
838 }
839
840 let evictable = self.get_evictable_trees()?;
843
844 for (root_hash, meta) in evictable {
845 if current_size <= target {
846 break;
847 }
848
849 let root_hex = to_hex(&root_hash);
850
851 if self.is_pinned(&root_hash)? {
853 continue;
854 }
855
856 let tree_freed = self.unindex_tree(&root_hash)?;
857 freed += tree_freed;
858 current_size = current_size.saturating_sub(tree_freed);
859
860 tracing::info!(
861 "Evicted tree {} (owner={}, priority={}, {} bytes)",
862 &root_hex[..8],
863 &meta.owner[..8.min(meta.owner.len())],
864 meta.priority,
865 tree_freed
866 );
867 }
868
869 if freed > 0 {
870 tracing::info!("Eviction complete: {} bytes freed", freed);
871 }
872
873 Ok(freed)
874 }
875
    /// Returns the configured storage size limit in bytes.
    pub fn max_size_bytes(&self) -> u64 {
        self.max_size_bytes
    }
880
881 pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
883 let rtxn = self.env.read_txn()?;
884 let mut own = 0u64;
885 let mut followed = 0u64;
886 let mut other = 0u64;
887
888 for item in self.tree_meta.iter(&rtxn)? {
889 let (_, bytes) = item?;
890 let meta: TreeMeta = rmp_serde::from_slice(bytes)
891 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
892
893 if meta.priority == PRIORITY_OWN {
894 own += meta.total_size;
895 } else if meta.priority >= PRIORITY_FOLLOWED {
896 followed += meta.total_size;
897 } else {
898 other += meta.total_size;
899 }
900 }
901
902 Ok(StorageByPriority {
903 own,
904 followed,
905 other,
906 })
907 }
908
909 pub fn get_storage_stats(&self) -> Result<StorageStats> {
911 let rtxn = self.env.read_txn()?;
912 let total_pins = self.pins.len(&rtxn)? as usize;
913
914 let stats = self
915 .router
916 .stats()
917 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
918
919 Ok(StorageStats {
920 total_dags: stats.count,
921 pinned_dags: total_pins,
922 total_bytes: stats.total_bytes,
923 })
924 }
925}
926
927#[cfg(test)]
928mod tests {
929 use super::*;
930 use hashtree_core::Cid;
931 use hashtree_index::{BTree, BTreeOptions};
932 use tempfile::TempDir;
933
934 use crate::storage::PRIORITY_OTHER;
935
936 fn write_root_file(path: &Path, cid: &Cid) {
937 #[derive(Serialize)]
938 struct StoredCid {
939 hash: [u8; 32],
940 key: Option<[u8; 32]>,
941 }
942
943 std::fs::create_dir_all(path.parent().expect("root file parent")).expect("create dir");
944 let bytes = rmp_serde::to_vec_named(&StoredCid {
945 hash: cid.hash,
946 key: cid.key,
947 })
948 .expect("encode cid");
949 std::fs::write(path, bytes).expect("write root file");
950 }
951
952 fn build_test_tree(store: &HashtreeStore) -> Cid {
953 let index = BTree::new(store.store_arc(), BTreeOptions { order: Some(8) });
954 sync_block_on(index.build(vec![
955 ("alpha".to_string(), "one".to_string()),
956 ("beta".to_string(), "two".to_string()),
957 ("gamma".to_string(), "three".to_string()),
958 ]))
959 .expect("build btree")
960 .expect("non-empty root")
961 }
962
963 #[test]
964 fn orphan_cleanup_keeps_indexed_tree_hashes() {
965 let temp_dir = TempDir::new().expect("temp dir");
966 let store = HashtreeStore::with_options(temp_dir.path(), None, 1024).expect("store");
967 let cid = build_test_tree(&store);
968
969 store
970 .index_tree(
971 &cid.hash,
972 "owner",
973 Some("tree"),
974 PRIORITY_OTHER,
975 Some("owner/tree"),
976 )
977 .expect("index tree");
978 let freed = store
979 .evict_disposable_orphans_to_target(0)
980 .expect("orphan cleanup");
981
982 assert!(freed < 1024);
983 assert!(store.blob_exists(&cid.hash).expect("root exists"));
984 }
985
986 #[test]
987 fn list_pins_with_names_uses_indexed_tree_metadata() {
988 let temp_dir = TempDir::new().expect("temp dir");
989 let store = HashtreeStore::with_options(temp_dir.path(), None, 1024 * 1024).expect("store");
990 let cid = build_test_tree(&store);
991
992 store.pin(&cid.hash).expect("pin tree");
993 store
994 .index_tree(
995 &cid.hash,
996 "npub1example",
997 Some("playlist"),
998 PRIORITY_OTHER,
999 Some("npub1example/playlist"),
1000 )
1001 .expect("index tree");
1002
1003 let pins = store.list_pins_with_names().expect("list pins");
1004
1005 assert_eq!(pins.len(), 1);
1006 assert_eq!(pins[0].name, "npub1example/playlist");
1007 assert!(pins[0].size_bytes > 0);
1008 }
1009
1010 #[test]
1011 fn get_tree_ref_returns_stored_root() {
1012 let temp_dir = TempDir::new().expect("temp dir");
1013 let store = HashtreeStore::with_options(temp_dir.path(), None, 1024 * 1024).expect("store");
1014 let cid = build_test_tree(&store);
1015
1016 store
1017 .index_tree(
1018 &cid.hash,
1019 "npub1example",
1020 Some("playlist"),
1021 PRIORITY_OTHER,
1022 Some("npub1example/playlist"),
1023 )
1024 .expect("index tree");
1025
1026 assert_eq!(
1027 store
1028 .get_tree_ref("npub1example/playlist")
1029 .expect("tree ref lookup"),
1030 Some(cid.hash)
1031 );
1032 }
1033
1034 #[test]
1035 fn tree_meta_deserializes_metadata_without_tree_access_field() {
1036 #[derive(Serialize)]
1037 struct LegacyTreeMeta {
1038 owner: String,
1039 name: Option<String>,
1040 synced_at: u64,
1041 total_size: u64,
1042 priority: u8,
1043 }
1044
1045 let bytes = rmp_serde::to_vec(&LegacyTreeMeta {
1046 owner: "owner".to_string(),
1047 name: Some("tree".to_string()),
1048 synced_at: 123,
1049 total_size: 456,
1050 priority: PRIORITY_OTHER,
1051 })
1052 .expect("serialize legacy metadata");
1053 let meta: TreeMeta = rmp_serde::from_slice(&bytes).expect("deserialize tree metadata");
1054
1055 assert_eq!(meta.owner, "owner");
1056 assert_eq!(meta.name.as_deref(), Some("tree"));
1057 assert_eq!(meta.synced_at, 123);
1058 assert_eq!(meta.total_size, 456);
1059 assert_eq!(meta.priority, PRIORITY_OTHER);
1060 }
1061
1062 #[test]
1063 fn tree_meta_deserializes_accidental_access_field_but_drops_it_on_write() {
1064 #[derive(Serialize)]
1065 struct AccidentalTreeMeta {
1066 owner: String,
1067 name: Option<String>,
1068 synced_at: u64,
1069 last_accessed_at: u64,
1070 total_size: u64,
1071 priority: u8,
1072 }
1073
1074 let bytes = rmp_serde::to_vec(&AccidentalTreeMeta {
1075 owner: "owner".to_string(),
1076 name: Some("tree".to_string()),
1077 synced_at: 123,
1078 last_accessed_at: 999,
1079 total_size: 456,
1080 priority: PRIORITY_OTHER,
1081 })
1082 .expect("serialize accidental metadata");
1083 let meta: TreeMeta = rmp_serde::from_slice(&bytes).expect("deserialize tree metadata");
1084 let encoded = rmp_serde::to_vec(&meta).expect("serialize current metadata");
1085 let reparsed: (String, Option<String>, u64, u64, u8) =
1086 rmp_serde::from_slice(&encoded).expect("parse current metadata shape");
1087
1088 assert_eq!(meta.owner, "owner");
1089 assert_eq!(meta.name.as_deref(), Some("tree"));
1090 assert_eq!(meta.synced_at, 123);
1091 assert_eq!(meta.total_size, 456);
1092 assert_eq!(meta.priority, PRIORITY_OTHER);
1093 assert_eq!(reparsed.0, "owner");
1094 assert_eq!(reparsed.3, 456);
1095 assert_eq!(reparsed.4, PRIORITY_OTHER);
1096 }
1097
1098 #[test]
1099 fn eviction_prefers_oldest_tree_within_priority() {
1100 let temp_dir = TempDir::new().expect("temp dir");
1101 let store = HashtreeStore::with_options(temp_dir.path(), None, 500).expect("store");
1102
1103 let hash1 = hashtree_core::sha256(&vec![1u8; 200]);
1104 let hash2 = hashtree_core::sha256(&vec![2u8; 200]);
1105 let hash3 = hashtree_core::sha256(&vec![3u8; 200]);
1106 store.put_blob(&vec![1u8; 200]).expect("put blob 1");
1107 store.put_blob(&vec![2u8; 200]).expect("put blob 2");
1108 store.put_blob(&vec![3u8; 200]).expect("put blob 3");
1109 store
1110 .index_tree(&hash1, "owner1", Some("tree1"), PRIORITY_OTHER, None)
1111 .expect("index tree 1");
1112 store
1113 .index_tree(&hash2, "owner2", Some("tree2"), PRIORITY_OTHER, None)
1114 .expect("index tree 2");
1115 store
1116 .index_tree(&hash3, "owner3", Some("tree3"), PRIORITY_OTHER, None)
1117 .expect("index tree 3");
1118
1119 let freed = store.evict_if_needed().expect("evict");
1120
1121 assert!(freed > 0);
1122 assert!(
1123 store.get_tree_meta(&hash3).expect("tree meta").is_some(),
1124 "newest tree should survive before older peers at the same priority"
1125 );
1126 }
1127
1128 #[test]
1129 fn orphan_cleanup_keeps_socialgraph_root_hashes() {
1130 let temp_dir = TempDir::new().expect("temp dir");
1131 let store = HashtreeStore::with_options(temp_dir.path(), None, 1024).expect("store");
1132 let cid = build_test_tree(&store);
1133 write_root_file(
1134 &temp_dir.path().join("socialgraph/events-root.msgpack"),
1135 &cid,
1136 );
1137
1138 let freed = store
1139 .evict_disposable_orphans_to_target(0)
1140 .expect("orphan cleanup");
1141
1142 assert!(freed < 1024);
1143 assert!(store.blob_exists(&cid.hash).expect("root exists"));
1144 }
1145}