Skip to main content

hashtree_cli/storage/
retention.rs

1use anyhow::Result;
2use futures::executor::block_on as sync_block_on;
3use hashtree_core::store::Store;
4use hashtree_core::{to_hex, types::Hash, HashTree, HashTreeConfig};
5use serde::{Deserialize, Serialize};
6use std::collections::HashSet;
7use std::path::{Path, PathBuf};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use super::{HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
11
12/// Metadata for a synced tree (for eviction tracking)
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct TreeMeta {
15    /// Pubkey of tree owner
16    pub owner: String,
17    /// Tree name if known (from nostr key like "npub.../name")
18    pub name: Option<String>,
19    /// Unix timestamp when this tree was synced
20    pub synced_at: u64,
21    /// Total size of all blobs in this tree
22    pub total_size: u64,
23    /// Eviction priority: 255=own/pinned, 128=followed, 64=other
24    pub priority: u8,
25}
26
/// Aggregate storage figures returned by `get_storage_stats`.
#[derive(Debug)]
pub struct StorageStats {
    /// Object count reported by the storage router
    pub total_dags: usize,
    /// Number of entries in the pin table
    pub pinned_dags: usize,
    /// Bytes currently reported as stored by the storage router
    pub total_bytes: u64,
}
33
/// Bytes of indexed-tree storage, split by eviction priority tier.
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Bytes held by own/pinned trees (priority 255)
    pub own: u64,
    /// Bytes held by followed users' trees (priority 128)
    pub followed: u64,
    /// Bytes held by all remaining trees (priority 64)
    pub other: u64,
}
44
/// A pinned root as presented to users: hex CID, display name, and kind.
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Hex-encoded root hash
    pub cid: String,
    /// Human-readable label for the pin
    pub name: String,
    /// Whether the pinned root was detected as a directory
    pub is_directory: bool,
}
51
52fn pinned_item_name(hash: &Hash, meta: Option<&TreeMeta>) -> String {
53    let Some(meta) = meta else {
54        return to_hex(hash);
55    };
56
57    match (meta.owner.as_str(), meta.name.as_deref()) {
58        ("pinned", Some(name)) => name.to_string(),
59        ("", Some(name)) => name.to_string(),
60        (owner, Some(name)) if !owner.is_empty() => format!("{owner}/{name}"),
61        (owner, None) if !owner.is_empty() && owner != "pinned" => owner.to_string(),
62        _ => to_hex(hash),
63    }
64}
65
66impl HashtreeStore {
67    fn socialgraph_root_files(&self) -> [PathBuf; 4] {
68        let socialgraph = self.base_path().join("socialgraph");
69        [
70            socialgraph.join("events-root.msgpack"),
71            socialgraph.join("events-root-ambient.msgpack"),
72            socialgraph.join("profile-search-root.msgpack"),
73            socialgraph.join("profiles-by-pubkey-root.msgpack"),
74        ]
75    }
76
77    fn read_stored_cid(path: &Path) -> Result<Option<Hash>> {
78        #[derive(Deserialize)]
79        struct StoredCid {
80            hash: [u8; 32],
81            #[allow(dead_code)]
82            key: Option<[u8; 32]>,
83        }
84
85        let Ok(bytes) = std::fs::read(path) else {
86            return Ok(None);
87        };
88        let stored: StoredCid = rmp_serde::from_slice(&bytes)
89            .map_err(|e| anyhow::anyhow!("Failed to decode root file {}: {}", path.display(), e))?;
90        Ok(Some(stored.hash))
91    }
92
93    async fn collect_tree_hashes<S: Store>(
94        &self,
95        tree: &HashTree<S>,
96        root: &Hash,
97    ) -> Result<HashSet<Hash>> {
98        let mut hashes = HashSet::new();
99        let mut stack = vec![*root];
100
101        while let Some(hash) = stack.pop() {
102            if !hashes.insert(hash) {
103                continue;
104            }
105
106            let is_tree = tree
107                .is_tree(&hash)
108                .await
109                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
110
111            if !is_tree {
112                continue;
113            }
114
115            if let Some(node) = tree
116                .get_tree_node(&hash)
117                .await
118                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
119            {
120                for link in &node.links {
121                    stack.push(link.hash);
122                }
123            }
124        }
125
126        Ok(hashes)
127    }
128
129    fn protected_hashes(&self) -> Result<HashSet<Hash>> {
130        let mut protected = HashSet::new();
131
132        let rtxn = self.env.read_txn()?;
133        for (key_bytes, _) in self.blob_trees.iter(&rtxn)?.flatten() {
134            if key_bytes.len() >= 32 {
135                let hash: Hash = key_bytes[..32].try_into().unwrap();
136                protected.insert(hash);
137            }
138        }
139        drop(rtxn);
140
141        let tree = HashTree::new(HashTreeConfig::new(self.store_arc()).public());
142        for path in self.socialgraph_root_files() {
143            let Some(root_hash) = Self::read_stored_cid(&path)? else {
144                continue;
145            };
146            protected.extend(sync_block_on(self.collect_tree_hashes(&tree, &root_hash))?);
147        }
148
149        Ok(protected)
150    }
151
152    fn evict_disposable_orphans_to_target(&self, target_bytes: u64) -> Result<u64> {
153        let stats = self
154            .router
155            .stats()
156            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
157        let mut current_size = stats.total_bytes;
158        if current_size <= target_bytes {
159            return Ok(0);
160        }
161
162        let rtxn = self.env.read_txn()?;
163        let pinned: HashSet<Hash> = self
164            .pins
165            .iter(&rtxn)?
166            .filter_map(|item| item.ok())
167            .filter_map(|(hash_bytes, _)| {
168                if hash_bytes.len() == 32 {
169                    let mut hash = [0u8; 32];
170                    hash.copy_from_slice(hash_bytes);
171                    Some(hash)
172                } else {
173                    None
174                }
175            })
176            .collect();
177        drop(rtxn);
178
179        let protected_hashes = self.protected_hashes()?;
180        let all_hashes = self
181            .router
182            .list()
183            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
184
185        let mut freed = 0u64;
186        for hash in all_hashes {
187            if current_size <= target_bytes {
188                break;
189            }
190
191            if pinned.contains(&hash) || protected_hashes.contains(&hash) {
192                continue;
193            }
194
195            if self.blob_has_owners(&hash)? {
196                continue;
197            }
198
199            let Some(data) = self
200                .router
201                .get_sync(&hash)
202                .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
203            else {
204                continue;
205            };
206
207            let size = data.len() as u64;
208            if self
209                .router
210                .delete_local_only(&hash)
211                .map_err(|e| anyhow::anyhow!("Failed to delete orphaned blob: {}", e))?
212            {
213                freed = freed.saturating_add(size);
214                current_size = current_size.saturating_sub(size);
215                tracing::debug!(
216                    "Deleted disposable orphaned blob {} ({} bytes)",
217                    &to_hex(&hash)[..8],
218                    size
219                );
220            }
221        }
222
223        Ok(freed)
224    }
225
226    pub fn make_room_for_cached_blob(&self, incoming_bytes: u64) -> Result<u64> {
227        if self.max_size_bytes == 0 {
228            return Ok(0);
229        }
230
231        let stats = self
232            .router
233            .stats()
234            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
235        if stats.total_bytes.saturating_add(incoming_bytes) <= self.max_size_bytes {
236            return Ok(0);
237        }
238
239        let target = if incoming_bytes >= self.max_size_bytes {
240            0
241        } else {
242            (self.max_size_bytes.saturating_mul(9) / 10)
243                .min(self.max_size_bytes.saturating_sub(incoming_bytes))
244        };
245        self.evict_disposable_orphans_to_target(target)
246    }
247
248    pub fn make_room_for_durable_blob(&self, incoming_bytes: u64) -> Result<u64> {
249        if self.max_size_bytes == 0 || incoming_bytes == 0 {
250            return Ok(0);
251        }
252
253        if incoming_bytes > self.max_size_bytes {
254            anyhow::bail!(
255                "storage limit exceeded: incoming blob is {} bytes but limit is {} bytes",
256                incoming_bytes,
257                self.max_size_bytes
258            );
259        }
260
261        let stats = self
262            .router
263            .stats()
264            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
265        if stats.total_bytes.saturating_add(incoming_bytes) <= self.max_size_bytes {
266            return Ok(0);
267        }
268
269        let target = (self.max_size_bytes.saturating_mul(9) / 10)
270            .min(self.max_size_bytes.saturating_sub(incoming_bytes));
271        let freed = self.evict_with_policy_to_target(stats.total_bytes, target)?;
272
273        let next_stats = self
274            .router
275            .stats()
276            .map_err(|e| anyhow::anyhow!("Failed to get stats after eviction: {}", e))?;
277        if next_stats.total_bytes.saturating_add(incoming_bytes) > self.max_size_bytes {
278            anyhow::bail!(
279                "storage limit exceeded: {} bytes used, {} byte incoming blob, {} byte limit",
280                next_stats.total_bytes,
281                incoming_bytes,
282                self.max_size_bytes
283            );
284        }
285
286        Ok(freed)
287    }
288
289    pub fn relieve_cached_blob_write_pressure(&self, incoming_bytes: u64) -> Result<u64> {
290        let stats = self
291            .router
292            .stats()
293            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
294        if stats.total_bytes == 0 {
295            return Ok(0);
296        }
297
298        let headroom = incoming_bytes.max(stats.total_bytes / 10).max(1);
299        let target = stats.total_bytes.saturating_sub(headroom);
300        self.evict_disposable_orphans_to_target(target)
301    }
302
303    /// Pin a hash (prevent garbage collection)
304    pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
305        let mut wtxn = self.env.write_txn()?;
306        self.pins.put(&mut wtxn, hash.as_slice(), &())?;
307        wtxn.commit()?;
308        Ok(())
309    }
310
311    /// Unpin a hash (allow garbage collection)
312    pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
313        let mut wtxn = self.env.write_txn()?;
314        self.pins.delete(&mut wtxn, hash.as_slice())?;
315        wtxn.commit()?;
316        Ok(())
317    }
318
319    /// Check if hash is pinned
320    pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
321        let rtxn = self.env.read_txn()?;
322        Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
323    }
324
325    /// List all pinned hashes (raw bytes)
326    pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
327        let rtxn = self.env.read_txn()?;
328        let mut pins = Vec::new();
329
330        for item in self.pins.iter(&rtxn)? {
331            let (hash_bytes, _) = item?;
332            if hash_bytes.len() == 32 {
333                let mut hash = [0u8; 32];
334                hash.copy_from_slice(hash_bytes);
335                pins.push(hash);
336            }
337        }
338
339        Ok(pins)
340    }
341
342    /// List all pinned hashes with names
343    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
344        let rtxn = self.env.read_txn()?;
345        let store = self.store_arc();
346        let tree = HashTree::new(HashTreeConfig::new(store).public());
347        let mut pins = Vec::new();
348
349        for item in self.pins.iter(&rtxn)? {
350            let (hash_bytes, _) = item?;
351            if hash_bytes.len() != 32 {
352                continue;
353            }
354            let mut hash = [0u8; 32];
355            hash.copy_from_slice(hash_bytes);
356
357            // Try to determine if it's a directory
358            let is_directory =
359                sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
360
361            let meta = self
362                .tree_meta
363                .get(&rtxn, hash.as_slice())?
364                .map(|bytes| {
365                    rmp_serde::from_slice::<TreeMeta>(bytes)
366                        .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))
367                })
368                .transpose()?;
369
370            pins.push(PinnedItem {
371                cid: to_hex(&hash),
372                name: pinned_item_name(&hash, meta.as_ref()),
373                is_directory,
374            });
375        }
376
377        Ok(pins)
378    }
379
380    // === Tree indexing for eviction ===
381
382    /// Index a tree after sync - tracks all blobs in the tree for eviction
383    ///
384    /// If `ref_key` is provided (e.g. "npub.../name"), it will replace any existing
385    /// tree with that ref, allowing old versions to be evicted.
386    pub fn index_tree(
387        &self,
388        root_hash: &Hash,
389        owner: &str,
390        name: Option<&str>,
391        priority: u8,
392        ref_key: Option<&str>,
393    ) -> Result<()> {
394        let root_hex = to_hex(root_hash);
395
396        // If ref_key provided, check for and unindex old version
397        if let Some(key) = ref_key {
398            let rtxn = self.env.read_txn()?;
399            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
400                if old_hash_bytes != root_hash.as_slice() {
401                    let old_hash: Hash = old_hash_bytes
402                        .try_into()
403                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
404                    drop(rtxn);
405                    let _ = self.unpin(&old_hash);
406                    // Unindex old tree (will delete orphaned blobs)
407                    let _ = self.unindex_tree(&old_hash);
408                    tracing::debug!("Replaced old tree for ref {}", key);
409                }
410            }
411        }
412
413        let store = self.store_arc();
414        let tree = HashTree::new(HashTreeConfig::new(store).public());
415
416        // Walk tree and collect all blob hashes + compute total size
417        let (_blob_hashes, total_size) =
418            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
419        let tracked_hashes = sync_block_on(self.collect_tree_hashes(&tree, root_hash))?;
420
421        let mut wtxn = self.env.write_txn()?;
422
423        // Store blob-tree relationships (64-byte key: blob_hash ++ tree_hash)
424        for tracked_hash in &tracked_hashes {
425            let mut key = [0u8; 64];
426            key[..32].copy_from_slice(tracked_hash);
427            key[32..].copy_from_slice(root_hash);
428            self.blob_trees.put(&mut wtxn, &key[..], &())?;
429        }
430
431        // Store tree metadata
432        let meta = TreeMeta {
433            owner: owner.to_string(),
434            name: name.map(|s| s.to_string()),
435            synced_at: SystemTime::now()
436                .duration_since(UNIX_EPOCH)
437                .unwrap()
438                .as_secs(),
439            total_size,
440            priority,
441        };
442        let meta_bytes = rmp_serde::to_vec(&meta)
443            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
444        self.tree_meta
445            .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;
446
447        // Store ref -> hash mapping if ref_key provided
448        if let Some(key) = ref_key {
449            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
450        }
451
452        wtxn.commit()?;
453
454        tracing::debug!(
455            "Indexed tree {} ({} blobs, {} bytes, priority {})",
456            &root_hex[..8],
457            tracked_hashes.len(),
458            total_size,
459            priority
460        );
461
462        Ok(())
463    }
464
465    /// Collect all blob hashes in a tree and compute total size
466    async fn collect_tree_blobs<S: Store>(
467        &self,
468        tree: &HashTree<S>,
469        root: &Hash,
470    ) -> Result<(Vec<Hash>, u64)> {
471        let mut blobs = Vec::new();
472        let mut total_size = 0u64;
473        let mut stack = vec![*root];
474
475        while let Some(hash) = stack.pop() {
476            // Check if it's a tree node
477            let is_tree = tree
478                .is_tree(&hash)
479                .await
480                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
481
482            if is_tree {
483                // Get tree node and add children to stack
484                if let Some(node) = tree
485                    .get_tree_node(&hash)
486                    .await
487                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
488                {
489                    for link in &node.links {
490                        stack.push(link.hash);
491                    }
492                }
493            } else {
494                // It's a blob - get its size
495                if let Some(data) = self
496                    .router
497                    .get_sync(&hash)
498                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
499                {
500                    total_size += data.len() as u64;
501                    blobs.push(hash);
502                }
503            }
504        }
505
506        Ok((blobs, total_size))
507    }
508
509    /// Unindex a tree - removes blob-tree mappings and deletes orphaned blobs
510    /// Returns the number of bytes freed
511    pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
512        let root_hex = to_hex(root_hash);
513
514        let store = self.store_arc();
515        let tree = HashTree::new(HashTreeConfig::new(store).public());
516
517        // Walk tree and collect all blob hashes
518        let tracked_hashes = sync_block_on(self.collect_tree_hashes(&tree, root_hash))?;
519
520        let mut wtxn = self.env.write_txn()?;
521        let mut freed = 0u64;
522
523        // For each blob, remove the blob-tree entry and check if orphaned
524        for tracked_hash in &tracked_hashes {
525            // Delete blob-tree entry (64-byte key: blob_hash ++ tree_hash)
526            let mut key = [0u8; 64];
527            key[..32].copy_from_slice(tracked_hash);
528            key[32..].copy_from_slice(root_hash);
529            self.blob_trees.delete(&mut wtxn, &key[..])?;
530
531            // Check if blob is in any other tree (prefix scan on first 32 bytes)
532            let mut has_other_tree = false;
533            for item in self.blob_trees.prefix_iter(&wtxn, &tracked_hash[..])? {
534                if item.is_ok() {
535                    has_other_tree = true;
536                    break;
537                }
538            }
539
540            // If orphaned, delete the blob
541            if !has_other_tree {
542                if let Some(data) = self
543                    .router
544                    .get_sync(tracked_hash)
545                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
546                {
547                    freed += data.len() as u64;
548                    // Delete locally only - keep S3 as archive
549                    self.router
550                        .delete_local_only(tracked_hash)
551                        .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
552                }
553            }
554        }
555
556        // Delete tree metadata
557        self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
558
559        wtxn.commit()?;
560
561        tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
562
563        Ok(freed)
564    }
565
566    /// Get tree metadata
567    pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
568        let rtxn = self.env.read_txn()?;
569        if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
570            let meta: TreeMeta = rmp_serde::from_slice(bytes)
571                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
572            Ok(Some(meta))
573        } else {
574            Ok(None)
575        }
576    }
577
578    pub fn get_tree_ref(&self, key: &str) -> Result<Option<Hash>> {
579        let rtxn = self.env.read_txn()?;
580        let Some(bytes) = self.tree_refs.get(&rtxn, key)? else {
581            return Ok(None);
582        };
583
584        let hash: Hash = bytes
585            .try_into()
586            .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
587        Ok(Some(hash))
588    }
589
590    /// List all indexed trees
591    pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
592        let rtxn = self.env.read_txn()?;
593        let mut trees = Vec::new();
594
595        for item in self.tree_meta.iter(&rtxn)? {
596            let (hash_bytes, meta_bytes) = item?;
597            let hash: Hash = hash_bytes
598                .try_into()
599                .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
600            let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
601                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
602            trees.push((hash, meta));
603        }
604
605        Ok(trees)
606    }
607
608    /// Get total tracked storage size (sum of all tree_meta.total_size)
609    pub fn tracked_size(&self) -> Result<u64> {
610        let rtxn = self.env.read_txn()?;
611        let mut total = 0u64;
612
613        for item in self.tree_meta.iter(&rtxn)? {
614            let (_, bytes) = item?;
615            let meta: TreeMeta = rmp_serde::from_slice(bytes)
616                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
617            total += meta.total_size;
618        }
619
620        Ok(total)
621    }
622
623    /// Get evictable trees sorted by (priority ASC, synced_at ASC)
624    fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
625        let mut trees = self.list_indexed_trees()?;
626
627        // Sort by priority (lower first), then by synced_at (older first)
628        trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
629            std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
630            other => other,
631        });
632
633        Ok(trees)
634    }
635
636    /// Run eviction if storage is over quota
637    /// Returns bytes freed
638    ///
639    /// Eviction order:
640    /// 1. Orphaned blobs (not in any indexed tree and not pinned)
641    /// 2. Trees by priority (lowest first) and age (oldest first)
642    pub fn evict_if_needed(&self) -> Result<u64> {
643        // Get actual storage used
644        let stats = self
645            .router
646            .stats()
647            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
648        let current = stats.total_bytes;
649
650        if current <= self.max_size_bytes {
651            return Ok(0);
652        }
653
654        // Target 90% of max to avoid constant eviction
655        let target = self.max_size_bytes * 90 / 100;
656        self.evict_with_policy_to_target(current, target)
657    }
658
659    fn evict_with_policy_to_target(&self, current: u64, target: u64) -> Result<u64> {
660        let mut freed = 0u64;
661        let mut current_size = current;
662
663        // Phase 1: Evict orphaned blobs (not in any tree and not pinned)
664        if self.evict_orphans {
665            let orphan_freed = self.evict_disposable_orphans_to_target(target)?;
666            freed += orphan_freed;
667            current_size = current_size.saturating_sub(orphan_freed);
668
669            if orphan_freed > 0 {
670                tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
671            }
672        } else {
673            tracing::debug!("Skipping orphan blob eviction; storage.evict_orphans=false");
674        }
675
676        // Check if we're now under target
677        if current_size <= target {
678            if freed > 0 {
679                tracing::info!("Eviction complete: {} bytes freed", freed);
680            }
681            return Ok(freed);
682        }
683
684        // Phase 2: Evict trees by priority (lowest first) and age (oldest first)
685        // Own trees CAN be evicted (just last), but PINNED trees are never evicted
686        let evictable = self.get_evictable_trees()?;
687
688        for (root_hash, meta) in evictable {
689            if current_size <= target {
690                break;
691            }
692
693            let root_hex = to_hex(&root_hash);
694
695            // Never evict pinned trees
696            if self.is_pinned(&root_hash)? {
697                continue;
698            }
699
700            let tree_freed = self.unindex_tree(&root_hash)?;
701            freed += tree_freed;
702            current_size = current_size.saturating_sub(tree_freed);
703
704            tracing::info!(
705                "Evicted tree {} (owner={}, priority={}, {} bytes)",
706                &root_hex[..8],
707                &meta.owner[..8.min(meta.owner.len())],
708                meta.priority,
709                tree_freed
710            );
711        }
712
713        if freed > 0 {
714            tracing::info!("Eviction complete: {} bytes freed", freed);
715        }
716
717        Ok(freed)
718    }
719
720    /// Get the maximum storage size in bytes
721    pub fn max_size_bytes(&self) -> u64 {
722        self.max_size_bytes
723    }
724
725    /// Get storage usage by priority tier
726    pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
727        let rtxn = self.env.read_txn()?;
728        let mut own = 0u64;
729        let mut followed = 0u64;
730        let mut other = 0u64;
731
732        for item in self.tree_meta.iter(&rtxn)? {
733            let (_, bytes) = item?;
734            let meta: TreeMeta = rmp_serde::from_slice(bytes)
735                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
736
737            if meta.priority == PRIORITY_OWN {
738                own += meta.total_size;
739            } else if meta.priority >= PRIORITY_FOLLOWED {
740                followed += meta.total_size;
741            } else {
742                other += meta.total_size;
743            }
744        }
745
746        Ok(StorageByPriority {
747            own,
748            followed,
749            other,
750        })
751    }
752
753    /// Get storage statistics
754    pub fn get_storage_stats(&self) -> Result<StorageStats> {
755        let rtxn = self.env.read_txn()?;
756        let total_pins = self.pins.len(&rtxn)? as usize;
757
758        let stats = self
759            .router
760            .stats()
761            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
762
763        Ok(StorageStats {
764            total_dags: stats.count,
765            pinned_dags: total_pins,
766            total_bytes: stats.total_bytes,
767        })
768    }
769}
770
#[cfg(test)]
mod tests {
    use super::*;
    use hashtree_core::Cid;
    use hashtree_index::{BTree, BTreeOptions};
    use tempfile::TempDir;

    use crate::storage::PRIORITY_OTHER;

    /// Persist `cid` as a MessagePack root file at `path`, creating parent dirs.
    fn write_root_file(path: &Path, cid: &Cid) {
        #[derive(Serialize)]
        struct StoredCid {
            hash: [u8; 32],
            key: Option<[u8; 32]>,
        }

        let parent = path.parent().expect("root file parent");
        std::fs::create_dir_all(parent).expect("create dir");

        let payload = StoredCid {
            hash: cid.hash,
            key: cid.key,
        };
        let encoded = rmp_serde::to_vec_named(&payload).expect("encode cid");
        std::fs::write(path, encoded).expect("write root file");
    }

    /// Build a small three-entry B-tree in `store` and return its root CID.
    fn build_test_tree(store: &HashtreeStore) -> Cid {
        let entries: Vec<(String, String)> = [("alpha", "one"), ("beta", "two"), ("gamma", "three")]
            .into_iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();

        let btree = BTree::new(store.store_arc(), BTreeOptions { order: Some(8) });
        let built = sync_block_on(btree.build(entries)).expect("build btree");
        built.expect("non-empty root")
    }

    #[test]
    fn orphan_cleanup_keeps_indexed_tree_hashes() {
        let dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::with_options(dir.path(), None, 1024).expect("store");
        let root = build_test_tree(&store);

        store
            .index_tree(
                &root.hash,
                "owner",
                Some("tree"),
                PRIORITY_OTHER,
                Some("owner/tree"),
            )
            .expect("index tree");

        let reclaimed = store
            .evict_disposable_orphans_to_target(0)
            .expect("orphan cleanup");

        assert!(reclaimed < 1024);
        assert!(store.blob_exists(&root.hash).expect("root exists"));
    }

    #[test]
    fn list_pins_with_names_uses_indexed_tree_metadata() {
        let dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::with_options(dir.path(), None, 1024 * 1024).expect("store");
        let root = build_test_tree(&store);

        store.pin(&root.hash).expect("pin tree");
        store
            .index_tree(
                &root.hash,
                "npub1example",
                Some("playlist"),
                PRIORITY_OTHER,
                Some("npub1example/playlist"),
            )
            .expect("index tree");

        let listed = store.list_pins_with_names().expect("list pins");

        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].name, "npub1example/playlist");
    }

    #[test]
    fn get_tree_ref_returns_stored_root() {
        let dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::with_options(dir.path(), None, 1024 * 1024).expect("store");
        let root = build_test_tree(&store);

        store
            .index_tree(
                &root.hash,
                "npub1example",
                Some("playlist"),
                PRIORITY_OTHER,
                Some("npub1example/playlist"),
            )
            .expect("index tree");

        let looked_up = store
            .get_tree_ref("npub1example/playlist")
            .expect("tree ref lookup");
        assert_eq!(looked_up, Some(root.hash));
    }

    #[test]
    fn orphan_cleanup_keeps_socialgraph_root_hashes() {
        let dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::with_options(dir.path(), None, 1024).expect("store");
        let root = build_test_tree(&store);

        let root_file = dir.path().join("socialgraph/events-root.msgpack");
        write_root_file(&root_file, &root);

        let reclaimed = store
            .evict_disposable_orphans_to_target(0)
            .expect("orphan cleanup");

        assert!(reclaimed < 1024);
        assert!(store.blob_exists(&root.hash).expect("root exists"));
    }
}