Skip to main content

hashtree_cli/storage/
retention.rs

1use anyhow::Result;
2use futures::executor::block_on as sync_block_on;
3use hashtree_core::store::Store;
4use hashtree_core::{to_hex, types::Hash, HashTree, HashTreeConfig};
5use serde::de::{self, IgnoredAny, MapAccess, SeqAccess, Visitor};
6use serde::{Deserialize, Serialize};
7use std::collections::HashSet;
8use std::path::{Path, PathBuf};
9use std::time::{SystemTime, UNIX_EPOCH};
10
11use super::{BlobMetadata, HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
12
13/// Metadata for a synced tree (for eviction tracking)
/// Metadata for a synced tree (for eviction tracking).
///
/// Records are written with `rmp_serde::to_vec` and decoded by the custom
/// `Deserialize` impl, which also accepts a positional (sequence) form. Field
/// order therefore matters: do not reorder or insert fields without updating
/// that impl.
#[derive(Debug, Clone, Serialize)]
pub struct TreeMeta {
    /// Pubkey of tree owner
    pub owner: String,
    /// Tree name if known (from nostr key like "npub.../name")
    pub name: Option<String>,
    /// Unix timestamp (seconds) when this tree was synced
    pub synced_at: u64,
    /// Total size of all blobs in this tree
    pub total_size: u64,
    /// Eviction priority: 255=own/pinned, 128=followed, 64=other
    pub priority: u8,
}
27
28impl<'de> Deserialize<'de> for TreeMeta {
29    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
30    where
31        D: serde::Deserializer<'de>,
32    {
33        const FIELDS: &[&str] = &[
34            "owner",
35            "name",
36            "synced_at",
37            "last_accessed_at",
38            "total_size",
39            "priority",
40        ];
41
42        struct TreeMetaVisitor;
43
44        impl<'de> Visitor<'de> for TreeMetaVisitor {
45            type Value = TreeMeta;
46
47            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
48                formatter.write_str("TreeMeta as current or legacy metadata")
49            }
50
51            fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
52            where
53                A: SeqAccess<'de>,
54            {
55                let has_accidental_access_field = matches!(seq.size_hint(), Some(6));
56                let owner = seq
57                    .next_element()?
58                    .ok_or_else(|| de::Error::invalid_length(0, &self))?;
59                let name = seq
60                    .next_element()?
61                    .ok_or_else(|| de::Error::invalid_length(1, &self))?;
62                let synced_at = seq
63                    .next_element()?
64                    .ok_or_else(|| de::Error::invalid_length(2, &self))?;
65
66                if has_accidental_access_field {
67                    let _: IgnoredAny = seq
68                        .next_element()?
69                        .ok_or_else(|| de::Error::invalid_length(3, &self))?;
70                }
71
72                let total_size = seq
73                    .next_element()?
74                    .ok_or_else(|| de::Error::invalid_length(3, &self))?;
75                let priority = seq
76                    .next_element()?
77                    .ok_or_else(|| de::Error::invalid_length(4, &self))?;
78
79                Ok(TreeMeta {
80                    owner,
81                    name,
82                    synced_at,
83                    total_size,
84                    priority,
85                })
86            }
87
88            fn visit_map<A>(self, mut map: A) -> std::result::Result<Self::Value, A::Error>
89            where
90                A: MapAccess<'de>,
91            {
92                let mut owner = None;
93                let mut name = None;
94                let mut synced_at = None;
95                let mut total_size = None;
96                let mut priority = None;
97
98                while let Some(key) = map.next_key::<String>()? {
99                    match key.as_str() {
100                        "owner" => owner = Some(map.next_value()?),
101                        "name" => name = Some(map.next_value()?),
102                        "synced_at" => synced_at = Some(map.next_value()?),
103                        "last_accessed_at" => {
104                            let _: IgnoredAny = map.next_value()?;
105                        }
106                        "total_size" => total_size = Some(map.next_value()?),
107                        "priority" => priority = Some(map.next_value()?),
108                        _ => {
109                            let _: IgnoredAny = map.next_value()?;
110                        }
111                    }
112                }
113
114                Ok(TreeMeta {
115                    owner: owner.ok_or_else(|| de::Error::missing_field("owner"))?,
116                    name: name.unwrap_or(None),
117                    synced_at: synced_at.ok_or_else(|| de::Error::missing_field("synced_at"))?,
118                    total_size: total_size.ok_or_else(|| de::Error::missing_field("total_size"))?,
119                    priority: priority.ok_or_else(|| de::Error::missing_field("priority"))?,
120                })
121            }
122        }
123
124        deserializer.deserialize_struct("TreeMeta", FIELDS, TreeMetaVisitor)
125    }
126}
127
/// Snapshot of overall store usage, as returned by `get_storage_stats`.
#[derive(Debug)]
pub struct StorageStats {
    /// Number of stored items, as reported by the storage router's stats `count`.
    pub total_dags: usize,
    /// Number of entries in the pins table.
    pub pinned_dags: usize,
    /// Total bytes currently stored, as reported by the storage router.
    pub total_bytes: u64,
}
134
/// Storage usage broken down by priority tier.
///
/// Values are sums of `TreeMeta::total_size` over indexed trees, so blobs that
/// belong to no indexed tree are not counted here.
#[derive(Debug, Clone)]
pub struct StorageByPriority {
    /// Own/pinned trees (priority 255)
    pub own: u64,
    /// Followed users' trees (priority 128)
    pub followed: u64,
    /// Other trees (priority 64)
    pub other: u64,
}
145
/// A pinned hash with display information, as listed by `list_pins_with_names`.
#[derive(Debug, Clone)]
pub struct PinnedItem {
    /// Hex-encoded hash of the pinned item.
    pub cid: String,
    /// Human-readable label (`owner/name`, name, owner, or the hex hash).
    pub name: String,
    /// Whether the hash resolved to a directory; `false` if the check failed.
    pub is_directory: bool,
    /// Indexed tree size if known, otherwise the size of the local blob (0 if absent).
    pub size_bytes: u64,
}
153
/// Per-owner totals derived from the `pubkey_blobs` table (see `owned_blob_stats`).
#[derive(Debug, Clone)]
pub struct OwnedBlobStats {
    /// Raw 32-byte owner key, as stored in `pubkey_blobs`.
    pub owner: [u8; 32],
    /// Number of blob metadata records listed for this owner.
    pub count: usize,
    /// Sum of the `size` fields of those records (saturating).
    pub total_bytes: u64,
}
160
161fn pinned_item_name(hash: &Hash, meta: Option<&TreeMeta>) -> String {
162    let Some(meta) = meta else {
163        return to_hex(hash);
164    };
165
166    match (meta.owner.as_str(), meta.name.as_deref()) {
167        ("pinned", Some(name)) => name.to_string(),
168        ("", Some(name)) => name.to_string(),
169        (owner, Some(name)) if !owner.is_empty() => format!("{owner}/{name}"),
170        (owner, None) if !owner.is_empty() && owner != "pinned" => owner.to_string(),
171        _ => to_hex(hash),
172    }
173}
174
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Returns `0` instead of failing if the system clock reads earlier than 1970.
fn unix_timestamp_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
181
182impl HashtreeStore {
183    fn socialgraph_root_files(&self) -> [PathBuf; 4] {
184        let socialgraph = self.base_path().join("socialgraph");
185        [
186            socialgraph.join("events-root.msgpack"),
187            socialgraph.join("events-root-ambient.msgpack"),
188            socialgraph.join("profile-search-root.msgpack"),
189            socialgraph.join("profiles-by-pubkey-root.msgpack"),
190        ]
191    }
192
193    fn read_stored_cid(path: &Path) -> Result<Option<Hash>> {
194        #[derive(Deserialize)]
195        struct StoredCid {
196            hash: [u8; 32],
197            #[allow(dead_code)]
198            key: Option<[u8; 32]>,
199        }
200
201        let Ok(bytes) = std::fs::read(path) else {
202            return Ok(None);
203        };
204        let stored: StoredCid = rmp_serde::from_slice(&bytes)
205            .map_err(|e| anyhow::anyhow!("Failed to decode root file {}: {}", path.display(), e))?;
206        Ok(Some(stored.hash))
207    }
208
209    async fn collect_tree_hashes<S: Store>(
210        &self,
211        tree: &HashTree<S>,
212        root: &Hash,
213    ) -> Result<HashSet<Hash>> {
214        let mut hashes = HashSet::new();
215        let mut stack = vec![*root];
216
217        while let Some(hash) = stack.pop() {
218            if !hashes.insert(hash) {
219                continue;
220            }
221
222            let is_tree = tree
223                .is_tree(&hash)
224                .await
225                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
226
227            if !is_tree {
228                continue;
229            }
230
231            if let Some(node) = tree
232                .get_tree_node(&hash)
233                .await
234                .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
235            {
236                for link in &node.links {
237                    stack.push(link.hash);
238                }
239            }
240        }
241
242        Ok(hashes)
243    }
244
245    fn protected_hashes(&self) -> Result<HashSet<Hash>> {
246        let mut protected = HashSet::new();
247
248        let rtxn = self.env.read_txn()?;
249        for (key_bytes, _) in self.blob_trees.iter(&rtxn)?.flatten() {
250            if key_bytes.len() >= 32 {
251                let hash: Hash = key_bytes[..32].try_into().unwrap();
252                protected.insert(hash);
253            }
254        }
255        drop(rtxn);
256
257        let tree = HashTree::new(HashTreeConfig::new(self.store_arc()).public());
258        for path in self.socialgraph_root_files() {
259            let Some(root_hash) = Self::read_stored_cid(&path)? else {
260                continue;
261            };
262            protected.extend(sync_block_on(self.collect_tree_hashes(&tree, &root_hash))?);
263        }
264
265        Ok(protected)
266    }
267
268    fn evict_disposable_orphans_to_target(&self, target_bytes: u64) -> Result<u64> {
269        let stats = self
270            .router
271            .stats()
272            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
273        let mut current_size = stats.total_bytes;
274        if current_size <= target_bytes {
275            return Ok(0);
276        }
277
278        let rtxn = self.env.read_txn()?;
279        let pinned: HashSet<Hash> = self
280            .pins
281            .iter(&rtxn)?
282            .filter_map(|item| item.ok())
283            .filter_map(|(hash_bytes, _)| {
284                if hash_bytes.len() == 32 {
285                    let mut hash = [0u8; 32];
286                    hash.copy_from_slice(hash_bytes);
287                    Some(hash)
288                } else {
289                    None
290                }
291            })
292            .collect();
293        drop(rtxn);
294
295        let protected_hashes = self.protected_hashes()?;
296        let all_hashes = self
297            .router
298            .list()
299            .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
300
301        let mut freed = 0u64;
302        for hash in all_hashes {
303            if current_size <= target_bytes {
304                break;
305            }
306
307            if pinned.contains(&hash) || protected_hashes.contains(&hash) {
308                continue;
309            }
310
311            if self.blob_has_owners(&hash)? {
312                continue;
313            }
314
315            let Some(data) = self
316                .router
317                .get_sync(&hash)
318                .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
319            else {
320                continue;
321            };
322
323            let size = data.len() as u64;
324            if self
325                .router
326                .delete_local_only(&hash)
327                .map_err(|e| anyhow::anyhow!("Failed to delete orphaned blob: {}", e))?
328            {
329                freed = freed.saturating_add(size);
330                current_size = current_size.saturating_sub(size);
331                tracing::debug!(
332                    "Deleted disposable orphaned blob {} ({} bytes)",
333                    &to_hex(&hash)[..8],
334                    size
335                );
336            }
337        }
338
339        Ok(freed)
340    }
341
342    pub fn make_room_for_cached_blob(&self, incoming_bytes: u64) -> Result<u64> {
343        if self.max_size_bytes == 0 {
344            return Ok(0);
345        }
346
347        let stats = self
348            .router
349            .stats()
350            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
351        if stats.total_bytes.saturating_add(incoming_bytes) <= self.max_size_bytes {
352            return Ok(0);
353        }
354
355        let target = if incoming_bytes >= self.max_size_bytes {
356            0
357        } else {
358            (self.max_size_bytes.saturating_mul(9) / 10)
359                .min(self.max_size_bytes.saturating_sub(incoming_bytes))
360        };
361        self.evict_disposable_orphans_to_target(target)
362    }
363
364    pub fn make_room_for_durable_blob(&self, incoming_bytes: u64) -> Result<u64> {
365        if self.max_size_bytes == 0 || incoming_bytes == 0 {
366            return Ok(0);
367        }
368
369        if incoming_bytes > self.max_size_bytes {
370            anyhow::bail!(
371                "storage limit exceeded: incoming blob is {} bytes but limit is {} bytes",
372                incoming_bytes,
373                self.max_size_bytes
374            );
375        }
376
377        let stats = self
378            .router
379            .stats()
380            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
381        if stats.total_bytes.saturating_add(incoming_bytes) <= self.max_size_bytes {
382            return Ok(0);
383        }
384
385        let target = (self.max_size_bytes.saturating_mul(9) / 10)
386            .min(self.max_size_bytes.saturating_sub(incoming_bytes));
387        let freed = self.evict_with_policy_to_target(stats.total_bytes, target)?;
388
389        let next_stats = self
390            .router
391            .stats()
392            .map_err(|e| anyhow::anyhow!("Failed to get stats after eviction: {}", e))?;
393        if next_stats.total_bytes.saturating_add(incoming_bytes) > self.max_size_bytes {
394            anyhow::bail!(
395                "storage limit exceeded: {} bytes used, {} byte incoming blob, {} byte limit",
396                next_stats.total_bytes,
397                incoming_bytes,
398                self.max_size_bytes
399            );
400        }
401
402        Ok(freed)
403    }
404
405    pub fn relieve_cached_blob_write_pressure(&self, incoming_bytes: u64) -> Result<u64> {
406        let stats = self
407            .router
408            .stats()
409            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
410        if stats.total_bytes == 0 {
411            return Ok(0);
412        }
413
414        let headroom = incoming_bytes.max(stats.total_bytes / 10).max(1);
415        let target = stats.total_bytes.saturating_sub(headroom);
416        self.evict_disposable_orphans_to_target(target)
417    }
418
419    /// Pin a hash (prevent garbage collection)
420    pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
421        let mut wtxn = self.env.write_txn()?;
422        self.pins.put(&mut wtxn, hash.as_slice(), &())?;
423        wtxn.commit()?;
424        Ok(())
425    }
426
427    /// Unpin a hash (allow garbage collection)
428    pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
429        let mut wtxn = self.env.write_txn()?;
430        self.pins.delete(&mut wtxn, hash.as_slice())?;
431        wtxn.commit()?;
432        Ok(())
433    }
434
435    /// Check if hash is pinned
436    pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
437        let rtxn = self.env.read_txn()?;
438        Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
439    }
440
441    /// List all pinned hashes (raw bytes)
442    pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
443        let rtxn = self.env.read_txn()?;
444        let mut pins = Vec::new();
445
446        for item in self.pins.iter(&rtxn)? {
447            let (hash_bytes, _) = item?;
448            if hash_bytes.len() == 32 {
449                let mut hash = [0u8; 32];
450                hash.copy_from_slice(hash_bytes);
451                pins.push(hash);
452            }
453        }
454
455        Ok(pins)
456    }
457
458    /// List all pinned hashes with names
459    pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
460        let rtxn = self.env.read_txn()?;
461        let store = self.store_arc();
462        let tree = HashTree::new(HashTreeConfig::new(store).public());
463        let mut pins = Vec::new();
464
465        for item in self.pins.iter(&rtxn)? {
466            let (hash_bytes, _) = item?;
467            if hash_bytes.len() != 32 {
468                continue;
469            }
470            let mut hash = [0u8; 32];
471            hash.copy_from_slice(hash_bytes);
472
473            // Try to determine if it's a directory
474            let is_directory =
475                sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
476
477            let meta = self
478                .tree_meta
479                .get(&rtxn, hash.as_slice())?
480                .map(|bytes| {
481                    rmp_serde::from_slice::<TreeMeta>(bytes)
482                        .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))
483                })
484                .transpose()?;
485            let size_bytes = if let Some(meta) = meta.as_ref() {
486                meta.total_size
487            } else {
488                self.router
489                    .get_sync(&hash)
490                    .map_err(|e| anyhow::anyhow!("Failed to get pinned blob: {}", e))?
491                    .map(|data| data.len() as u64)
492                    .unwrap_or(0)
493            };
494
495            pins.push(PinnedItem {
496                cid: to_hex(&hash),
497                name: pinned_item_name(&hash, meta.as_ref()),
498                is_directory,
499                size_bytes,
500            });
501        }
502
503        Ok(pins)
504    }
505
506    pub fn owned_blob_stats(&self) -> Result<Vec<OwnedBlobStats>> {
507        let rtxn = self.env.read_txn()?;
508        let mut owners = Vec::new();
509
510        for item in self.pubkey_blobs.iter(&rtxn)? {
511            let (owner_bytes, blobs_bytes) = item?;
512            if owner_bytes.len() != 32 {
513                continue;
514            }
515
516            let blobs: Vec<BlobMetadata> = serde_json::from_slice(blobs_bytes)
517                .map_err(|e| anyhow::anyhow!("Failed to deserialize blob metadata: {}", e))?;
518            let mut owner = [0u8; 32];
519            owner.copy_from_slice(owner_bytes);
520            let total_bytes = blobs
521                .iter()
522                .fold(0u64, |total, blob| total.saturating_add(blob.size));
523            owners.push(OwnedBlobStats {
524                owner,
525                count: blobs.len(),
526                total_bytes,
527            });
528        }
529
530        owners.sort_by(|a, b| a.owner.cmp(&b.owner));
531        Ok(owners)
532    }
533
534    // === Tree indexing for eviction ===
535
536    /// Index a tree after sync - tracks all blobs in the tree for eviction
537    ///
538    /// If `ref_key` is provided (e.g. "npub.../name"), it will replace any existing
539    /// tree with that ref, allowing old versions to be evicted.
540    pub fn index_tree(
541        &self,
542        root_hash: &Hash,
543        owner: &str,
544        name: Option<&str>,
545        priority: u8,
546        ref_key: Option<&str>,
547    ) -> Result<()> {
548        let root_hex = to_hex(root_hash);
549
550        // If ref_key provided, check for and unindex old version
551        if let Some(key) = ref_key {
552            let rtxn = self.env.read_txn()?;
553            if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
554                if old_hash_bytes != root_hash.as_slice() {
555                    let old_hash: Hash = old_hash_bytes
556                        .try_into()
557                        .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
558                    drop(rtxn);
559                    let _ = self.unpin(&old_hash);
560                    // Unindex old tree (will delete orphaned blobs)
561                    let _ = self.unindex_tree(&old_hash);
562                    tracing::debug!("Replaced old tree for ref {}", key);
563                }
564            }
565        }
566
567        let store = self.store_arc();
568        let tree = HashTree::new(HashTreeConfig::new(store).public());
569
570        // Walk tree and collect all blob hashes + compute total size
571        let (_blob_hashes, total_size) =
572            sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
573        let tracked_hashes = sync_block_on(self.collect_tree_hashes(&tree, root_hash))?;
574
575        let mut wtxn = self.env.write_txn()?;
576
577        // Store blob-tree relationships (64-byte key: blob_hash ++ tree_hash)
578        for tracked_hash in &tracked_hashes {
579            let mut key = [0u8; 64];
580            key[..32].copy_from_slice(tracked_hash);
581            key[32..].copy_from_slice(root_hash);
582            self.blob_trees.put(&mut wtxn, &key[..], &())?;
583        }
584
585        // Store tree metadata
586        let now = unix_timestamp_now();
587        let meta = TreeMeta {
588            owner: owner.to_string(),
589            name: name.map(|s| s.to_string()),
590            synced_at: now,
591            total_size,
592            priority,
593        };
594        let meta_bytes = rmp_serde::to_vec(&meta)
595            .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
596        self.tree_meta
597            .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;
598
599        // Store ref -> hash mapping if ref_key provided
600        if let Some(key) = ref_key {
601            self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
602        }
603
604        wtxn.commit()?;
605
606        tracing::debug!(
607            "Indexed tree {} ({} blobs, {} bytes, priority {})",
608            &root_hex[..8],
609            tracked_hashes.len(),
610            total_size,
611            priority
612        );
613
614        Ok(())
615    }
616
617    /// Collect all blob hashes in a tree and compute total size
618    async fn collect_tree_blobs<S: Store>(
619        &self,
620        tree: &HashTree<S>,
621        root: &Hash,
622    ) -> Result<(Vec<Hash>, u64)> {
623        let mut blobs = Vec::new();
624        let mut total_size = 0u64;
625        let mut stack = vec![*root];
626
627        while let Some(hash) = stack.pop() {
628            // Check if it's a tree node
629            let is_tree = tree
630                .is_tree(&hash)
631                .await
632                .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
633
634            if is_tree {
635                // Get tree node and add children to stack
636                if let Some(node) = tree
637                    .get_tree_node(&hash)
638                    .await
639                    .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
640                {
641                    for link in &node.links {
642                        stack.push(link.hash);
643                    }
644                }
645            } else {
646                // It's a blob - get its size
647                if let Some(data) = self
648                    .router
649                    .get_sync(&hash)
650                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
651                {
652                    total_size += data.len() as u64;
653                    blobs.push(hash);
654                }
655            }
656        }
657
658        Ok((blobs, total_size))
659    }
660
661    /// Unindex a tree - removes blob-tree mappings and deletes orphaned blobs
662    /// Returns the number of bytes freed
663    pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
664        let root_hex = to_hex(root_hash);
665
666        let store = self.store_arc();
667        let tree = HashTree::new(HashTreeConfig::new(store).public());
668
669        // Walk tree and collect all blob hashes
670        let tracked_hashes = sync_block_on(self.collect_tree_hashes(&tree, root_hash))?;
671
672        let mut wtxn = self.env.write_txn()?;
673        let mut freed = 0u64;
674
675        // For each blob, remove the blob-tree entry and check if orphaned
676        for tracked_hash in &tracked_hashes {
677            // Delete blob-tree entry (64-byte key: blob_hash ++ tree_hash)
678            let mut key = [0u8; 64];
679            key[..32].copy_from_slice(tracked_hash);
680            key[32..].copy_from_slice(root_hash);
681            self.blob_trees.delete(&mut wtxn, &key[..])?;
682
683            // Check if blob is in any other tree (prefix scan on first 32 bytes)
684            let mut has_other_tree = false;
685            for item in self.blob_trees.prefix_iter(&wtxn, &tracked_hash[..])? {
686                if item.is_ok() {
687                    has_other_tree = true;
688                    break;
689                }
690            }
691
692            // If orphaned, delete the blob
693            if !has_other_tree {
694                if let Some(data) = self
695                    .router
696                    .get_sync(tracked_hash)
697                    .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
698                {
699                    freed += data.len() as u64;
700                    // Delete locally only - keep S3 as archive
701                    self.router
702                        .delete_local_only(tracked_hash)
703                        .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
704                }
705            }
706        }
707
708        // Delete tree metadata
709        self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
710
711        wtxn.commit()?;
712
713        tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
714
715        Ok(freed)
716    }
717
718    /// Get tree metadata
719    pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
720        let rtxn = self.env.read_txn()?;
721        if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
722            let meta: TreeMeta = rmp_serde::from_slice(bytes)
723                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
724            Ok(Some(meta))
725        } else {
726            Ok(None)
727        }
728    }
729
730    pub fn get_tree_ref(&self, key: &str) -> Result<Option<Hash>> {
731        let rtxn = self.env.read_txn()?;
732        let Some(bytes) = self.tree_refs.get(&rtxn, key)? else {
733            return Ok(None);
734        };
735
736        let hash: Hash = bytes
737            .try_into()
738            .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
739        Ok(Some(hash))
740    }
741
742    /// List all indexed trees
743    pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
744        let rtxn = self.env.read_txn()?;
745        let mut trees = Vec::new();
746
747        for item in self.tree_meta.iter(&rtxn)? {
748            let (hash_bytes, meta_bytes) = item?;
749            let hash: Hash = hash_bytes
750                .try_into()
751                .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
752            let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
753                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
754            trees.push((hash, meta));
755        }
756
757        Ok(trees)
758    }
759
760    /// Get total tracked storage size (sum of all tree_meta.total_size)
761    pub fn tracked_size(&self) -> Result<u64> {
762        let rtxn = self.env.read_txn()?;
763        let mut total = 0u64;
764
765        for item in self.tree_meta.iter(&rtxn)? {
766            let (_, bytes) = item?;
767            let meta: TreeMeta = rmp_serde::from_slice(bytes)
768                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
769            total += meta.total_size;
770        }
771
772        Ok(total)
773    }
774
775    /// Get evictable trees sorted by (priority ASC, synced_at ASC).
776    ///
777    /// Blob-level access and raw LRU order live in the storage adapter. Indexed
778    /// tree metadata stays cheap and does not try to summarize all descendant
779    /// blob access on every stats or eviction pass.
780    fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
781        let mut trees = self.list_indexed_trees()?;
782
783        // Sort by priority (lower first), then by age.
784        trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
785            std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
786            other => other,
787        });
788
789        Ok(trees)
790    }
791
792    /// Run eviction if storage is over quota
793    /// Returns bytes freed
794    ///
795    /// Eviction order:
796    /// 1. Orphaned blobs (not in any indexed tree and not pinned)
797    /// 2. Trees by priority (lowest first) and access age (least recent first)
798    pub fn evict_if_needed(&self) -> Result<u64> {
799        // Get actual storage used
800        let stats = self
801            .router
802            .stats()
803            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
804        let current = stats.total_bytes;
805
806        if current <= self.max_size_bytes {
807            return Ok(0);
808        }
809
810        // Target 90% of max to avoid constant eviction
811        let target = self.max_size_bytes * 90 / 100;
812        self.evict_with_policy_to_target(current, target)
813    }
814
815    fn evict_with_policy_to_target(&self, current: u64, target: u64) -> Result<u64> {
816        let mut freed = 0u64;
817        let mut current_size = current;
818
819        // Phase 1: Evict orphaned blobs (not in any tree and not pinned)
820        if self.evict_orphans {
821            let orphan_freed = self.evict_disposable_orphans_to_target(target)?;
822            freed += orphan_freed;
823            current_size = current_size.saturating_sub(orphan_freed);
824
825            if orphan_freed > 0 {
826                tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
827            }
828        } else {
829            tracing::debug!("Skipping orphan blob eviction; storage.evict_orphans=false");
830        }
831
832        // Check if we're now under target
833        if current_size <= target {
834            if freed > 0 {
835                tracing::info!("Eviction complete: {} bytes freed", freed);
836            }
837            return Ok(freed);
838        }
839
840        // Phase 2: Evict trees by priority (lowest first) and access age (least recent first)
841        // Own trees CAN be evicted (just last), but PINNED trees are never evicted
842        let evictable = self.get_evictable_trees()?;
843
844        for (root_hash, meta) in evictable {
845            if current_size <= target {
846                break;
847            }
848
849            let root_hex = to_hex(&root_hash);
850
851            // Never evict pinned trees
852            if self.is_pinned(&root_hash)? {
853                continue;
854            }
855
856            let tree_freed = self.unindex_tree(&root_hash)?;
857            freed += tree_freed;
858            current_size = current_size.saturating_sub(tree_freed);
859
860            tracing::info!(
861                "Evicted tree {} (owner={}, priority={}, {} bytes)",
862                &root_hex[..8],
863                &meta.owner[..8.min(meta.owner.len())],
864                meta.priority,
865                tree_freed
866            );
867        }
868
869        if freed > 0 {
870            tracing::info!("Eviction complete: {} bytes freed", freed);
871        }
872
873        Ok(freed)
874    }
875
876    /// Get the maximum storage size in bytes
877    pub fn max_size_bytes(&self) -> u64 {
878        self.max_size_bytes
879    }
880
881    /// Get storage usage by priority tier
882    pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
883        let rtxn = self.env.read_txn()?;
884        let mut own = 0u64;
885        let mut followed = 0u64;
886        let mut other = 0u64;
887
888        for item in self.tree_meta.iter(&rtxn)? {
889            let (_, bytes) = item?;
890            let meta: TreeMeta = rmp_serde::from_slice(bytes)
891                .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
892
893            if meta.priority == PRIORITY_OWN {
894                own += meta.total_size;
895            } else if meta.priority >= PRIORITY_FOLLOWED {
896                followed += meta.total_size;
897            } else {
898                other += meta.total_size;
899            }
900        }
901
902        Ok(StorageByPriority {
903            own,
904            followed,
905            other,
906        })
907    }
908
909    /// Get storage statistics
910    pub fn get_storage_stats(&self) -> Result<StorageStats> {
911        let rtxn = self.env.read_txn()?;
912        let total_pins = self.pins.len(&rtxn)? as usize;
913
914        let stats = self
915            .router
916            .stats()
917            .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
918
919        Ok(StorageStats {
920            total_dags: stats.count,
921            pinned_dags: total_pins,
922            total_bytes: stats.total_bytes,
923        })
924    }
925}
926
#[cfg(test)]
mod tests {
    use super::*;
    use hashtree_core::Cid;
    use hashtree_index::{BTree, BTreeOptions};
    use tempfile::TempDir;

    use crate::storage::PRIORITY_OTHER;

    /// Write `cid` to `path` as a named-field MessagePack record (`hash`, `key`),
    /// creating the parent directory first.
    ///
    /// Used to simulate a persisted root reference on disk; presumably this mirrors
    /// the format the store reads back for such files (confirm against the reader).
    fn write_root_file(path: &Path, cid: &Cid) {
        #[derive(Serialize)]
        struct StoredCid {
            hash: [u8; 32],
            key: Option<[u8; 32]>,
        }

        std::fs::create_dir_all(path.parent().expect("root file parent")).expect("create dir");
        let bytes = rmp_serde::to_vec_named(&StoredCid {
            hash: cid.hash,
            key: cid.key,
        })
        .expect("encode cid");
        std::fs::write(path, bytes).expect("write root file");
    }

    /// Build a small three-entry B-tree (order 8) in `store` and return its root CID.
    fn build_test_tree(store: &HashtreeStore) -> Cid {
        let index = BTree::new(store.store_arc(), BTreeOptions { order: Some(8) });
        sync_block_on(index.build(vec![
            ("alpha".to_string(), "one".to_string()),
            ("beta".to_string(), "two".to_string()),
            ("gamma".to_string(), "three".to_string()),
        ]))
        .expect("build btree")
        .expect("non-empty root")
    }

    /// Orphan eviction must not remove the root blob of an indexed tree.
    #[test]
    fn orphan_cleanup_keeps_indexed_tree_hashes() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::with_options(temp_dir.path(), None, 1024).expect("store");
        let cid = build_test_tree(&store);

        store
            .index_tree(
                &cid.hash,
                "owner",
                Some("tree"),
                PRIORITY_OTHER,
                Some("owner/tree"),
            )
            .expect("index tree");
        let freed = store
            .evict_disposable_orphans_to_target(0)
            .expect("orphan cleanup");

        assert!(freed < 1024);
        assert!(store.blob_exists(&cid.hash).expect("root exists"));
    }

    /// Pinned trees are listed under the ref name recorded when they were indexed.
    #[test]
    fn list_pins_with_names_uses_indexed_tree_metadata() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::with_options(temp_dir.path(), None, 1024 * 1024).expect("store");
        let cid = build_test_tree(&store);

        store.pin(&cid.hash).expect("pin tree");
        store
            .index_tree(
                &cid.hash,
                "npub1example",
                Some("playlist"),
                PRIORITY_OTHER,
                Some("npub1example/playlist"),
            )
            .expect("index tree");

        let pins = store.list_pins_with_names().expect("list pins");

        assert_eq!(pins.len(), 1);
        assert_eq!(pins[0].name, "npub1example/playlist");
        assert!(pins[0].size_bytes > 0);
    }

    /// Looking up a ref name returns the root hash it was indexed with.
    #[test]
    fn get_tree_ref_returns_stored_root() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::with_options(temp_dir.path(), None, 1024 * 1024).expect("store");
        let cid = build_test_tree(&store);

        store
            .index_tree(
                &cid.hash,
                "npub1example",
                Some("playlist"),
                PRIORITY_OTHER,
                Some("npub1example/playlist"),
            )
            .expect("index tree");

        assert_eq!(
            store
                .get_tree_ref("npub1example/playlist")
                .expect("tree ref lookup"),
            Some(cid.hash)
        );
    }

    /// The legacy five-field positional encoding (no access field) still decodes.
    #[test]
    fn tree_meta_deserializes_metadata_without_tree_access_field() {
        #[derive(Serialize)]
        struct LegacyTreeMeta {
            owner: String,
            name: Option<String>,
            synced_at: u64,
            total_size: u64,
            priority: u8,
        }

        let bytes = rmp_serde::to_vec(&LegacyTreeMeta {
            owner: "owner".to_string(),
            name: Some("tree".to_string()),
            synced_at: 123,
            total_size: 456,
            priority: PRIORITY_OTHER,
        })
        .expect("serialize legacy metadata");
        let meta: TreeMeta = rmp_serde::from_slice(&bytes).expect("deserialize tree metadata");

        assert_eq!(meta.owner, "owner");
        assert_eq!(meta.name.as_deref(), Some("tree"));
        assert_eq!(meta.synced_at, 123);
        assert_eq!(meta.total_size, 456);
        assert_eq!(meta.priority, PRIORITY_OTHER);
    }

    /// A six-field encoding carrying `last_accessed_at` decodes, and re-encoding
    /// produces the current five-field layout with every field intact.
    #[test]
    fn tree_meta_deserializes_accidental_access_field_but_drops_it_on_write() {
        #[derive(Serialize)]
        struct AccidentalTreeMeta {
            owner: String,
            name: Option<String>,
            synced_at: u64,
            last_accessed_at: u64,
            total_size: u64,
            priority: u8,
        }

        let bytes = rmp_serde::to_vec(&AccidentalTreeMeta {
            owner: "owner".to_string(),
            name: Some("tree".to_string()),
            synced_at: 123,
            last_accessed_at: 999,
            total_size: 456,
            priority: PRIORITY_OTHER,
        })
        .expect("serialize accidental metadata");
        let meta: TreeMeta = rmp_serde::from_slice(&bytes).expect("deserialize tree metadata");
        let encoded = rmp_serde::to_vec(&meta).expect("serialize current metadata");
        let reparsed: (String, Option<String>, u64, u64, u8) =
            rmp_serde::from_slice(&encoded).expect("parse current metadata shape");

        assert_eq!(meta.owner, "owner");
        assert_eq!(meta.name.as_deref(), Some("tree"));
        assert_eq!(meta.synced_at, 123);
        assert_eq!(meta.total_size, 456);
        assert_eq!(meta.priority, PRIORITY_OTHER);
        // Check every position of the re-encoded tuple, not just a subset.
        assert_eq!(reparsed.0, "owner");
        assert_eq!(reparsed.1.as_deref(), Some("tree"));
        assert_eq!(reparsed.2, 123);
        assert_eq!(reparsed.3, 456);
        assert_eq!(reparsed.4, PRIORITY_OTHER);
    }

    /// With three equal-priority trees exceeding the 500-byte budget, eviction
    /// must free space and must keep the most recently indexed tree.
    #[test]
    fn eviction_prefers_oldest_tree_within_priority() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::with_options(temp_dir.path(), None, 500).expect("store");

        // Fixed-size arrays instead of `&vec![..]`: the payloads are only ever
        // borrowed as slices, so a heap allocation is unnecessary (clippy::useless_vec).
        let blob1 = [1u8; 200];
        let blob2 = [2u8; 200];
        let blob3 = [3u8; 200];
        let hash1 = hashtree_core::sha256(&blob1);
        let hash2 = hashtree_core::sha256(&blob2);
        let hash3 = hashtree_core::sha256(&blob3);
        store.put_blob(&blob1).expect("put blob 1");
        store.put_blob(&blob2).expect("put blob 2");
        store.put_blob(&blob3).expect("put blob 3");
        // NOTE(review): the trees are indexed back-to-back, so their recorded
        // timestamps may coincide; confirm the eviction order has a deterministic
        // tie-break that keeps this test stable.
        store
            .index_tree(&hash1, "owner1", Some("tree1"), PRIORITY_OTHER, None)
            .expect("index tree 1");
        store
            .index_tree(&hash2, "owner2", Some("tree2"), PRIORITY_OTHER, None)
            .expect("index tree 2");
        store
            .index_tree(&hash3, "owner3", Some("tree3"), PRIORITY_OTHER, None)
            .expect("index tree 3");

        let freed = store.evict_if_needed().expect("evict");

        assert!(freed > 0);
        assert!(
            store.get_tree_meta(&hash3).expect("tree meta").is_some(),
            "newest tree should survive before older peers at the same priority"
        );
    }

    /// Orphan eviction must not remove a root referenced only by a
    /// `socialgraph/events-root.msgpack` file in the data directory.
    #[test]
    fn orphan_cleanup_keeps_socialgraph_root_hashes() {
        let temp_dir = TempDir::new().expect("temp dir");
        let store = HashtreeStore::with_options(temp_dir.path(), None, 1024).expect("store");
        let cid = build_test_tree(&store);
        write_root_file(
            &temp_dir.path().join("socialgraph/events-root.msgpack"),
            &cid,
        );

        let freed = store
            .evict_disposable_orphans_to_target(0)
            .expect("orphan cleanup");

        assert!(freed < 1024);
        assert!(store.blob_exists(&cid.hash).expect("root exists"));
    }
}