Skip to main content

hashtree_cli/storage/
retention.rs

1use anyhow::Result;
2use futures::executor::block_on as sync_block_on;
3use hashtree_core::store::Store;
4use hashtree_core::{to_hex, types::Hash, HashTree, HashTreeConfig};
5use serde::{Deserialize, Serialize};
6use std::collections::HashSet;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use super::{HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
10
11/// Metadata for a synced tree (for eviction tracking)
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct TreeMeta {
14    /// Pubkey of tree owner
15    pub owner: String,
16    /// Tree name if known (from nostr key like "npub.../name")
17    pub name: Option<String>,
18    /// Unix timestamp when this tree was synced
19    pub synced_at: u64,
20    /// Total size of all blobs in this tree
21    pub total_size: u64,
22    /// Eviction priority: 255=own/pinned, 128=followed, 64=other
23    pub priority: u8,
24}
25
/// Snapshot of overall storage usage, as returned by `HashtreeStore::get_storage_stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Object count reported by the storage router's `stats()`
    pub total_dags: usize,
    /// Number of entries in the pin table
    pub pinned_dags: usize,
    /// Total bytes reported by the storage router's `stats()`
    pub total_bytes: u64,
}
32
/// Storage usage broken down by priority tier, summed from the recorded size of each
/// indexed tree (not measured disk usage).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StorageByPriority {
    /// Own trees (priority equal to [`PRIORITY_OWN`], 255)
    pub own: u64,
    /// Followed users' trees (priority at or above [`PRIORITY_FOLLOWED`] but not own;
    /// normally 128)
    pub followed: u64,
    /// All other trees (priority below [`PRIORITY_FOLLOWED`]; normally 64)
    pub other: u64,
}
43
/// A pinned hash as reported by `HashtreeStore::list_pins_with_names`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PinnedItem {
    /// Hex-encoded hash of the pinned node
    pub cid: String,
    /// Display name; pins carry no name, so this is currently always "Unknown"
    pub name: String,
    /// Whether the pinned node is a directory (`false` if that could not be determined)
    pub is_directory: bool,
}
50
51impl HashtreeStore {
52    /// Pin a hash (prevent garbage collection)
53    pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
54        let mut wtxn = self.env.write_txn()?;
55        self.pins.put(&mut wtxn, hash.as_slice(), &())?;
56        wtxn.commit()?;
57        Ok(())
58    }
59
60    /// Unpin a hash (allow garbage collection)
61    pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
62        let mut wtxn = self.env.write_txn()?;
63        self.pins.delete(&mut wtxn, hash.as_slice())?;
64        wtxn.commit()?;
65        Ok(())
66    }
67
68    /// Check if hash is pinned
69    pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
70        let rtxn = self.env.read_txn()?;
71        Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
72    }
73
74    /// List all pinned hashes (raw bytes)
75    pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
76        let rtxn = self.env.read_txn()?;
77        let mut pins = Vec::new();
78
79        for item in self.pins.iter(&rtxn)? {
80            let (hash_bytes, _) = item?;
81            if hash_bytes.len() == 32 {
82                let mut hash = [0u8; 32];
83                hash.copy_from_slice(hash_bytes);
84                pins.push(hash);
85            }
86        }
87
88        Ok(pins)
89    }
90
91    /// List all pinned hashes with names
92    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
93        let rtxn = self.env.read_txn()?;
94        let store = self.store_arc();
95        let tree = HashTree::new(HashTreeConfig::new(store).public());
96        let mut pins = Vec::new();
97
98        for item in self.pins.iter(&rtxn)? {
99            let (hash_bytes, _) = item?;
100            if hash_bytes.len() != 32 {
101                continue;
102            }
103            let mut hash = [0u8; 32];
104            hash.copy_from_slice(hash_bytes);
105
106            // Try to determine if it's a directory
107            let is_directory =
108                sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
109
110            pins.push(PinnedItem {
111                cid: to_hex(&hash),
112                name: "Unknown".to_string(),
113                is_directory,
114            });
115        }
116
117        Ok(pins)
118    }
119
120    // === Tree indexing for eviction ===
121
122    /// Index a tree after sync - tracks all blobs in the tree for eviction
123    ///
124    /// If `ref_key` is provided (e.g. "npub.../name"), it will replace any existing
125    /// tree with that ref, allowing old versions to be evicted.
126    pub fn index_tree(
127        &self,
128        root_hash: &Hash,
129        owner: &str,
130        name: Option<&str>,
131        priority: u8,
132        ref_key: Option<&str>,
133    ) -> Result<()> {
134        let root_hex = to_hex(root_hash);
135
136        // If ref_key provided, check for and unindex old version
137        if let Some(key) = ref_key {
138            let rtxn = self.env.read_txn()?;
139            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
140                if old_hash_bytes != root_hash.as_slice() {
141                    let old_hash: Hash = old_hash_bytes
142                        .try_into()
143                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
144                    drop(rtxn);
145                    // Unindex old tree (will delete orphaned blobs)
146                    let _ = self.unindex_tree(&old_hash);
147                    tracing::debug!("Replaced old tree for ref {}", key);
148                }
149            }
150        }
151
152        let store = self.store_arc();
153        let tree = HashTree::new(HashTreeConfig::new(store).public());
154
155        // Walk tree and collect all blob hashes + compute total size
156        let (blob_hashes, total_size) =
157            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
158
159        let mut wtxn = self.env.write_txn()?;
160
161        // Store blob-tree relationships (64-byte key: blob_hash ++ tree_hash)
162        for blob_hash in &blob_hashes {
163            let mut key = [0u8; 64];
164            key[..32].copy_from_slice(blob_hash);
165            key[32..].copy_from_slice(root_hash);
166            self.blob_trees.put(&mut wtxn, &key[..], &())?;
167        }
168
169        // Store tree metadata
170        let meta = TreeMeta {
171            owner: owner.to_string(),
172            name: name.map(|s| s.to_string()),
173            synced_at: SystemTime::now()
174                .duration_since(UNIX_EPOCH)
175                .unwrap()
176                .as_secs(),
177            total_size,
178            priority,
179        };
180        let meta_bytes = rmp_serde::to_vec(&meta)
181            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
182        self.tree_meta
183            .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;
184
185        // Store ref -> hash mapping if ref_key provided
186        if let Some(key) = ref_key {
187            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
188        }
189
190        wtxn.commit()?;
191
192        tracing::debug!(
193            "Indexed tree {} ({} blobs, {} bytes, priority {})",
194            &root_hex[..8],
195            blob_hashes.len(),
196            total_size,
197            priority
198        );
199
200        Ok(())
201    }
202
203    /// Collect all blob hashes in a tree and compute total size
204    async fn collect_tree_blobs<S: Store>(
205        &self,
206        tree: &HashTree<S>,
207        root: &Hash,
208    ) -> Result<(Vec<Hash>, u64)> {
209        let mut blobs = Vec::new();
210        let mut total_size = 0u64;
211        let mut stack = vec![*root];
212
213        while let Some(hash) = stack.pop() {
214            // Check if it's a tree node
215            let is_tree = tree
216                .is_tree(&hash)
217                .await
218                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
219
220            if is_tree {
221                // Get tree node and add children to stack
222                if let Some(node) = tree
223                    .get_tree_node(&hash)
224                    .await
225                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
226                {
227                    for link in &node.links {
228                        stack.push(link.hash);
229                    }
230                }
231            } else {
232                // It's a blob - get its size
233                if let Some(data) = self
234                    .router
235                    .get_sync(&hash)
236                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
237                {
238                    total_size += data.len() as u64;
239                    blobs.push(hash);
240                }
241            }
242        }
243
244        Ok((blobs, total_size))
245    }
246
247    /// Unindex a tree - removes blob-tree mappings and deletes orphaned blobs
248    /// Returns the number of bytes freed
249    pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
250        let root_hex = to_hex(root_hash);
251
252        let store = self.store_arc();
253        let tree = HashTree::new(HashTreeConfig::new(store).public());
254
255        // Walk tree and collect all blob hashes
256        let (blob_hashes, _) =
257            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
258
259        let mut wtxn = self.env.write_txn()?;
260        let mut freed = 0u64;
261
262        // For each blob, remove the blob-tree entry and check if orphaned
263        for blob_hash in &blob_hashes {
264            // Delete blob-tree entry (64-byte key: blob_hash ++ tree_hash)
265            let mut key = [0u8; 64];
266            key[..32].copy_from_slice(blob_hash);
267            key[32..].copy_from_slice(root_hash);
268            self.blob_trees.delete(&mut wtxn, &key[..])?;
269
270            // Check if blob is in any other tree (prefix scan on first 32 bytes)
271            let rtxn = self.env.read_txn()?;
272            let mut has_other_tree = false;
273
274            for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
275                if item.is_ok() {
276                    has_other_tree = true;
277                    break;
278                }
279            }
280            drop(rtxn);
281
282            // If orphaned, delete the blob
283            if !has_other_tree {
284                if let Some(data) = self
285                    .router
286                    .get_sync(blob_hash)
287                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
288                {
289                    freed += data.len() as u64;
290                    // Delete locally only - keep S3 as archive
291                    self.router
292                        .delete_local_only(blob_hash)
293                        .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
294                }
295            }
296        }
297
298        // Delete tree node itself if exists
299        if let Some(data) = self
300            .router
301            .get_sync(root_hash)
302            .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
303        {
304            freed += data.len() as u64;
305            // Delete locally only - keep S3 as archive
306            self.router
307                .delete_local_only(root_hash)
308                .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
309        }
310
311        // Delete tree metadata
312        self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
313
314        wtxn.commit()?;
315
316        tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
317
318        Ok(freed)
319    }
320
321    /// Get tree metadata
322    pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
323        let rtxn = self.env.read_txn()?;
324        if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
325            let meta: TreeMeta = rmp_serde::from_slice(bytes)
326                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
327            Ok(Some(meta))
328        } else {
329            Ok(None)
330        }
331    }
332
333    /// List all indexed trees
334    pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
335        let rtxn = self.env.read_txn()?;
336        let mut trees = Vec::new();
337
338        for item in self.tree_meta.iter(&rtxn)? {
339            let (hash_bytes, meta_bytes) = item?;
340            let hash: Hash = hash_bytes
341                .try_into()
342                .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
343            let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
344                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
345            trees.push((hash, meta));
346        }
347
348        Ok(trees)
349    }
350
351    /// Get total tracked storage size (sum of all tree_meta.total_size)
352    pub fn tracked_size(&self) -> Result<u64> {
353        let rtxn = self.env.read_txn()?;
354        let mut total = 0u64;
355
356        for item in self.tree_meta.iter(&rtxn)? {
357            let (_, bytes) = item?;
358            let meta: TreeMeta = rmp_serde::from_slice(bytes)
359                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
360            total += meta.total_size;
361        }
362
363        Ok(total)
364    }
365
366    /// Get evictable trees sorted by (priority ASC, synced_at ASC)
367    fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
368        let mut trees = self.list_indexed_trees()?;
369
370        // Sort by priority (lower first), then by synced_at (older first)
371        trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
372            std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
373            other => other,
374        });
375
376        Ok(trees)
377    }
378
379    /// Run eviction if storage is over quota
380    /// Returns bytes freed
381    ///
382    /// Eviction order:
383    /// 1. Orphaned blobs (not in any indexed tree and not pinned)
384    /// 2. Trees by priority (lowest first) and age (oldest first)
385    pub fn evict_if_needed(&self) -> Result<u64> {
386        // Get actual storage used
387        let stats = self
388            .router
389            .stats()
390            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
391        let current = stats.total_bytes;
392
393        if current <= self.max_size_bytes {
394            return Ok(0);
395        }
396
397        // Target 90% of max to avoid constant eviction
398        let target = self.max_size_bytes * 90 / 100;
399        let mut freed = 0u64;
400        let mut current_size = current;
401
402        // Phase 1: Evict orphaned blobs (not in any tree and not pinned)
403        let orphan_freed = self.evict_orphaned_blobs()?;
404        freed += orphan_freed;
405        current_size = current_size.saturating_sub(orphan_freed);
406
407        if orphan_freed > 0 {
408            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
409        }
410
411        // Check if we're now under target
412        if current_size <= target {
413            if freed > 0 {
414                tracing::info!("Eviction complete: {} bytes freed", freed);
415            }
416            return Ok(freed);
417        }
418
419        // Phase 2: Evict trees by priority (lowest first) and age (oldest first)
420        // Own trees CAN be evicted (just last), but PINNED trees are never evicted
421        let evictable = self.get_evictable_trees()?;
422
423        for (root_hash, meta) in evictable {
424            if current_size <= target {
425                break;
426            }
427
428            let root_hex = to_hex(&root_hash);
429
430            // Never evict pinned trees
431            if self.is_pinned(&root_hash)? {
432                continue;
433            }
434
435            let tree_freed = self.unindex_tree(&root_hash)?;
436            freed += tree_freed;
437            current_size = current_size.saturating_sub(tree_freed);
438
439            tracing::info!(
440                "Evicted tree {} (owner={}, priority={}, {} bytes)",
441                &root_hex[..8],
442                &meta.owner[..8.min(meta.owner.len())],
443                meta.priority,
444                tree_freed
445            );
446        }
447
448        if freed > 0 {
449            tracing::info!("Eviction complete: {} bytes freed", freed);
450        }
451
452        Ok(freed)
453    }
454
455    /// Evict blobs that are not part of any indexed tree and not pinned
456    fn evict_orphaned_blobs(&self) -> Result<u64> {
457        let mut freed = 0u64;
458
459        // Get all blob hashes from store
460        let all_hashes = self
461            .router
462            .list()
463            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
464
465        // Get pinned hashes as raw bytes
466        let rtxn = self.env.read_txn()?;
467        let pinned: HashSet<Hash> = self
468            .pins
469            .iter(&rtxn)?
470            .filter_map(|item| item.ok())
471            .filter_map(|(hash_bytes, _)| {
472                if hash_bytes.len() == 32 {
473                    let mut hash = [0u8; 32];
474                    hash.copy_from_slice(hash_bytes);
475                    Some(hash)
476                } else {
477                    None
478                }
479            })
480            .collect();
481
482        // Collect all blob hashes that are in at least one tree
483        // Key format is blob_hash (32 bytes) ++ tree_hash (32 bytes)
484        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
485        for (key_bytes, _) in self.blob_trees.iter(&rtxn)?.flatten() {
486            if key_bytes.len() >= 32 {
487                let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
488                blobs_in_trees.insert(blob_hash);
489            }
490        }
491        drop(rtxn);
492
493        // Find and delete orphaned blobs
494        for hash in all_hashes {
495            // Skip if pinned
496            if pinned.contains(&hash) {
497                continue;
498            }
499
500            // Skip if part of any tree
501            if blobs_in_trees.contains(&hash) {
502                continue;
503            }
504
505            // This blob is orphaned - delete locally (keep S3 as archive)
506            if let Ok(Some(data)) = self.router.get_sync(&hash) {
507                freed += data.len() as u64;
508                let _ = self.router.delete_local_only(&hash);
509                tracing::debug!(
510                    "Deleted orphaned blob {} ({} bytes)",
511                    &to_hex(&hash)[..8],
512                    data.len()
513                );
514            }
515        }
516
517        Ok(freed)
518    }
519
520    /// Get the maximum storage size in bytes
521    pub fn max_size_bytes(&self) -> u64 {
522        self.max_size_bytes
523    }
524
525    /// Get storage usage by priority tier
526    pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
527        let rtxn = self.env.read_txn()?;
528        let mut own = 0u64;
529        let mut followed = 0u64;
530        let mut other = 0u64;
531
532        for item in self.tree_meta.iter(&rtxn)? {
533            let (_, bytes) = item?;
534            let meta: TreeMeta = rmp_serde::from_slice(bytes)
535                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
536
537            if meta.priority == PRIORITY_OWN {
538                own += meta.total_size;
539            } else if meta.priority >= PRIORITY_FOLLOWED {
540                followed += meta.total_size;
541            } else {
542                other += meta.total_size;
543            }
544        }
545
546        Ok(StorageByPriority {
547            own,
548            followed,
549            other,
550        })
551    }
552
553    /// Get storage statistics
554    pub fn get_storage_stats(&self) -> Result<StorageStats> {
555        let rtxn = self.env.read_txn()?;
556        let total_pins = self.pins.len(&rtxn)? as usize;
557
558        let stats = self
559            .router
560            .stats()
561            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
562
563        Ok(StorageStats {
564            total_dags: stats.count,
565            pinned_dags: total_pins,
566            total_bytes: stats.total_bytes,
567        })
568    }
569}