1use anyhow::Result;
2use futures::executor::block_on as sync_block_on;
3use hashtree_core::store::Store;
4use hashtree_core::{to_hex, types::Hash, HashTree, HashTreeConfig};
5use serde::{Deserialize, Serialize};
6use std::collections::HashSet;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use super::{HashtreeStore, PRIORITY_FOLLOWED, PRIORITY_OWN};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct TreeMeta {
14 pub owner: String,
16 pub name: Option<String>,
18 pub synced_at: u64,
20 pub total_size: u64,
22 pub priority: u8,
24}
25
/// Snapshot of overall storage usage, as returned by `get_storage_stats`.
#[derive(Debug)]
pub struct StorageStats {
    /// Number of stored items reported by the storage router (`stats().count`).
    pub total_dags: usize,
    /// Number of entries in the pin table.
    pub pinned_dags: usize,
    /// Total stored bytes reported by the storage router.
    pub total_bytes: u64,
}
32
/// Indexed bytes grouped by priority tier, as computed by
/// `storage_by_priority` from each tree's recorded `total_size`.
#[derive(Clone, Debug)]
pub struct StorageByPriority {
    /// Bytes in trees whose priority equals `PRIORITY_OWN`.
    pub own: u64,
    /// Bytes in trees whose priority is at least `PRIORITY_FOLLOWED`
    /// (and not equal to `PRIORITY_OWN`).
    pub followed: u64,
    /// Bytes in all remaining trees.
    pub other: u64,
}
43
/// A pinned entry in display form, as produced by `list_pins_with_names`.
#[derive(Clone, Debug)]
pub struct PinnedItem {
    /// Hex encoding of the pinned 32-byte hash.
    pub cid: String,
    /// Display name for the pin.
    pub name: String,
    /// Whether the pinned hash resolves to a directory.
    pub is_directory: bool,
}
50
51impl HashtreeStore {
52 pub fn pin(&self, hash: &[u8; 32]) -> Result<()> {
54 let mut wtxn = self.env.write_txn()?;
55 self.pins.put(&mut wtxn, hash.as_slice(), &())?;
56 wtxn.commit()?;
57 Ok(())
58 }
59
60 pub fn unpin(&self, hash: &[u8; 32]) -> Result<()> {
62 let mut wtxn = self.env.write_txn()?;
63 self.pins.delete(&mut wtxn, hash.as_slice())?;
64 wtxn.commit()?;
65 Ok(())
66 }
67
68 pub fn is_pinned(&self, hash: &[u8; 32]) -> Result<bool> {
70 let rtxn = self.env.read_txn()?;
71 Ok(self.pins.get(&rtxn, hash.as_slice())?.is_some())
72 }
73
74 pub fn list_pins_raw(&self) -> Result<Vec<[u8; 32]>> {
76 let rtxn = self.env.read_txn()?;
77 let mut pins = Vec::new();
78
79 for item in self.pins.iter(&rtxn)? {
80 let (hash_bytes, _) = item?;
81 if hash_bytes.len() == 32 {
82 let mut hash = [0u8; 32];
83 hash.copy_from_slice(hash_bytes);
84 pins.push(hash);
85 }
86 }
87
88 Ok(pins)
89 }
90
91 pub fn list_pins_with_names(&self) -> Result<Vec<PinnedItem>> {
93 let rtxn = self.env.read_txn()?;
94 let store = self.store_arc();
95 let tree = HashTree::new(HashTreeConfig::new(store).public());
96 let mut pins = Vec::new();
97
98 for item in self.pins.iter(&rtxn)? {
99 let (hash_bytes, _) = item?;
100 if hash_bytes.len() != 32 {
101 continue;
102 }
103 let mut hash = [0u8; 32];
104 hash.copy_from_slice(hash_bytes);
105
106 let is_directory =
108 sync_block_on(async { tree.is_directory(&hash).await.unwrap_or(false) });
109
110 pins.push(PinnedItem {
111 cid: to_hex(&hash),
112 name: "Unknown".to_string(),
113 is_directory,
114 });
115 }
116
117 Ok(pins)
118 }
119
120 pub fn index_tree(
127 &self,
128 root_hash: &Hash,
129 owner: &str,
130 name: Option<&str>,
131 priority: u8,
132 ref_key: Option<&str>,
133 ) -> Result<()> {
134 let root_hex = to_hex(root_hash);
135
136 if let Some(key) = ref_key {
138 let rtxn = self.env.read_txn()?;
139 if let Some(old_hash_bytes) = self.tree_refs.get(&rtxn, key)? {
140 if old_hash_bytes != root_hash.as_slice() {
141 let old_hash: Hash = old_hash_bytes
142 .try_into()
143 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_refs"))?;
144 drop(rtxn);
145 let _ = self.unindex_tree(&old_hash);
147 tracing::debug!("Replaced old tree for ref {}", key);
148 }
149 }
150 }
151
152 let store = self.store_arc();
153 let tree = HashTree::new(HashTreeConfig::new(store).public());
154
155 let (blob_hashes, total_size) =
157 sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
158
159 let mut wtxn = self.env.write_txn()?;
160
161 for blob_hash in &blob_hashes {
163 let mut key = [0u8; 64];
164 key[..32].copy_from_slice(blob_hash);
165 key[32..].copy_from_slice(root_hash);
166 self.blob_trees.put(&mut wtxn, &key[..], &())?;
167 }
168
169 let meta = TreeMeta {
171 owner: owner.to_string(),
172 name: name.map(|s| s.to_string()),
173 synced_at: SystemTime::now()
174 .duration_since(UNIX_EPOCH)
175 .unwrap()
176 .as_secs(),
177 total_size,
178 priority,
179 };
180 let meta_bytes = rmp_serde::to_vec(&meta)
181 .map_err(|e| anyhow::anyhow!("Failed to serialize TreeMeta: {}", e))?;
182 self.tree_meta
183 .put(&mut wtxn, root_hash.as_slice(), &meta_bytes)?;
184
185 if let Some(key) = ref_key {
187 self.tree_refs.put(&mut wtxn, key, root_hash.as_slice())?;
188 }
189
190 wtxn.commit()?;
191
192 tracing::debug!(
193 "Indexed tree {} ({} blobs, {} bytes, priority {})",
194 &root_hex[..8],
195 blob_hashes.len(),
196 total_size,
197 priority
198 );
199
200 Ok(())
201 }
202
203 async fn collect_tree_blobs<S: Store>(
205 &self,
206 tree: &HashTree<S>,
207 root: &Hash,
208 ) -> Result<(Vec<Hash>, u64)> {
209 let mut blobs = Vec::new();
210 let mut total_size = 0u64;
211 let mut stack = vec![*root];
212
213 while let Some(hash) = stack.pop() {
214 let is_tree = tree
216 .is_tree(&hash)
217 .await
218 .map_err(|e| anyhow::anyhow!("Failed to check tree: {}", e))?;
219
220 if is_tree {
221 if let Some(node) = tree
223 .get_tree_node(&hash)
224 .await
225 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
226 {
227 for link in &node.links {
228 stack.push(link.hash);
229 }
230 }
231 } else {
232 if let Some(data) = self
234 .router
235 .get_sync(&hash)
236 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
237 {
238 total_size += data.len() as u64;
239 blobs.push(hash);
240 }
241 }
242 }
243
244 Ok((blobs, total_size))
245 }
246
247 pub fn unindex_tree(&self, root_hash: &Hash) -> Result<u64> {
250 let root_hex = to_hex(root_hash);
251
252 let store = self.store_arc();
253 let tree = HashTree::new(HashTreeConfig::new(store).public());
254
255 let (blob_hashes, _) =
257 sync_block_on(async { self.collect_tree_blobs(&tree, root_hash).await })?;
258
259 let mut wtxn = self.env.write_txn()?;
260 let mut freed = 0u64;
261
262 for blob_hash in &blob_hashes {
264 let mut key = [0u8; 64];
266 key[..32].copy_from_slice(blob_hash);
267 key[32..].copy_from_slice(root_hash);
268 self.blob_trees.delete(&mut wtxn, &key[..])?;
269
270 let rtxn = self.env.read_txn()?;
272 let mut has_other_tree = false;
273
274 for item in self.blob_trees.prefix_iter(&rtxn, &blob_hash[..])? {
275 if item.is_ok() {
276 has_other_tree = true;
277 break;
278 }
279 }
280 drop(rtxn);
281
282 if !has_other_tree {
284 if let Some(data) = self
285 .router
286 .get_sync(blob_hash)
287 .map_err(|e| anyhow::anyhow!("Failed to get blob: {}", e))?
288 {
289 freed += data.len() as u64;
290 self.router
292 .delete_local_only(blob_hash)
293 .map_err(|e| anyhow::anyhow!("Failed to delete blob: {}", e))?;
294 }
295 }
296 }
297
298 if let Some(data) = self
300 .router
301 .get_sync(root_hash)
302 .map_err(|e| anyhow::anyhow!("Failed to get tree node: {}", e))?
303 {
304 freed += data.len() as u64;
305 self.router
307 .delete_local_only(root_hash)
308 .map_err(|e| anyhow::anyhow!("Failed to delete tree node: {}", e))?;
309 }
310
311 self.tree_meta.delete(&mut wtxn, root_hash.as_slice())?;
313
314 wtxn.commit()?;
315
316 tracing::debug!("Unindexed tree {} ({} bytes freed)", &root_hex[..8], freed);
317
318 Ok(freed)
319 }
320
321 pub fn get_tree_meta(&self, root_hash: &Hash) -> Result<Option<TreeMeta>> {
323 let rtxn = self.env.read_txn()?;
324 if let Some(bytes) = self.tree_meta.get(&rtxn, root_hash.as_slice())? {
325 let meta: TreeMeta = rmp_serde::from_slice(bytes)
326 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
327 Ok(Some(meta))
328 } else {
329 Ok(None)
330 }
331 }
332
333 pub fn list_indexed_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
335 let rtxn = self.env.read_txn()?;
336 let mut trees = Vec::new();
337
338 for item in self.tree_meta.iter(&rtxn)? {
339 let (hash_bytes, meta_bytes) = item?;
340 let hash: Hash = hash_bytes
341 .try_into()
342 .map_err(|_| anyhow::anyhow!("Invalid hash in tree_meta"))?;
343 let meta: TreeMeta = rmp_serde::from_slice(meta_bytes)
344 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
345 trees.push((hash, meta));
346 }
347
348 Ok(trees)
349 }
350
351 pub fn tracked_size(&self) -> Result<u64> {
353 let rtxn = self.env.read_txn()?;
354 let mut total = 0u64;
355
356 for item in self.tree_meta.iter(&rtxn)? {
357 let (_, bytes) = item?;
358 let meta: TreeMeta = rmp_serde::from_slice(bytes)
359 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
360 total += meta.total_size;
361 }
362
363 Ok(total)
364 }
365
366 fn get_evictable_trees(&self) -> Result<Vec<(Hash, TreeMeta)>> {
368 let mut trees = self.list_indexed_trees()?;
369
370 trees.sort_by(|a, b| match a.1.priority.cmp(&b.1.priority) {
372 std::cmp::Ordering::Equal => a.1.synced_at.cmp(&b.1.synced_at),
373 other => other,
374 });
375
376 Ok(trees)
377 }
378
379 pub fn evict_if_needed(&self) -> Result<u64> {
386 let stats = self
388 .router
389 .stats()
390 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
391 let current = stats.total_bytes;
392
393 if current <= self.max_size_bytes {
394 return Ok(0);
395 }
396
397 let target = self.max_size_bytes * 90 / 100;
399 let mut freed = 0u64;
400 let mut current_size = current;
401
402 let orphan_freed = self.evict_orphaned_blobs()?;
404 freed += orphan_freed;
405 current_size = current_size.saturating_sub(orphan_freed);
406
407 if orphan_freed > 0 {
408 tracing::info!("Evicted orphaned blobs: {} bytes freed", orphan_freed);
409 }
410
411 if current_size <= target {
413 if freed > 0 {
414 tracing::info!("Eviction complete: {} bytes freed", freed);
415 }
416 return Ok(freed);
417 }
418
419 let evictable = self.get_evictable_trees()?;
422
423 for (root_hash, meta) in evictable {
424 if current_size <= target {
425 break;
426 }
427
428 let root_hex = to_hex(&root_hash);
429
430 if self.is_pinned(&root_hash)? {
432 continue;
433 }
434
435 let tree_freed = self.unindex_tree(&root_hash)?;
436 freed += tree_freed;
437 current_size = current_size.saturating_sub(tree_freed);
438
439 tracing::info!(
440 "Evicted tree {} (owner={}, priority={}, {} bytes)",
441 &root_hex[..8],
442 &meta.owner[..8.min(meta.owner.len())],
443 meta.priority,
444 tree_freed
445 );
446 }
447
448 if freed > 0 {
449 tracing::info!("Eviction complete: {} bytes freed", freed);
450 }
451
452 Ok(freed)
453 }
454
455 fn evict_orphaned_blobs(&self) -> Result<u64> {
457 let mut freed = 0u64;
458
459 let all_hashes = self
461 .router
462 .list()
463 .map_err(|e| anyhow::anyhow!("Failed to list hashes: {}", e))?;
464
465 let rtxn = self.env.read_txn()?;
467 let pinned: HashSet<Hash> = self
468 .pins
469 .iter(&rtxn)?
470 .filter_map(|item| item.ok())
471 .filter_map(|(hash_bytes, _)| {
472 if hash_bytes.len() == 32 {
473 let mut hash = [0u8; 32];
474 hash.copy_from_slice(hash_bytes);
475 Some(hash)
476 } else {
477 None
478 }
479 })
480 .collect();
481
482 let mut blobs_in_trees: HashSet<Hash> = HashSet::new();
485 for (key_bytes, _) in self.blob_trees.iter(&rtxn)?.flatten() {
486 if key_bytes.len() >= 32 {
487 let blob_hash: Hash = key_bytes[..32].try_into().unwrap();
488 blobs_in_trees.insert(blob_hash);
489 }
490 }
491 drop(rtxn);
492
493 for hash in all_hashes {
495 if pinned.contains(&hash) {
497 continue;
498 }
499
500 if blobs_in_trees.contains(&hash) {
502 continue;
503 }
504
505 if let Ok(Some(data)) = self.router.get_sync(&hash) {
507 freed += data.len() as u64;
508 let _ = self.router.delete_local_only(&hash);
509 tracing::debug!(
510 "Deleted orphaned blob {} ({} bytes)",
511 &to_hex(&hash)[..8],
512 data.len()
513 );
514 }
515 }
516
517 Ok(freed)
518 }
519
520 pub fn max_size_bytes(&self) -> u64 {
522 self.max_size_bytes
523 }
524
525 pub fn storage_by_priority(&self) -> Result<StorageByPriority> {
527 let rtxn = self.env.read_txn()?;
528 let mut own = 0u64;
529 let mut followed = 0u64;
530 let mut other = 0u64;
531
532 for item in self.tree_meta.iter(&rtxn)? {
533 let (_, bytes) = item?;
534 let meta: TreeMeta = rmp_serde::from_slice(bytes)
535 .map_err(|e| anyhow::anyhow!("Failed to deserialize TreeMeta: {}", e))?;
536
537 if meta.priority == PRIORITY_OWN {
538 own += meta.total_size;
539 } else if meta.priority >= PRIORITY_FOLLOWED {
540 followed += meta.total_size;
541 } else {
542 other += meta.total_size;
543 }
544 }
545
546 Ok(StorageByPriority {
547 own,
548 followed,
549 other,
550 })
551 }
552
553 pub fn get_storage_stats(&self) -> Result<StorageStats> {
555 let rtxn = self.env.read_txn()?;
556 let total_pins = self.pins.len(&rtxn)? as usize;
557
558 let stats = self
559 .router
560 .stats()
561 .map_err(|e| anyhow::anyhow!("Failed to get stats: {}", e))?;
562
563 Ok(StorageStats {
564 total_dags: stats.count,
565 pinned_dags: total_pins,
566 total_bytes: stats.total_bytes,
567 })
568 }
569}