ipfrs_storage/
pinning.rs

1//! Pin management for preventing garbage collection of important blocks.
2//!
3//! Pinning allows users to mark blocks as important, preventing them from
4//! being garbage collected. Supports both direct pins (single block) and
5//! recursive pins (entire DAG).
6//!
7//! # Pin Types
8//!
9//! - **Direct**: Only pins the specific block
10//! - **Recursive**: Pins the block and all blocks it references (entire DAG)
11//! - **Indirect**: Block is pinned because it's referenced by a recursively pinned block
12
13use dashmap::DashMap;
14use ipfrs_core::{Cid, Error, Result};
15use serde::{Deserialize, Serialize};
16use std::collections::{HashMap, HashSet};
17use std::path::Path;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::time::{SystemTime, UNIX_EPOCH};
20
21/// Type of pin
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub enum PinType {
24    /// Direct pin - only this specific block
25    Direct,
26    /// Recursive pin - this block and all referenced blocks
27    Recursive,
28    /// Indirect pin - pinned via a recursive pin (internal use)
29    Indirect,
30}
31
32/// Metadata about a pinned block
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct PinInfo {
35    /// Type of pin
36    pub pin_type: PinType,
37    /// Reference count (how many times this CID is pinned)
38    pub ref_count: u32,
39    /// When the pin was created (Unix timestamp)
40    pub created_at: u64,
41    /// Optional name/label for the pin
42    pub name: Option<String>,
43    /// For indirect pins, the parent that caused this pin
44    pub pinned_by: Option<Vec<u8>>, // Serialized CID
45}
46
47impl PinInfo {
48    fn new(pin_type: PinType) -> Self {
49        let now = SystemTime::now()
50            .duration_since(UNIX_EPOCH)
51            .unwrap_or_default()
52            .as_secs();
53
54        Self {
55            pin_type,
56            ref_count: 1,
57            created_at: now,
58            name: None,
59            pinned_by: None,
60        }
61    }
62
63    fn with_name(mut self, name: String) -> Self {
64        self.name = Some(name);
65        self
66    }
67
68    fn with_parent(mut self, parent: &Cid) -> Self {
69        self.pinned_by = Some(parent.to_bytes());
70        self
71    }
72}
73
74/// Pin manager for tracking pinned blocks
75pub struct PinManager {
76    /// Map of CID -> PinInfo
77    pins: DashMap<Vec<u8>, PinInfo>,
78    /// Statistics
79    stats: PinStats,
80}
81
/// Live counters describing the pins currently held.
///
/// Every counter is an atomic, so it can be updated through a shared reference.
#[derive(Default, Debug)]
pub struct PinStats {
    /// Count of all pinned entries, regardless of type.
    total_pins: AtomicU64,
    /// Count of entries whose type is direct.
    direct_pins: AtomicU64,
    /// Count of entries whose type is recursive.
    recursive_pins: AtomicU64,
    /// Count of entries whose type is indirect.
    indirect_pins: AtomicU64,
}
94
95impl PinStats {
96    fn increment(&self, pin_type: PinType) {
97        self.total_pins.fetch_add(1, Ordering::Relaxed);
98        match pin_type {
99            PinType::Direct => self.direct_pins.fetch_add(1, Ordering::Relaxed),
100            PinType::Recursive => self.recursive_pins.fetch_add(1, Ordering::Relaxed),
101            PinType::Indirect => self.indirect_pins.fetch_add(1, Ordering::Relaxed),
102        };
103    }
104
105    fn decrement(&self, pin_type: PinType) {
106        self.total_pins.fetch_sub(1, Ordering::Relaxed);
107        match pin_type {
108            PinType::Direct => self.direct_pins.fetch_sub(1, Ordering::Relaxed),
109            PinType::Recursive => self.recursive_pins.fetch_sub(1, Ordering::Relaxed),
110            PinType::Indirect => self.indirect_pins.fetch_sub(1, Ordering::Relaxed),
111        };
112    }
113
114    /// Get a snapshot of the statistics
115    pub fn snapshot(&self) -> PinStatsSnapshot {
116        PinStatsSnapshot {
117            total_pins: self.total_pins.load(Ordering::Relaxed),
118            direct_pins: self.direct_pins.load(Ordering::Relaxed),
119            recursive_pins: self.recursive_pins.load(Ordering::Relaxed),
120            indirect_pins: self.indirect_pins.load(Ordering::Relaxed),
121        }
122    }
123}
124
/// Snapshot of pin statistics.
///
/// A plain value type holding four counters. It is `Copy`, comparable and
/// defaults to all zeros, so snapshots can be passed around and compared as a whole.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct PinStatsSnapshot {
    /// Total number of pins at the time of the snapshot.
    pub total_pins: u64,
    /// Number of direct pins at the time of the snapshot.
    pub direct_pins: u64,
    /// Number of recursive pins at the time of the snapshot.
    pub recursive_pins: u64,
    /// Number of indirect pins at the time of the snapshot.
    pub indirect_pins: u64,
}
133
134impl PinManager {
135    /// Create a new pin manager
136    pub fn new() -> Self {
137        Self {
138            pins: DashMap::new(),
139            stats: PinStats::default(),
140        }
141    }
142
143    /// Pin a block directly (single block only)
144    pub fn pin(&self, cid: &Cid) -> Result<()> {
145        self.pin_with_type(cid, PinType::Direct, None)
146    }
147
148    /// Pin a block with a name
149    pub fn pin_named(&self, cid: &Cid, name: String) -> Result<()> {
150        self.pin_with_type(cid, PinType::Direct, Some(name))
151    }
152
153    /// Pin a block with specific type
154    fn pin_with_type(&self, cid: &Cid, pin_type: PinType, name: Option<String>) -> Result<()> {
155        let key = cid.to_bytes();
156
157        self.pins
158            .entry(key)
159            .and_modify(|info| {
160                info.ref_count += 1;
161                // Upgrade pin type if needed (direct -> recursive)
162                if pin_type == PinType::Recursive && info.pin_type == PinType::Direct {
163                    self.stats.decrement(PinType::Direct);
164                    self.stats.increment(PinType::Recursive);
165                    info.pin_type = PinType::Recursive;
166                }
167            })
168            .or_insert_with(|| {
169                self.stats.increment(pin_type);
170                let mut info = PinInfo::new(pin_type);
171                if let Some(n) = name {
172                    info = info.with_name(n);
173                }
174                info
175            });
176
177        Ok(())
178    }
179
180    /// Pin a block recursively (pins all referenced blocks)
181    ///
182    /// The `link_resolver` function should return all CIDs that this block links to.
183    pub fn pin_recursive<F>(&self, cid: &Cid, link_resolver: F) -> Result<usize>
184    where
185        F: Fn(&Cid) -> Result<Vec<Cid>>,
186    {
187        let mut pinned_count = 0;
188        let mut to_process = vec![*cid];
189        let mut seen = HashSet::new();
190
191        // Pin the root as recursive
192        self.pin_with_type(cid, PinType::Recursive, None)?;
193        pinned_count += 1;
194        seen.insert(*cid);
195
196        // Process all linked blocks
197        while let Some(current_cid) = to_process.pop() {
198            let links = link_resolver(&current_cid)?;
199
200            for link_cid in links {
201                if seen.insert(link_cid) {
202                    // Pin as indirect (pinned by the root)
203                    self.pin_indirect(&link_cid, cid)?;
204                    pinned_count += 1;
205                    to_process.push(link_cid);
206                }
207            }
208        }
209
210        Ok(pinned_count)
211    }
212
213    /// Pin a block indirectly (used internally for recursive pins)
214    fn pin_indirect(&self, cid: &Cid, parent: &Cid) -> Result<()> {
215        let key = cid.to_bytes();
216
217        self.pins
218            .entry(key)
219            .and_modify(|info| {
220                info.ref_count += 1;
221            })
222            .or_insert_with(|| {
223                self.stats.increment(PinType::Indirect);
224                PinInfo::new(PinType::Indirect).with_parent(parent)
225            });
226
227        Ok(())
228    }
229
230    /// Unpin a block
231    pub fn unpin(&self, cid: &Cid) -> Result<bool> {
232        let key = cid.to_bytes();
233
234        let mut removed = false;
235        self.pins.remove_if(&key, |_, info| {
236            if info.ref_count <= 1 {
237                self.stats.decrement(info.pin_type);
238                removed = true;
239                true // Remove the entry
240            } else {
241                false // Keep the entry, just decrement
242            }
243        });
244
245        if !removed {
246            // Decrement ref count if entry wasn't removed
247            if let Some(mut entry) = self.pins.get_mut(&key) {
248                entry.ref_count -= 1;
249            }
250        }
251
252        Ok(removed)
253    }
254
255    /// Unpin a block recursively
256    pub fn unpin_recursive<F>(&self, cid: &Cid, link_resolver: F) -> Result<usize>
257    where
258        F: Fn(&Cid) -> Result<Vec<Cid>>,
259    {
260        let mut unpinned_count = 0;
261        let mut to_process = vec![*cid];
262        let mut seen = HashSet::new();
263
264        while let Some(current_cid) = to_process.pop() {
265            if !seen.insert(current_cid) {
266                continue;
267            }
268
269            if self.unpin(&current_cid)? {
270                unpinned_count += 1;
271            }
272
273            // Get links before unpinning might remove info
274            if let Ok(links) = link_resolver(&current_cid) {
275                to_process.extend(links);
276            }
277        }
278
279        Ok(unpinned_count)
280    }
281
282    /// Check if a block is pinned
283    pub fn is_pinned(&self, cid: &Cid) -> bool {
284        self.pins.contains_key(&cid.to_bytes())
285    }
286
287    /// Get pin info for a block
288    pub fn get_pin_info(&self, cid: &Cid) -> Option<PinInfo> {
289        self.pins.get(&cid.to_bytes()).map(|r| r.clone())
290    }
291
292    /// List all pinned CIDs
293    pub fn list_pins(&self) -> Result<Vec<(Cid, PinInfo)>> {
294        let mut result = Vec::new();
295        for entry in self.pins.iter() {
296            let cid = Cid::try_from(entry.key().clone())
297                .map_err(|e| Error::Cid(format!("Invalid CID: {e}")))?;
298            result.push((cid, entry.value().clone()));
299        }
300        Ok(result)
301    }
302
303    /// List pins of a specific type
304    pub fn list_pins_by_type(&self, pin_type: PinType) -> Result<Vec<Cid>> {
305        let mut result = Vec::new();
306        for entry in self.pins.iter() {
307            if entry.value().pin_type == pin_type {
308                let cid = Cid::try_from(entry.key().clone())
309                    .map_err(|e| Error::Cid(format!("Invalid CID: {e}")))?;
310                result.push(cid);
311            }
312        }
313        Ok(result)
314    }
315
316    /// Get statistics
317    pub fn stats(&self) -> PinStatsSnapshot {
318        self.stats.snapshot()
319    }
320
321    /// Save pins to a file
322    pub fn save_to_file(&self, path: &Path) -> Result<()> {
323        let pins: HashMap<Vec<u8>, PinInfo> = self
324            .pins
325            .iter()
326            .map(|r| (r.key().clone(), r.value().clone()))
327            .collect();
328
329        let data = oxicode::serde::encode_to_vec(&pins, oxicode::config::standard())
330            .map_err(|e| Error::Serialization(format!("Failed to serialize pins: {e}")))?;
331
332        std::fs::write(path, data)
333            .map_err(|e| Error::Storage(format!("Failed to write pins: {e}")))?;
334
335        Ok(())
336    }
337
338    /// Load pins from a file
339    pub fn load_from_file(path: &Path) -> Result<Self> {
340        let data =
341            std::fs::read(path).map_err(|e| Error::Storage(format!("Failed to read pins: {e}")))?;
342
343        let pins: HashMap<Vec<u8>, PinInfo> =
344            oxicode::serde::decode_owned_from_slice(&data, oxicode::config::standard())
345                .map(|(v, _)| v)
346                .map_err(|e| Error::Deserialization(format!("Failed to deserialize pins: {e}")))?;
347
348        let manager = Self::new();
349
350        for (key, info) in pins {
351            manager.stats.increment(info.pin_type);
352            manager.pins.insert(key, info);
353        }
354
355        Ok(manager)
356    }
357
358    /// Clear all pins
359    pub fn clear(&self) {
360        self.pins.clear();
361        self.stats.total_pins.store(0, Ordering::Relaxed);
362        self.stats.direct_pins.store(0, Ordering::Relaxed);
363        self.stats.recursive_pins.store(0, Ordering::Relaxed);
364        self.stats.indirect_pins.store(0, Ordering::Relaxed);
365    }
366}
367
368impl Default for PinManager {
369    fn default() -> Self {
370        Self::new()
371    }
372}
373
374/// Pin set - a named collection of pins
375#[derive(Debug, Clone, Serialize, Deserialize)]
376pub struct PinSet {
377    /// Name of the pin set
378    pub name: String,
379    /// Description
380    pub description: Option<String>,
381    /// CIDs in this set
382    pub cids: Vec<Vec<u8>>,
383    /// Created timestamp
384    pub created_at: u64,
385}
386
387impl PinSet {
388    /// Create a new pin set
389    pub fn new(name: String) -> Self {
390        let now = SystemTime::now()
391            .duration_since(UNIX_EPOCH)
392            .unwrap_or_default()
393            .as_secs();
394
395        Self {
396            name,
397            description: None,
398            cids: Vec::new(),
399            created_at: now,
400        }
401    }
402
403    /// Add a CID to the set
404    pub fn add(&mut self, cid: &Cid) {
405        let bytes = cid.to_bytes();
406        if !self.cids.contains(&bytes) {
407            self.cids.push(bytes);
408        }
409    }
410
411    /// Remove a CID from the set
412    pub fn remove(&mut self, cid: &Cid) {
413        let bytes = cid.to_bytes();
414        self.cids.retain(|c| c != &bytes);
415    }
416
417    /// Check if set contains a CID
418    pub fn contains(&self, cid: &Cid) -> bool {
419        let bytes = cid.to_bytes();
420        self.cids.contains(&bytes)
421    }
422
423    /// Get all CIDs in the set
424    pub fn list_cids(&self) -> Result<Vec<Cid>> {
425        self.cids
426            .iter()
427            .map(|bytes| {
428                Cid::try_from(bytes.clone()).map_err(|e| Error::Cid(format!("Invalid CID: {e}")))
429            })
430            .collect()
431    }
432
433    /// Number of items in the set
434    pub fn len(&self) -> usize {
435        self.cids.len()
436    }
437
438    /// Check if set is empty
439    pub fn is_empty(&self) -> bool {
440        self.cids.is_empty()
441    }
442}
443
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use ipfrs_core::Block;

    /// Derives a CID for tests by wrapping `data` in a block.
    fn make_test_cid(data: &[u8]) -> Cid {
        let payload = Bytes::copy_from_slice(data);
        let block = Block::new(payload).unwrap();
        *block.cid()
    }

    /// A single pin shows up in the map and the counters, and disappears on unpin.
    #[test]
    fn test_pin_unpin() {
        let pins = PinManager::new();
        let target = make_test_cid(b"test block");

        pins.pin(&target).unwrap();
        assert!(pins.is_pinned(&target));

        let after_pin = pins.stats();
        assert_eq!(after_pin.total_pins, 1);
        assert_eq!(after_pin.direct_pins, 1);

        pins.unpin(&target).unwrap();
        assert!(!pins.is_pinned(&target));

        let after_unpin = pins.stats();
        assert_eq!(after_unpin.total_pins, 0);
    }

    /// Pinning twice needs two unpins before the entry goes away.
    #[test]
    fn test_pin_refcount() {
        let pins = PinManager::new();
        let target = make_test_cid(b"test block");

        for _ in 0..2 {
            pins.pin(&target).unwrap();
        }

        let record = pins.get_pin_info(&target).unwrap();
        assert_eq!(record.ref_count, 2);

        // First release: one reference is still outstanding.
        pins.unpin(&target).unwrap();
        assert!(pins.is_pinned(&target));

        // Second release: the entry is gone.
        pins.unpin(&target).unwrap();
        assert!(!pins.is_pinned(&target));
    }

    /// Listing by type returns only the entries of that type.
    #[test]
    fn test_list_pins_by_type() {
        let pins = PinManager::new();
        let direct_cid = make_test_cid(b"block1");
        let recursive_cid = make_test_cid(b"block2");

        pins.pin(&direct_cid).unwrap();
        pins.pin_with_type(&recursive_cid, PinType::Recursive, None)
            .unwrap();

        let direct = pins.list_pins_by_type(PinType::Direct).unwrap();
        assert_eq!(direct, vec![direct_cid]);

        let recursive = pins.list_pins_by_type(PinType::Recursive).unwrap();
        assert_eq!(recursive, vec![recursive_cid]);
    }

    /// Adding and removing members is reflected by `len` and `contains`.
    #[test]
    fn test_pin_set() {
        let first = make_test_cid(b"block1");
        let second = make_test_cid(b"block2");
        let mut set = PinSet::new("test".to_string());

        set.add(&first);
        set.add(&second);
        assert_eq!(set.len(), 2);
        assert!(set.contains(&first));

        set.remove(&first);
        assert!(!set.contains(&first));
        assert_eq!(set.len(), 1);
    }
}
533}