Skip to main content

hashtree_cli/storage/
retention.rs

1use anyhow::Result;
2use futures::executor::block_on as sync_block_on;
3use hashtree_core::store::Store;
4use hashtree_core::{to_hex, types::Hash, HashTree, HashTreeConfig};
5use serde::{Deserialize, Serialize};
6use std::collections::HashSet;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use super::{HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
10
11/// Metadata for a synced tree (for eviction tracking)
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct TreeMeta {
14    /// Pubkey of tree owner
15    pub owner: String,
16    /// Tree name if known (from nostr key like "npub.../name")
17    pub name: Option<String>,
18    /// Unix timestamp when this tree was synced
19    pub synced_at: u64,
20    /// Total size of all blobs in this tree
21    pub total_size: u64,
22    /// Eviction priority: 255=own/pinned, 128=followed, 64=other
23    pub priority: u8,
24}
25
/// Point-in-time summary of overall store usage.
#[derive(Debug)]
pub struct StorageStats {
    /// Number of stored objects, as counted by the storage router.
    pub total_dags: usize,
    /// Number of pinned hashes.
    pub pinned_dags: usize,
    /// Total stored bytes, as reported by the storage router.
    pub total_bytes: u64,
}
32
/// Bytes held by indexed trees, grouped into retention tiers.
///
/// Each field is a sum of `TreeMeta::total_size` over the trees in that tier.
#[derive(Clone, Debug)]
pub struct StorageByPriority {
    /// Trees whose priority is exactly `PRIORITY_OWN` (conventionally 255).
    pub own: u64,
    /// Trees at or above `PRIORITY_FOLLOWED` (conventionally 128) that are not own trees.
    pub followed: u64,
    /// All remaining trees (conventionally priority 64).
    pub other: u64,
}
43
/// One entry of the pin listing.
#[derive(Clone, Debug)]
pub struct PinnedItem {
    /// Hex-encoded hash of the pinned object.
    pub cid: String,
    /// Display name; `list_pins_with_names` currently fills this with "Unknown".
    pub name: String,
    /// Whether the pinned hash resolved to a directory (false when undetermined).
    pub is_directory: bool,
}
50
51impl HashtreeStore {
52    /// Pin a hash (prevent garbage collection)
53    pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
54        let mut wtxn = self.env.write_txn()?;
55        self.pins.put(&mut wtxn, hash.as_slice(), &())?;
56        wtxn.commit()?;
57        Ok(())
58    }
59
60    /// Unpin a hash (allow garbage collection)
61    pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
62        let mut wtxn = self.env.write_txn()?;
63        self.pins.delete(&mut wtxn, hash.as_slice())?;
64        wtxn.commit()?;
65        Ok(())
66    }
67
68    /// Check if hash is pinned
69    pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
70        let rtxn = self.env.read_txn()?;
71        Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
72    }
73
74    /// List all pinned hashes (raw bytes)
75    pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
76        let rtxn = self.env.read_txn()?;
77        let mut pins = Vec::new();
78
79        for item in self.pins.iter(&rtxn)? {
80            let (hash_bytes, _) = item?;
81            if hash_bytes.len() == 32 {
82                let mut hash = [0u8; 32];
83                hash.copy_from_slice(hash_bytes);
84                pins.push(hash);
85            }
86        }
87
88        Ok(pins)
89    }
90
91    /// List all pinned hashes with names
92    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
93        let rtxn = self.env.read_txn()?;
94        let store = self.store_arc();
95        let tree = HashTree::new(HashTreeConfig::new(store).public());
96        let mut pins = Vec::new();
97
98        for item in self.pins.iter(&rtxn)? {
99            let (hash_bytes, _) = item?;
100            if hash_bytes.len() != 32 {
101                continue;
102            }
103            let mut hash = [0u8; 32];
104            hash.copy_from_slice(hash_bytes);
105
106            // Try to determine if it's a directory
107            let is_directory =
108                sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
109
110            pins.push(PinnedItem {
111                cid: to_hex(&hash),
112                name: "Unknown".to_string(),
113                is_directory,
114            });
115        }
116
117        Ok(pins)
118    }
119
120    // === Tree indexing for eviction ===
121
122    /// Index a tree after sync - tracks all blobs in the tree for eviction
123    ///
124    /// If `ref_key` is provided (e.g. "npub.../name"), it will replace any existing
125    /// tree with that ref, allowing old versions to be evicted.
126    pub fn index_tree(
127        &self,
128        root_hash: &Hash,
129        owner: &str,
130        name: Option<&str>,
131        priority: u8,
132        ref_key: Option<&str>,
133    ) -> Result<()> {
134        let root_hex = to_hex(root_hash);
135
136        // If ref_key provided, check for and unindex old version
137        if let Some(key) = ref_key {
138            let rtxn = self.env.read_txn()?;
139            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
140                if old_hash_bytes != root_hash.as_slice() {
141                    let old_hash: Hash = old_hash_bytes
142                        .try_into()
143                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
144                    drop(rtxn);
145                    let _ = self.unpin(&old_hash);
146                    // Unindex old tree (will delete orphaned blobs)
147                    let _ = self.unindex_tree(&old_hash);
148                    tracing::debug!("Replaced old tree for ref {}", key);
149                }
150            }
151        }
152
153        let store = self.store_arc();
154        let tree = HashTree::new(HashTreeConfig::new(store).public());
155
156        // Walk tree and collect all blob hashes + compute total size
157        let (blob_hashes, total_size) =
158            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
159
160        let mut wtxn = self.env.write_txn()?;
161
162        // Store blob-tree relationships (64-byte key: blob_hash ++ tree_hash)
163        for blob_hash in &blob_hashes {
164            let mut key = [0u8; 64];
165            key[..32].copy_from_slice(blob_hash);
166            key[32..].copy_from_slice(root_hash);
167            self.blob_trees.put(&mut wtxn, &key[..], &())?;
168        }
169
170        // Store tree metadata
171        let meta = TreeMeta {
172            owner: owner.to_string(),
173            name: name.map(|s| s.to_string()),
174            synced_at: SystemTime::now()
175                .duration_since(UNIX_EPOCH)
176                .unwrap()
177                .as_secs(),
178            total_size,
179            priority,
180        };
181        let meta_bytes = rmp_serde::to_vec(&meta)
182            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
183        self.tree_meta
184            .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;
185
186        // Store ref -> hash mapping if ref_key provided
187        if let Some(key) = ref_key {
188            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
189        }
190
191        wtxn.commit()?;
192
193        tracing::debug!(
194            "Indexed tree {} ({} blobs, {} bytes, priority {})",
195            &root_hex[..8],
196            blob_hashes.len(),
197            total_size,
198            priority
199        );
200
201        Ok(())
202    }
203
204    /// Collect all blob hashes in a tree and compute total size
205    async fn collect_tree_blobs<S: Store>(
206        &self,
207        tree: &HashTree<S>,
208        root: &Hash,
209    ) -> Result<(Vec<Hash>, u64)> {
210        let mut blobs = Vec::new();
211        let mut total_size = 0u64;
212        let mut stack = vec![*root];
213
214        while let Some(hash) = stack.pop() {
215            // Check if it's a tree node
216            let is_tree = tree
217                .is_tree(&hash)
218                .await
219                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
220
221            if is_tree {
222                // Get tree node and add children to stack
223                if let Some(node) = tree
224                    .get_tree_node(&hash)
225                    .await
226                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
227                {
228                    for link in &node.links {
229                        stack.push(link.hash);
230                    }
231                }
232            } else {
233                // It's a blob - get its size
234                if let Some(data) = self
235                    .router
236                    .get_sync(&hash)
237                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
238                {
239                    total_size += data.len() as u64;
240                    blobs.push(hash);
241                }
242            }
243        }
244
245        Ok((blobs, total_size))
246    }
247
248    /// Unindex a tree - removes blob-tree mappings and deletes orphaned blobs
249    /// Returns the number of bytes freed
250    pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
251        let root_hex = to_hex(root_hash);
252
253        let store = self.store_arc();
254        let tree = HashTree::new(HashTreeConfig::new(store).public());
255
256        // Walk tree and collect all blob hashes
257        let (blob_hashes, _) =
258            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
259
260        let mut wtxn = self.env.write_txn()?;
261        let mut freed = 0u64;
262
263        // For each blob, remove the blob-tree entry and check if orphaned
264        for blob_hash in &blob_hashes {
265            // Delete blob-tree entry (64-byte key: blob_hash ++ tree_hash)
266            let mut key = [0u8; 64];
267            key[..32].copy_from_slice(blob_hash);
268            key[32..].copy_from_slice(root_hash);
269            self.blob_trees.delete(&mut wtxn, &key[..])?;
270
271            // Check if blob is in any other tree (prefix scan on first 32 bytes)
272            let rtxn = self.env.read_txn()?;
273            let mut has_other_tree = false;
274
275            for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
276                if item.is_ok() {
277                    has_other_tree = true;
278                    break;
279                }
280            }
281            drop(rtxn);
282
283            // If orphaned, delete the blob
284            if !has_other_tree {
285                if let Some(data) = self
286                    .router
287                    .get_sync(blob_hash)
288                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
289                {
290                    freed += data.len() as u64;
291                    // Delete locally only - keep S3 as archive
292                    self.router
293                        .delete_local_only(blob_hash)
294                        .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
295                }
296            }
297        }
298
299        // Delete tree node itself if exists
300        if let Some(data) = self
301            .router
302            .get_sync(root_hash)
303            .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
304        {
305            freed += data.len() as u64;
306            // Delete locally only - keep S3 as archive
307            self.router
308                .delete_local_only(root_hash)
309                .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
310        }
311
312        // Delete tree metadata
313        self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
314
315        wtxn.commit()?;
316
317        tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
318
319        Ok(freed)
320    }
321
322    /// Get tree metadata
323    pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
324        let rtxn = self.env.read_txn()?;
325        if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
326            let meta: TreeMeta = rmp_serde::from_slice(bytes)
327                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
328            Ok(Some(meta))
329        } else {
330            Ok(None)
331        }
332    }
333
334    /// List all indexed trees
335    pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
336        let rtxn = self.env.read_txn()?;
337        let mut trees = Vec::new();
338
339        for item in self.tree_meta.iter(&rtxn)? {
340            let (hash_bytes, meta_bytes) = item?;
341            let hash: Hash = hash_bytes
342                .try_into()
343                .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
344            let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
345                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
346            trees.push((hash, meta));
347        }
348
349        Ok(trees)
350    }
351
352    /// Get total tracked storage size (sum of all tree_meta.total_size)
353    pub fn tracked_size(&self) -> Result<u64> {
354        let rtxn = self.env.read_txn()?;
355        let mut total = 0u64;
356
357        for item in self.tree_meta.iter(&rtxn)? {
358            let (_, bytes) = item?;
359            let meta: TreeMeta = rmp_serde::from_slice(bytes)
360                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
361            total += meta.total_size;
362        }
363
364        Ok(total)
365    }
366
367    /// Get evictable trees sorted by (priority ASC, synced_at ASC)
368    fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
369        let mut trees = self.list_indexed_trees()?;
370
371        // Sort by priority (lower first), then by synced_at (older first)
372        trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
373            std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
374            other => other,
375        });
376
377        Ok(trees)
378    }
379
380    /// Run eviction if storage is over quota
381    /// Returns bytes freed
382    ///
383    /// Eviction order:
384    /// 1. Orphaned blobs (not in any indexed tree and not pinned)
385    /// 2. Trees by priority (lowest first) and age (oldest first)
386    pub fn evict_if_needed(&self) -> Result<u64> {
387        // Get actual storage used
388        let stats = self
389            .router
390            .stats()
391            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
392        let current = stats.total_bytes;
393
394        if current <= self.max_size_bytes {
395            return Ok(0);
396        }
397
398        // Target 90% of max to avoid constant eviction
399        let target = self.max_size_bytes * 90 / 100;
400        let mut freed = 0u64;
401        let mut current_size = current;
402
403        // Phase 1: Evict orphaned blobs (not in any tree and not pinned)
404        let orphan_freed = self.evict_orphaned_blobs()?;
405        freed += orphan_freed;
406        current_size = current_size.saturating_sub(orphan_freed);
407
408        if orphan_freed > 0 {
409            tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
410        }
411
412        // Check if we're now under target
413        if current_size <= target {
414            if freed > 0 {
415                tracing::info!("Eviction complete: {} bytes freed", freed);
416            }
417            return Ok(freed);
418        }
419
420        // Phase 2: Evict trees by priority (lowest first) and age (oldest first)
421        // Own trees CAN be evicted (just last), but PINNED trees are never evicted
422        let evictable = self.get_evictable_trees()?;
423
424        for (root_hash, meta) in evictable {
425            if current_size <= target {
426                break;
427            }
428
429            let root_hex = to_hex(&root_hash);
430
431            // Never evict pinned trees
432            if self.is_pinned(&root_hash)? {
433                continue;
434            }
435
436            let tree_freed = self.unindex_tree(&root_hash)?;
437            freed += tree_freed;
438            current_size = current_size.saturating_sub(tree_freed);
439
440            tracing::info!(
441                "Evicted tree {} (owner={}, priority={}, {} bytes)",
442                &root_hex[..8],
443                &meta.owner[..8.min(meta.owner.len())],
444                meta.priority,
445                tree_freed
446            );
447        }
448
449        if freed > 0 {
450            tracing::info!("Eviction complete: {} bytes freed", freed);
451        }
452
453        Ok(freed)
454    }
455
456    /// Evict blobs that are not part of any indexed tree and not pinned
457    fn evict_orphaned_blobs(&self) -> Result<u64> {
458        let mut freed = 0u64;
459
460        // Get all blob hashes from store
461        let all_hashes = self
462            .router
463            .list()
464            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
465
466        // Get pinned hashes as raw bytes
467        let rtxn = self.env.read_txn()?;
468        let pinned: HashSet<Hash> = self
469            .pins
470            .iter(&rtxn)?
471            .filter_map(|item| item.ok())
472            .filter_map(|(hash_bytes, _)| {
473                if hash_bytes.len() == 32 {
474                    let mut hash = [0u8; 32];
475                    hash.copy_from_slice(hash_bytes);
476                    Some(hash)
477                } else {
478                    None
479                }
480            })
481            .collect();
482
483        // Collect all blob hashes that are in at least one tree
484        // Key format is blob_hash (32 bytes) ++ tree_hash (32 bytes)
485        let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
486        for (key_bytes, _) in self.blob_trees.iter(&rtxn)?.flatten() {
487            if key_bytes.len() >= 32 {
488                let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
489                blobs_in_trees.insert(blob_hash);
490            }
491        }
492        drop(rtxn);
493
494        // Find and delete orphaned blobs
495        for hash in all_hashes {
496            // Skip if pinned
497            if pinned.contains(&hash) {
498                continue;
499            }
500
501            // Skip if part of any tree
502            if blobs_in_trees.contains(&hash) {
503                continue;
504            }
505
506            // This blob is orphaned - delete locally (keep S3 as archive)
507            if let Ok(Some(data)) = self.router.get_sync(&hash) {
508                freed += data.len() as u64;
509                let _ = self.router.delete_local_only(&hash);
510                tracing::debug!(
511                    "Deleted orphaned blob {} ({} bytes)",
512                    &to_hex(&hash)[..8],
513                    data.len()
514                );
515            }
516        }
517
518        Ok(freed)
519    }
520
521    /// Get the maximum storage size in bytes
522    pub fn max_size_bytes(&self) -> u64 {
523        self.max_size_bytes
524    }
525
526    /// Get storage usage by priority tier
527    pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
528        let rtxn = self.env.read_txn()?;
529        let mut own = 0u64;
530        let mut followed = 0u64;
531        let mut other = 0u64;
532
533        for item in self.tree_meta.iter(&rtxn)? {
534            let (_, bytes) = item?;
535            let meta: TreeMeta = rmp_serde::from_slice(bytes)
536                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
537
538            if meta.priority == PRIORITY_OWN {
539                own += meta.total_size;
540            } else if meta.priority >= PRIORITY_FOLLOWED {
541                followed += meta.total_size;
542            } else {
543                other += meta.total_size;
544            }
545        }
546
547        Ok(StorageByPriority {
548            own,
549            followed,
550            other,
551        })
552    }
553
554    /// Get storage statistics
555    pub fn get_storage_stats(&self) -> Result<StorageStats> {
556        let rtxn = self.env.read_txn()?;
557        let total_pins = self.pins.len(&rtxn)? as usize;
558
559        let stats = self
560            .router
561            .stats()
562            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
563
564        Ok(StorageStats {
565            total_dags: stats.count,
566            pinned_dags: total_pins,
567            total_bytes: stats.total_bytes,
568        })
569    }
570}