1use anyhow::Result;
2use futures::executor::block_on as sync_block_on;
3use hashtree_core::store::Store;
4use hashtree_core::{to_hex, types::Hash, HashTree, HashTreeConfig};
5use serde::{Deserialize, Serialize};
6use std::collections::HashSet;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use super::{HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
10
/// Metadata recorded for each indexed tree, stored in `tree_meta` keyed by root hash.
///
/// Records are written with `rmp_serde::to_vec`, which encodes struct fields
/// positionally (as a MessagePack array). The field order below is therefore
/// part of the on-disk format: do not reorder, remove, or insert fields in the middle.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMeta {
    /// Owner identifier as passed to `index_tree`.
    pub owner: String,
    /// Optional human-readable name supplied when the tree was indexed.
    pub name: Option<String>,
    /// Seconds since the Unix epoch at which the tree was indexed.
    pub synced_at: u64,
    /// Total byte size computed for the tree's objects at index time.
    pub total_size: u64,
    /// Eviction priority; trees with lower values are evicted first.
    pub priority: u8,
}
25
/// Snapshot of overall storage usage as returned by `get_storage_stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Number of stored objects, as reported by the router's stats.
    pub total_dags: usize,
    /// Number of entries in the pin table.
    pub pinned_dags: usize,
    /// Total stored bytes, as reported by the router's stats.
    pub total_bytes: u64,
}
32
/// Indexed-tree bytes bucketed by eviction priority (see `storage_by_priority`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StorageByPriority {
    /// Bytes in trees whose priority equals `PRIORITY_OWN`.
    pub own: u64,
    /// Bytes in trees whose priority is at least `PRIORITY_FOLLOWED` (and not own).
    pub followed: u64,
    /// Bytes in all remaining trees.
    pub other: u64,
}
43
/// A pinned hash as presented by `list_pins_with_names`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PinnedItem {
    /// Hex-encoded hash of the pinned object.
    pub cid: String,
    /// Display name for the pin.
    pub name: String,
    /// Whether the pinned object was reported as a directory.
    pub is_directory: bool,
}
50
51impl HashtreeStore {
52 pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
54 let mut wtxn = self.env.write_txn()?;
55 self.pins.put(&mut wtxn, hash.as_slice(), &())?;
56 wtxn.commit()?;
57 Ok(())
58 }
59
60 pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
62 let mut wtxn = self.env.write_txn()?;
63 self.pins.delete(&mut wtxn, hash.as_slice())?;
64 wtxn.commit()?;
65 Ok(())
66 }
67
68 pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
70 let rtxn = self.env.read_txn()?;
71 Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
72 }
73
74 pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
76 let rtxn = self.env.read_txn()?;
77 let mut pins = Vec::new();
78
79 for item in self.pins.iter(&rtxn)? {
80 let (hash_bytes, _) = item?;
81 if hash_bytes.len() == 32 {
82 let mut hash = [0u8; 32];
83 hash.copy_from_slice(hash_bytes);
84 pins.push(hash);
85 }
86 }
87
88 Ok(pins)
89 }
90
91 pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
93 let rtxn = self.env.read_txn()?;
94 let store = self.store_arc();
95 let tree = HashTree::new(HashTreeConfig::new(store).public());
96 let mut pins = Vec::new();
97
98 for item in self.pins.iter(&rtxn)? {
99 let (hash_bytes, _) = item?;
100 if hash_bytes.len() != 32 {
101 continue;
102 }
103 let mut hash = [0u8; 32];
104 hash.copy_from_slice(hash_bytes);
105
106 let is_directory =
108 sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
109
110 pins.push(PinnedItem {
111 cid: to_hex(&hash),
112 name: "Unknown".to_string(),
113 is_directory,
114 });
115 }
116
117 Ok(pins)
118 }
119
120 pub fn index_tree(
127 &self,
128 root_hash: &Hash,
129 owner: &str,
130 name: Option<&str>,
131 priority: u8,
132 ref_key: Option<&str>,
133 ) -> Result<()> {
134 let root_hex = to_hex(root_hash);
135
136 if let Some(key) = ref_key {
138 let rtxn = self.env.read_txn()?;
139 if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
140 if old_hash_bytes != root_hash.as_slice() {
141 let old_hash: Hash = old_hash_bytes
142 .try_into()
143 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
144 drop(rtxn);
145 let _ = self.unpin(&old_hash);
146 let _ = self.unindex_tree(&old_hash);
148 tracing::debug!("Replaced old tree for ref {}", key);
149 }
150 }
151 }
152
153 let store = self.store_arc();
154 let tree = HashTree::new(HashTreeConfig::new(store).public());
155
156 let (blob_hashes, total_size) =
158 sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
159
160 let mut wtxn = self.env.write_txn()?;
161
162 for blob_hash in &blob_hashes {
164 let mut key = [0u8; 64];
165 key[..32].copy_from_slice(blob_hash);
166 key[32..].copy_from_slice(root_hash);
167 self.blob_trees.put(&mut wtxn, &key[..], &())?;
168 }
169
170 let meta = TreeMeta {
172 owner: owner.to_string(),
173 name: name.map(|s| s.to_string()),
174 synced_at: SystemTime::now()
175 .duration_since(UNIX_EPOCH)
176 .unwrap()
177 .as_secs(),
178 total_size,
179 priority,
180 };
181 let meta_bytes = rmp_serde::to_vec(&meta)
182 .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
183 self.tree_meta
184 .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;
185
186 if let Some(key) = ref_key {
188 self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
189 }
190
191 wtxn.commit()?;
192
193 tracing::debug!(
194 "Indexed tree {} ({} blobs, {} bytes, priority {})",
195 &root_hex[..8],
196 blob_hashes.len(),
197 total_size,
198 priority
199 );
200
201 Ok(())
202 }
203
204 async fn collect_tree_blobs<S: Store>(
206 &self,
207 tree: &HashTree<S>,
208 root: &Hash,
209 ) -> Result<(Vec<Hash>, u64)> {
210 let mut blobs = Vec::new();
211 let mut total_size = 0u64;
212 let mut stack = vec![*root];
213
214 while let Some(hash) = stack.pop() {
215 let is_tree = tree
217 .is_tree(&hash)
218 .await
219 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
220
221 if is_tree {
222 if let Some(node) = tree
224 .get_tree_node(&hash)
225 .await
226 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
227 {
228 for link in &node.links {
229 stack.push(link.hash);
230 }
231 }
232 } else {
233 if let Some(data) = self
235 .router
236 .get_sync(&hash)
237 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
238 {
239 total_size += data.len() as u64;
240 blobs.push(hash);
241 }
242 }
243 }
244
245 Ok((blobs, total_size))
246 }
247
248 pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
251 let root_hex = to_hex(root_hash);
252
253 let store = self.store_arc();
254 let tree = HashTree::new(HashTreeConfig::new(store).public());
255
256 let (blob_hashes, _) =
258 sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
259
260 let mut wtxn = self.env.write_txn()?;
261 let mut freed = 0u64;
262
263 for blob_hash in &blob_hashes {
265 let mut key = [0u8; 64];
267 key[..32].copy_from_slice(blob_hash);
268 key[32..].copy_from_slice(root_hash);
269 self.blob_trees.delete(&mut wtxn, &key[..])?;
270
271 let rtxn = self.env.read_txn()?;
273 let mut has_other_tree = false;
274
275 for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
276 if item.is_ok() {
277 has_other_tree = true;
278 break;
279 }
280 }
281 drop(rtxn);
282
283 if !has_other_tree {
285 if let Some(data) = self
286 .router
287 .get_sync(blob_hash)
288 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
289 {
290 freed += data.len() as u64;
291 self.router
293 .delete_local_only(blob_hash)
294 .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
295 }
296 }
297 }
298
299 if let Some(data) = self
301 .router
302 .get_sync(root_hash)
303 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
304 {
305 freed += data.len() as u64;
306 self.router
308 .delete_local_only(root_hash)
309 .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
310 }
311
312 self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
314
315 wtxn.commit()?;
316
317 tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
318
319 Ok(freed)
320 }
321
322 pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
324 let rtxn = self.env.read_txn()?;
325 if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
326 let meta: TreeMeta = rmp_serde::from_slice(bytes)
327 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
328 Ok(Some(meta))
329 } else {
330 Ok(None)
331 }
332 }
333
334 pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
336 let rtxn = self.env.read_txn()?;
337 let mut trees = Vec::new();
338
339 for item in self.tree_meta.iter(&rtxn)? {
340 let (hash_bytes, meta_bytes) = item?;
341 let hash: Hash = hash_bytes
342 .try_into()
343 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
344 let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
345 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
346 trees.push((hash, meta));
347 }
348
349 Ok(trees)
350 }
351
352 pub fn tracked_size(&self) -> Result<u64> {
354 let rtxn = self.env.read_txn()?;
355 let mut total = 0u64;
356
357 for item in self.tree_meta.iter(&rtxn)? {
358 let (_, bytes) = item?;
359 let meta: TreeMeta = rmp_serde::from_slice(bytes)
360 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
361 total += meta.total_size;
362 }
363
364 Ok(total)
365 }
366
367 fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
369 let mut trees = self.list_indexed_trees()?;
370
371 trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
373 std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
374 other => other,
375 });
376
377 Ok(trees)
378 }
379
380 pub fn evict_if_needed(&self) -> Result<u64> {
387 let stats = self
389 .router
390 .stats()
391 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
392 let current = stats.total_bytes;
393
394 if current <= self.max_size_bytes {
395 return Ok(0);
396 }
397
398 let target = self.max_size_bytes * 90 / 100;
400 let mut freed = 0u64;
401 let mut current_size = current;
402
403 let orphan_freed = self.evict_orphaned_blobs()?;
405 freed += orphan_freed;
406 current_size = current_size.saturating_sub(orphan_freed);
407
408 if orphan_freed > 0 {
409 tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
410 }
411
412 if current_size <= target {
414 if freed > 0 {
415 tracing::info!("Eviction complete: {} bytes freed", freed);
416 }
417 return Ok(freed);
418 }
419
420 let evictable = self.get_evictable_trees()?;
423
424 for (root_hash, meta) in evictable {
425 if current_size <= target {
426 break;
427 }
428
429 let root_hex = to_hex(&root_hash);
430
431 if self.is_pinned(&root_hash)? {
433 continue;
434 }
435
436 let tree_freed = self.unindex_tree(&root_hash)?;
437 freed += tree_freed;
438 current_size = current_size.saturating_sub(tree_freed);
439
440 tracing::info!(
441 "Evicted tree {} (owner={}, priority={}, {} bytes)",
442 &root_hex[..8],
443 &meta.owner[..8.min(meta.owner.len())],
444 meta.priority,
445 tree_freed
446 );
447 }
448
449 if freed > 0 {
450 tracing::info!("Eviction complete: {} bytes freed", freed);
451 }
452
453 Ok(freed)
454 }
455
456 fn evict_orphaned_blobs(&self) -> Result<u64> {
458 let mut freed = 0u64;
459
460 let all_hashes = self
462 .router
463 .list()
464 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
465
466 let rtxn = self.env.read_txn()?;
468 let pinned: HashSet<Hash> = self
469 .pins
470 .iter(&rtxn)?
471 .filter_map(|item| item.ok())
472 .filter_map(|(hash_bytes, _)| {
473 if hash_bytes.len() == 32 {
474 let mut hash = [0u8; 32];
475 hash.copy_from_slice(hash_bytes);
476 Some(hash)
477 } else {
478 None
479 }
480 })
481 .collect();
482
483 let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
486 for (key_bytes, _) in self.blob_trees.iter(&rtxn)?.flatten() {
487 if key_bytes.len() >= 32 {
488 let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
489 blobs_in_trees.insert(blob_hash);
490 }
491 }
492 drop(rtxn);
493
494 for hash in all_hashes {
496 if pinned.contains(&hash) {
498 continue;
499 }
500
501 if blobs_in_trees.contains(&hash) {
503 continue;
504 }
505
506 if let Ok(Some(data)) = self.router.get_sync(&hash) {
508 freed += data.len() as u64;
509 let _ = self.router.delete_local_only(&hash);
510 tracing::debug!(
511 "Deleted orphaned blob {} ({} bytes)",
512 &to_hex(&hash)[..8],
513 data.len()
514 );
515 }
516 }
517
518 Ok(freed)
519 }
520
521 pub fn max_size_bytes(&self) -> u64 {
523 self.max_size_bytes
524 }
525
526 pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
528 let rtxn = self.env.read_txn()?;
529 let mut own = 0u64;
530 let mut followed = 0u64;
531 let mut other = 0u64;
532
533 for item in self.tree_meta.iter(&rtxn)? {
534 let (_, bytes) = item?;
535 let meta: TreeMeta = rmp_serde::from_slice(bytes)
536 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
537
538 if meta.priority == PRIORITY_OWN {
539 own += meta.total_size;
540 } else if meta.priority >= PRIORITY_FOLLOWED {
541 followed += meta.total_size;
542 } else {
543 other += meta.total_size;
544 }
545 }
546
547 Ok(StorageByPriority {
548 own,
549 followed,
550 other,
551 })
552 }
553
554 pub fn get_storage_stats(&self) -> Result<StorageStats> {
556 let rtxn = self.env.read_txn()?;
557 let total_pins = self.pins.len(&rtxn)? as usize;
558
559 let stats = self
560 .router
561 .stats()
562 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
563
564 Ok(StorageStats {
565 total_dags: stats.count,
566 pinned_dags: total_pins,
567 total_bytes: stats.total_bytes,
568 })
569 }
570}