Skip to main content

hashtree_cli/storage/
retention.rs

1use anyhow::Result;
2use futures::executor::block_on as sync_block_on;
3use hashtree_core::store::Store;
4use hashtree_core::{to_hex, types::Hash, HashTree, HashTreeConfig};
5use serde::{Deserialize, Serialize};
6use std::collections::HashSet;
7use std::path::{Path, PathBuf};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use super::{HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
11
12/// Metadata for a synced tree (for eviction tracking)
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct TreeMeta {
15    /// Pubkey of tree owner
16    pub owner: String,
17    /// Tree name if known (from nostr key like "npub.../name")
18    pub name: Option<String>,
19    /// Unix timestamp when this tree was synced
20    pub synced_at: u64,
21    /// Total size of all blobs in this tree
22    pub total_size: u64,
23    /// Eviction priority: 255=own/pinned, 128=followed, 64=other
24    pub priority: u8,
25}
26
27#[derive(Debug)]
28pub struct StorageStats {
29    pub total_dags: usize,
30    pub pinned_dags: usize,
31    pub total_bytes: u64,
32}
33
34/// Storage usage broken down by priority tier
35#[derive(Debug, Clone)]
36pub struct StorageByPriority {
37    /// Own/pinned trees (priority 255)
38    pub own: u64,
39    /// Followed users' trees (priority 128)
40    pub followed: u64,
41    /// Other trees (priority 64)
42    pub other: u64,
43}
44
45#[derive(Debug, Clone)]
46pub struct PinnedItem {
47    pub cid: String,
48    pub name: String,
49    pub is_directory: bool,
50}
51
52fn pinned_item_name(hash: &Hash, meta: Option<&TreeMeta>) -> String {
53    let Some(meta) = meta else {
54        return to_hex(hash);
55    };
56
57    match (meta.owner.as_str(), meta.name.as_deref()) {
58        ("pinned", Some(name)) => name.to_string(),
59        ("", Some(name)) => name.to_string(),
60        (owner, Some(name)) if !owner.is_empty() => format!("{owner}/{name}"),
61        (owner, None) if !owner.is_empty() && owner != "pinned" => owner.to_string(),
62        _ => to_hex(hash),
63    }
64}
65
66impl HashtreeStore {
67    fn socialgraph_root_files(&self) -> [PathBuf; 4] {
68        let socialgraph = self.base_path().join("socialgraph");
69        [
70            socialgraph.join("events-root.msgpack"),
71            socialgraph.join("events-root-ambient.msgpack"),
72            socialgraph.join("profile-search-root.msgpack"),
73            socialgraph.join("profiles-by-pubkey-root.msgpack"),
74        ]
75    }
76
77    fn read_stored_cid(path: &Path) -> Result<Option<Hash>> {
78        #[derive(Deserialize)]
79        struct StoredCid {
80            hash: [u8; 32],
81            #[allow(dead_code)]
82            key: Option<[u8; 32]>,
83        }
84
85        let Ok(bytes) = std::fs::read(path) else {
86            return Ok(None);
87        };
88        let stored: StoredCid = rmp_serde::from_slice(&bytes)
89            .map_err(|e| anyhow::anyhow!("Failed to decode root file {}: {}", path.display(), e))?;
90        Ok(Some(stored.hash))
91    }
92
93    async fn collect_tree_hashes<S: Store>(
94        &self,
95        tree: &HashTree<S>,
96        root: &Hash,
97    ) -> Result<HashSet<Hash>> {
98        let mut hashes = HashSet::new();
99        let mut stack = vec![*root];
100
101        while let Some(hash) = stack.pop() {
102            if !hashes.insert(hash) {
103                continue;
104            }
105
106            let is_tree = tree
107                .is_tree(&hash)
108                .await
109                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
110
111            if !is_tree {
112                continue;
113            }
114
115            if let Some(node) = tree
116                .get_tree_node(&hash)
117                .await
118                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
119            {
120                for link in &node.links {
121                    stack.push(link.hash);
122                }
123            }
124        }
125
126        Ok(hashes)
127    }
128
129    fn protected_hashes(&self) -> Result<HashSet<Hash>> {
130        let mut protected = HashSet::new();
131
132        let rtxn = self.env.read_txn()?;
133        for (key_bytes, _) in self.blob_trees.iter(&rtxn)?.flatten() {
134            if key_bytes.len() >= 32 {
135                let hash: Hash = key_bytes[..32].try_into().unwrap();
136                protected.insert(hash);
137            }
138        }
139        drop(rtxn);
140
141        let tree = HashTree::new(HashTreeConfig::new(self.store_arc()).public());
142        for path in self.socialgraph_root_files() {
143            let Some(root_hash) = Self::read_stored_cid(&path)? else {
144                continue;
145            };
146            protected.extend(sync_block_on(self.collect_tree_hashes(&tree, &root_hash))?);
147        }
148
149        Ok(protected)
150    }
151
152    fn evict_disposable_orphans_to_target(&self, target_bytes: u64) -> Result<u64> {
153        let stats = self
154            .router
155            .stats()
156            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
157        let mut current_size = stats.total_bytes;
158        if current_size <= target_bytes {
159            return Ok(0);
160        }
161
162        let rtxn = self.env.read_txn()?;
163        let pinned: HashSet<Hash> = self
164            .pins
165            .iter(&rtxn)?
166            .filter_map(|item| item.ok())
167            .filter_map(|(hash_bytes, _)| {
168                if hash_bytes.len() == 32 {
169                    let mut hash = [0u8; 32];
170                    hash.copy_from_slice(hash_bytes);
171                    Some(hash)
172                } else {
173                    None
174                }
175            })
176            .collect();
177        drop(rtxn);
178
179        let protected_hashes = self.protected_hashes()?;
180        let all_hashes = self
181            .router
182            .list()
183            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
184
185        let mut freed = 0u64;
186        for hash in all_hashes {
187            if current_size <= target_bytes {
188                break;
189            }
190
191            if pinned.contains(&hash) || protected_hashes.contains(&hash) {
192                continue;
193            }
194
195            if self.blob_has_owners(&hash)? {
196                continue;
197            }
198
199            let Some(data) = self
200                .router
201                .get_sync(&hash)
202                .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
203            else {
204                continue;
205            };
206
207            let size = data.len() as u64;
208            if self
209                .router
210                .delete_local_only(&hash)
211                .map_err(|e| anyhow::anyhow!("Failed to delete orphaned blob: {}", e))?
212            {
213                freed = freed.saturating_add(size);
214                current_size = current_size.saturating_sub(size);
215                tracing::debug!(
216                    "Deleted disposable orphaned blob {} ({} bytes)",
217                    &to_hex(&hash)[..8],
218                    size
219                );
220            }
221        }
222
223        Ok(freed)
224    }
225
226    pub fn make_room_for_cached_blob(&self, incoming_bytes: u64) -> Result<u64> {
227        if self.max_size_bytes == 0 {
228            return Ok(0);
229        }
230
231        let stats = self
232            .router
233            .stats()
234            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
235        if stats.total_bytes.saturating_add(incoming_bytes) <= self.max_size_bytes {
236            return Ok(0);
237        }
238
239        let target = if incoming_bytes >= self.max_size_bytes {
240            0
241        } else {
242            (self.max_size_bytes.saturating_mul(9) / 10)
243                .min(self.max_size_bytes.saturating_sub(incoming_bytes))
244        };
245        self.evict_disposable_orphans_to_target(target)
246    }
247
248    pub fn relieve_cached_blob_write_pressure(&self, incoming_bytes: u64) -> Result<u64> {
249        let stats = self
250            .router
251            .stats()
252            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
253        if stats.total_bytes == 0 {
254            return Ok(0);
255        }
256
257        let headroom = incoming_bytes.max(stats.total_bytes / 10).max(1);
258        let target = stats.total_bytes.saturating_sub(headroom);
259        self.evict_disposable_orphans_to_target(target)
260    }
261
262    /// Pin a hash (prevent garbage collection)
263    pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
264        let mut wtxn = self.env.write_txn()?;
265        self.pins.put(&mut wtxn, hash.as_slice(), &())?;
266        wtxn.commit()?;
267        Ok(())
268    }
269
270    /// Unpin a hash (allow garbage collection)
271    pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
272        let mut wtxn = self.env.write_txn()?;
273        self.pins.delete(&mut wtxn, hash.as_slice())?;
274        wtxn.commit()?;
275        Ok(())
276    }
277
278    /// Check if hash is pinned
279    pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
280        let rtxn = self.env.read_txn()?;
281        Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
282    }
283
284    /// List all pinned hashes (raw bytes)
285    pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
286        let rtxn = self.env.read_txn()?;
287        let mut pins = Vec::new();
288
289        for item in self.pins.iter(&rtxn)? {
290            let (hash_bytes, _) = item?;
291            if hash_bytes.len() == 32 {
292                let mut hash = [0u8; 32];
293                hash.copy_from_slice(hash_bytes);
294                pins.push(hash);
295            }
296        }
297
298        Ok(pins)
299    }
300
301    /// List all pinned hashes with names
302    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
303        let rtxn = self.env.read_txn()?;
304        let store = self.store_arc();
305        let tree = HashTree::new(HashTreeConfig::new(store).public());
306        let mut pins = Vec::new();
307
308        for item in self.pins.iter(&rtxn)? {
309            let (hash_bytes, _) = item?;
310            if hash_bytes.len() != 32 {
311                continue;
312            }
313            let mut hash = [0u8; 32];
314            hash.copy_from_slice(hash_bytes);
315
316            // Try to determine if it's a directory
317            let is_directory =
318                sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
319
320            let meta = self
321                .tree_meta
322                .get(&rtxn, hash.as_slice())?
323                .map(|bytes| {
324                    rmp_serde::from_slice::<TreeMeta>(bytes)
325                        .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))
326                })
327                .transpose()?;
328
329            pins.push(PinnedItem {
330                cid: to_hex(&hash),
331                name: pinned_item_name(&hash, meta.as_ref()),
332                is_directory,
333            });
334        }
335
336        Ok(pins)
337    }
338
339    // === Tree indexing for eviction ===
340
341    /// Index a tree after sync - tracks all blobs in the tree for eviction
342    ///
343    /// If `ref_key` is provided (e.g. "npub.../name"), it will replace any existing
344    /// tree with that ref, allowing old versions to be evicted.
345    pub fn index_tree(
346        &self,
347        root_hash: &Hash,
348        owner: &str,
349        name: Option<&str>,
350        priority: u8,
351        ref_key: Option<&str>,
352    ) -> Result<()> {
353        let root_hex = to_hex(root_hash);
354
355        // If ref_key provided, check for and unindex old version
356        if let Some(key) = ref_key {
357            let rtxn = self.env.read_txn()?;
358            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
359                if old_hash_bytes != root_hash.as_slice() {
360                    let old_hash: Hash = old_hash_bytes
361                        .try_into()
362                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
363                    drop(rtxn);
364                    let _ = self.unpin(&old_hash);
365                    // Unindex old tree (will delete orphaned blobs)
366                    let _ = self.unindex_tree(&old_hash);
367                    tracing::debug!("Replaced old tree for ref {}", key);
368                }
369            }
370        }
371
372        let store = self.store_arc();
373        let tree = HashTree::new(HashTreeConfig::new(store).public());
374
375        // Walk tree and collect all blob hashes + compute total size
376        let (_blob_hashes, total_size) =
377            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
378        let tracked_hashes = sync_block_on(self.collect_tree_hashes(&tree, root_hash))?;
379
380        let mut wtxn = self.env.write_txn()?;
381
382        // Store blob-tree relationships (64-byte key: blob_hash ++ tree_hash)
383        for tracked_hash in &tracked_hashes {
384            let mut key = [0u8; 64];
385            key[..32].copy_from_slice(tracked_hash);
386            key[32..].copy_from_slice(root_hash);
387            self.blob_trees.put(&mut wtxn, &key[..], &())?;
388        }
389
390        // Store tree metadata
391        let meta = TreeMeta {
392            owner: owner.to_string(),
393            name: name.map(|s| s.to_string()),
394            synced_at: SystemTime::now()
395                .duration_since(UNIX_EPOCH)
396                .unwrap()
397                .as_secs(),
398            total_size,
399            priority,
400        };
401        let meta_bytes = rmp_serde::to_vec(&meta)
402            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
403        self.tree_meta
404            .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;
405
406        // Store ref -> hash mapping if ref_key provided
407        if let Some(key) = ref_key {
408            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
409        }
410
411        wtxn.commit()?;
412
413        tracing::debug!(
414            "Indexed tree {} ({} blobs, {} bytes, priority {})",
415            &root_hex[..8],
416            tracked_hashes.len(),
417            total_size,
418            priority
419        );
420
421        Ok(())
422    }
423
424    /// Collect all blob hashes in a tree and compute total size
425    async fn collect_tree_blobs<S: Store>(
426        &self,
427        tree: &HashTree<S>,
428        root: &Hash,
429    ) -> Result<(Vec<Hash>, u64)> {
430        let mut blobs = Vec::new();
431        let mut total_size = 0u64;
432        let mut stack = vec![*root];
433
434        while let Some(hash) = stack.pop() {
435            // Check if it's a tree node
436            let is_tree = tree
437                .is_tree(&hash)
438                .await
439                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
440
441            if is_tree {
442                // Get tree node and add children to stack
443                if let Some(node) = tree
444                    .get_tree_node(&hash)
445                    .await
446                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
447                {
448                    for link in &node.links {
449                        stack.push(link.hash);
450                    }
451                }
452            } else {
453                // It's a blob - get its size
454                if let Some(data) = self
455                    .router
456                    .get_sync(&hash)
457                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
458                {
459                    total_size += data.len() as u64;
460                    blobs.push(hash);
461                }
462            }
463        }
464
465        Ok((blobs, total_size))
466    }
467
468    /// Unindex a tree - removes blob-tree mappings and deletes orphaned blobs
469    /// Returns the number of bytes freed
470    pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
471        let root_hex = to_hex(root_hash);
472
473        let store = self.store_arc();
474        let tree = HashTree::new(HashTreeConfig::new(store).public());
475
476        // Walk tree and collect all blob hashes
477        let tracked_hashes = sync_block_on(self.collect_tree_hashes(&tree, root_hash))?;
478
479        let mut wtxn = self.env.write_txn()?;
480        let mut freed = 0u64;
481
482        // For each blob, remove the blob-tree entry and check if orphaned
483        for tracked_hash in &tracked_hashes {
484            // Delete blob-tree entry (64-byte key: blob_hash ++ tree_hash)
485            let mut key = [0u8; 64];
486            key[..32].copy_from_slice(tracked_hash);
487            key[32..].copy_from_slice(root_hash);
488            self.blob_trees.delete(&mut wtxn, &key[..])?;
489
490            // Check if blob is in any other tree (prefix scan on first 32 bytes)
491            let mut has_other_tree = false;
492            for item in self.blob_trees.prefix_iter(&wtxn, &tracked_hash[..])? {
493                if item.is_ok() {
494                    has_other_tree = true;
495                    break;
496                }
497            }
498
499            // If orphaned, delete the blob
500            if !has_other_tree {
501                if let Some(data) = self
502                    .router
503                    .get_sync(tracked_hash)
504                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
505                {
506                    freed += data.len() as u64;
507                    // Delete locally only - keep S3 as archive
508                    self.router
509                        .delete_local_only(tracked_hash)
510                        .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
511                }
512            }
513        }
514
515        // Delete tree metadata
516        self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
517
518        wtxn.commit()?;
519
520        tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
521
522        Ok(freed)
523    }
524
525    /// Get tree metadata
526    pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
527        let rtxn = self.env.read_txn()?;
528        if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
529            let meta: TreeMeta = rmp_serde::from_slice(bytes)
530                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
531            Ok(Some(meta))
532        } else {
533            Ok(None)
534        }
535    }
536
537    pub fn get_tree_ref(&self, key: &str) -> Result<Option<Hash>> {
538        let rtxn = self.env.read_txn()?;
539        let Some(bytes) = self.tree_refs.get(&rtxn, key)? else {
540            return Ok(None);
541        };
542
543        let hash: Hash = bytes
544            .try_into()
545            .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
546        Ok(Some(hash))
547    }
548
549    /// List all indexed trees
550    pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
551        let rtxn = self.env.read_txn()?;
552        let mut trees = Vec::new();
553
554        for item in self.tree_meta.iter(&rtxn)? {
555            let (hash_bytes, meta_bytes) = item?;
556            let hash: Hash = hash_bytes
557                .try_into()
558                .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
559            let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
560                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
561            trees.push((hash, meta));
562        }
563
564        Ok(trees)
565    }
566
567    /// Get total tracked storage size (sum of all tree_meta.total_size)
568    pub fn tracked_size(&self) -> Result<u64> {
569        let rtxn = self.env.read_txn()?;
570        let mut total = 0u64;
571
572        for item in self.tree_meta.iter(&rtxn)? {
573            let (_, bytes) = item?;
574            let meta: TreeMeta = rmp_serde::from_slice(bytes)
575                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
576            total += meta.total_size;
577        }
578
579        Ok(total)
580    }
581
582    /// Get evictable trees sorted by (priority ASC, synced_at ASC)
583    fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
584        let mut trees = self.list_indexed_trees()?;
585
586        // Sort by priority (lower first), then by synced_at (older first)
587        trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
588            std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
589            other => other,
590        });
591
592        Ok(trees)
593    }
594
595    /// Run eviction if storage is over quota
596    /// Returns bytes freed
597    ///
598    /// Eviction order:
599    /// 1. Orphaned blobs (not in any indexed tree and not pinned)
600    /// 2. Trees by priority (lowest first) and age (oldest first)
601    pub fn evict_if_needed(&self) -> Result<u64> {
602        // Get actual storage used
603        let stats = self
604            .router
605            .stats()
606            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
607        let current = stats.total_bytes;
608
609        if current <= self.max_size_bytes {
610            return Ok(0);
611        }
612
613        // Target 90% of max to avoid constant eviction
614        let target = self.max_size_bytes * 90 / 100;
615        let mut freed = 0u64;
616        let mut current_size = current;
617
618        // Phase 1: Evict orphaned blobs (not in any tree and not pinned)
619        if self.evict_orphans {
620            let orphan_freed = self.evict_orphaned_blobs()?;
621            freed += orphan_freed;
622            current_size = current_size.saturating_sub(orphan_freed);
623
624            if orphan_freed > 0 {
625                tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
626            }
627        } else {
628            tracing::debug!("Skipping orphan blob eviction; storage.evict_orphans=false");
629        }
630
631        // Check if we're now under target
632        if current_size <= target {
633            if freed > 0 {
634                tracing::info!("Eviction complete: {} bytes freed", freed);
635            }
636            return Ok(freed);
637        }
638
639        // Phase 2: Evict trees by priority (lowest first) and age (oldest first)
640        // Own trees CAN be evicted (just last), but PINNED trees are never evicted
641        let evictable = self.get_evictable_trees()?;
642
643        for (root_hash, meta) in evictable {
644            if current_size <= target {
645                break;
646            }
647
648            let root_hex = to_hex(&root_hash);
649
650            // Never evict pinned trees
651            if self.is_pinned(&root_hash)? {
652                continue;
653            }
654
655            let tree_freed = self.unindex_tree(&root_hash)?;
656            freed += tree_freed;
657            current_size = current_size.saturating_sub(tree_freed);
658
659            tracing::info!(
660                "Evicted tree {} (owner={}, priority={}, {} bytes)",
661                &root_hex[..8],
662                &meta.owner[..8.min(meta.owner.len())],
663                meta.priority,
664                tree_freed
665            );
666        }
667
668        if freed > 0 {
669            tracing::info!("Eviction complete: {} bytes freed", freed);
670        }
671
672        Ok(freed)
673    }
674
675    /// Evict blobs that are not part of any indexed tree and not pinned
676    fn evict_orphaned_blobs(&self) -> Result<u64> {
677        let mut freed = 0u64;
678
679        // Get all blob hashes from store
680        let all_hashes = self
681            .router
682            .list()
683            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
684
685        // Get pinned hashes as raw bytes
686        let rtxn = self.env.read_txn()?;
687        let pinned: HashSet<Hash> = self
688            .pins
689            .iter(&rtxn)?
690            .filter_map(|item| item.ok())
691            .filter_map(|(hash_bytes, _)| {
692                if hash_bytes.len() == 32 {
693                    let mut hash = [0u8; 32];
694                    hash.copy_from_slice(hash_bytes);
695                    Some(hash)
696                } else {
697                    None
698                }
699            })
700            .collect();
701        drop(rtxn);
702
703        let protected_hashes = self.protected_hashes()?;
704
705        // Find and delete orphaned blobs
706        for hash in all_hashes {
707            // Skip if pinned
708            if pinned.contains(&hash) {
709                continue;
710            }
711
712            // Skip if part of any tree
713            if protected_hashes.contains(&hash) {
714                continue;
715            }
716
717            // Skip owned Blossom uploads
718            if self.blob_has_owners(&hash)? {
719                continue;
720            }
721
722            // This blob is orphaned - delete locally (keep S3 as archive)
723            if let Ok(Some(data)) = self.router.get_sync(&hash) {
724                freed += data.len() as u64;
725                let _ = self.router.delete_local_only(&hash);
726                tracing::debug!(
727                    "Deleted orphaned blob {} ({} bytes)",
728                    &to_hex(&hash)[..8],
729                    data.len()
730                );
731            }
732        }
733
734        Ok(freed)
735    }
736
737    /// Get the maximum storage size in bytes
738    pub fn max_size_bytes(&self) -> u64 {
739        self.max_size_bytes
740    }
741
742    /// Get storage usage by priority tier
743    pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
744        let rtxn = self.env.read_txn()?;
745        let mut own = 0u64;
746        let mut followed = 0u64;
747        let mut other = 0u64;
748
749        for item in self.tree_meta.iter(&rtxn)? {
750            let (_, bytes) = item?;
751            let meta: TreeMeta = rmp_serde::from_slice(bytes)
752                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
753
754            if meta.priority == PRIORITY_OWN {
755                own += meta.total_size;
756            } else if meta.priority >= PRIORITY_FOLLOWED {
757                followed += meta.total_size;
758            } else {
759                other += meta.total_size;
760            }
761        }
762
763        Ok(StorageByPriority {
764            own,
765            followed,
766            other,
767        })
768    }
769
770    /// Get storage statistics
771    pub fn get_storage_stats(&self) -> Result<StorageStats> {
772        let rtxn = self.env.read_txn()?;
773        let total_pins = self.pins.len(&rtxn)? as usize;
774
775        let stats = self
776            .router
777            .stats()
778            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
779
780        Ok(StorageStats {
781            total_dags: stats.count,
782            pinned_dags: total_pins,
783            total_bytes: stats.total_bytes,
784        })
785    }
786}
787
788#[cfg(test)]
789mod tests {
790    use super::*;
791    use hashtree_core::Cid;
792    use hashtree_index::{BTree, BTreeOptions};
793    use tempfile::TempDir;
794
795    use crate::storage::PRIORITY_OTHER;
796
797    fn write_root_file(path: &Path, cid: &Cid) {
798        #[derive(Serialize)]
799        struct StoredCid {
800            hash: [u8; 32],
801            key: Option<[u8; 32]>,
802        }
803
804        std::fs::create_dir_all(path.parent().expect("root file parent")).expect("create dir");
805        let bytes = rmp_serde::to_vec_named(&StoredCid {
806            hash: cid.hash,
807            key: cid.key,
808        })
809        .expect("encode cid");
810        std::fs::write(path, bytes).expect("write root file");
811    }
812
813    fn build_test_tree(store: &HashtreeStore) -> Cid {
814        let index = BTree::new(store.store_arc(), BTreeOptions { order: Some(8) });
815        sync_block_on(index.build(vec![
816            ("alpha".to_string(), "one".to_string()),
817            ("beta".to_string(), "two".to_string()),
818            ("gamma".to_string(), "three".to_string()),
819        ]))
820        .expect("build btree")
821        .expect("non-empty root")
822    }
823
824    #[test]
825    fn orphan_cleanup_keeps_indexed_tree_hashes() {
826        let temp_dir = TempDir::new().expect("temp dir");
827        let store = HashtreeStore::with_options(temp_dir.path(), None, 1024).expect("store");
828        let cid = build_test_tree(&store);
829
830        store
831            .index_tree(
832                &cid.hash,
833                "owner",
834                Some("tree"),
835                PRIORITY_OTHER,
836                Some("owner/tree"),
837            )
838            .expect("index tree");
839        let freed = store.evict_orphaned_blobs().expect("orphan cleanup");
840
841        assert!(freed < 1024);
842        assert!(store.blob_exists(&cid.hash).expect("root exists"));
843    }
844
845    #[test]
846    fn list_pins_with_names_uses_indexed_tree_metadata() {
847        let temp_dir = TempDir::new().expect("temp dir");
848        let store = HashtreeStore::with_options(temp_dir.path(), None, 1024 * 1024).expect("store");
849        let cid = build_test_tree(&store);
850
851        store.pin(&cid.hash).expect("pin tree");
852        store
853            .index_tree(
854                &cid.hash,
855                "npub1example",
856                Some("playlist"),
857                PRIORITY_OTHER,
858                Some("npub1example/playlist"),
859            )
860            .expect("index tree");
861
862        let pins = store.list_pins_with_names().expect("list pins");
863
864        assert_eq!(pins.len(), 1);
865        assert_eq!(pins[0].name, "npub1example/playlist");
866    }
867
868    #[test]
869    fn get_tree_ref_returns_stored_root() {
870        let temp_dir = TempDir::new().expect("temp dir");
871        let store = HashtreeStore::with_options(temp_dir.path(), None, 1024 * 1024).expect("store");
872        let cid = build_test_tree(&store);
873
874        store
875            .index_tree(
876                &cid.hash,
877                "npub1example",
878                Some("playlist"),
879                PRIORITY_OTHER,
880                Some("npub1example/playlist"),
881            )
882            .expect("index tree");
883
884        assert_eq!(
885            store
886                .get_tree_ref("npub1example/playlist")
887                .expect("tree ref lookup"),
888            Some(cid.hash)
889        );
890    }
891
892    #[test]
893    fn orphan_cleanup_keeps_socialgraph_root_hashes() {
894        let temp_dir = TempDir::new().expect("temp dir");
895        let store = HashtreeStore::with_options(temp_dir.path(), None, 1024).expect("store");
896        let cid = build_test_tree(&store);
897        write_root_file(
898            &temp_dir.path().join("socialgraph/events-root.msgpack"),
899            &cid,
900        );
901
902        let freed = store.evict_orphaned_blobs().expect("orphan cleanup");
903
904        assert!(freed < 1024);
905        assert!(store.blob_exists(&cid.hash).expect("root exists"));
906    }
907}