1use dashmap::DashMap;
14use ipfrs_core::{Cid, Error, Result};
15use serde::{Deserialize, Serialize};
16use std::collections::{HashMap, HashSet};
17use std::path::Path;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::time::{SystemTime, UNIX_EPOCH};
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub enum PinType {
24 Direct,
26 Recursive,
28 Indirect,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct PinInfo {
35 pub pin_type: PinType,
37 pub ref_count: u32,
39 pub created_at: u64,
41 pub name: Option<String>,
43 pub pinned_by: Option<Vec<u8>>, }
46
47impl PinInfo {
48 fn new(pin_type: PinType) -> Self {
49 let now = SystemTime::now()
50 .duration_since(UNIX_EPOCH)
51 .unwrap_or_default()
52 .as_secs();
53
54 Self {
55 pin_type,
56 ref_count: 1,
57 created_at: now,
58 name: None,
59 pinned_by: None,
60 }
61 }
62
63 fn with_name(mut self, name: String) -> Self {
64 self.name = Some(name);
65 self
66 }
67
68 fn with_parent(mut self, parent: &Cid) -> Self {
69 self.pinned_by = Some(parent.to_bytes());
70 self
71 }
72}
73
74pub struct PinManager {
76 pins: DashMap<Vec<u8>, PinInfo>,
78 stats: PinStats,
80}
81
82#[derive(Debug, Default)]
84pub struct PinStats {
85 total_pins: AtomicU64,
87 direct_pins: AtomicU64,
89 recursive_pins: AtomicU64,
91 indirect_pins: AtomicU64,
93}
94
95impl PinStats {
96 fn increment(&self, pin_type: PinType) {
97 self.total_pins.fetch_add(1, Ordering::Relaxed);
98 match pin_type {
99 PinType::Direct => self.direct_pins.fetch_add(1, Ordering::Relaxed),
100 PinType::Recursive => self.recursive_pins.fetch_add(1, Ordering::Relaxed),
101 PinType::Indirect => self.indirect_pins.fetch_add(1, Ordering::Relaxed),
102 };
103 }
104
105 fn decrement(&self, pin_type: PinType) {
106 self.total_pins.fetch_sub(1, Ordering::Relaxed);
107 match pin_type {
108 PinType::Direct => self.direct_pins.fetch_sub(1, Ordering::Relaxed),
109 PinType::Recursive => self.recursive_pins.fetch_sub(1, Ordering::Relaxed),
110 PinType::Indirect => self.indirect_pins.fetch_sub(1, Ordering::Relaxed),
111 };
112 }
113
114 pub fn snapshot(&self) -> PinStatsSnapshot {
116 PinStatsSnapshot {
117 total_pins: self.total_pins.load(Ordering::Relaxed),
118 direct_pins: self.direct_pins.load(Ordering::Relaxed),
119 recursive_pins: self.recursive_pins.load(Ordering::Relaxed),
120 indirect_pins: self.indirect_pins.load(Ordering::Relaxed),
121 }
122 }
123}
124
125#[derive(Debug, Clone)]
127pub struct PinStatsSnapshot {
128 pub total_pins: u64,
129 pub direct_pins: u64,
130 pub recursive_pins: u64,
131 pub indirect_pins: u64,
132}
133
134impl PinManager {
135 pub fn new() -> Self {
137 Self {
138 pins: DashMap::new(),
139 stats: PinStats::default(),
140 }
141 }
142
143 pub fn pin(&self, cid: &Cid) -> Result<()> {
145 self.pin_with_type(cid, PinType::Direct, None)
146 }
147
148 pub fn pin_named(&self, cid: &Cid, name: String) -> Result<()> {
150 self.pin_with_type(cid, PinType::Direct, Some(name))
151 }
152
153 fn pin_with_type(&self, cid: &Cid, pin_type: PinType, name: Option<String>) -> Result<()> {
155 let key = cid.to_bytes();
156
157 self.pins
158 .entry(key)
159 .and_modify(|info| {
160 info.ref_count += 1;
161 if pin_type == PinType::Recursive && info.pin_type == PinType::Direct {
163 self.stats.decrement(PinType::Direct);
164 self.stats.increment(PinType::Recursive);
165 info.pin_type = PinType::Recursive;
166 }
167 })
168 .or_insert_with(|| {
169 self.stats.increment(pin_type);
170 let mut info = PinInfo::new(pin_type);
171 if let Some(n) = name {
172 info = info.with_name(n);
173 }
174 info
175 });
176
177 Ok(())
178 }
179
180 pub fn pin_recursive<F>(&self, cid: &Cid, link_resolver: F) -> Result<usize>
184 where
185 F: Fn(&Cid) -> Result<Vec<Cid>>,
186 {
187 let mut pinned_count = 0;
188 let mut to_process = vec![*cid];
189 let mut seen = HashSet::new();
190
191 self.pin_with_type(cid, PinType::Recursive, None)?;
193 pinned_count += 1;
194 seen.insert(*cid);
195
196 while let Some(current_cid) = to_process.pop() {
198 let links = link_resolver(¤t_cid)?;
199
200 for link_cid in links {
201 if seen.insert(link_cid) {
202 self.pin_indirect(&link_cid, cid)?;
204 pinned_count += 1;
205 to_process.push(link_cid);
206 }
207 }
208 }
209
210 Ok(pinned_count)
211 }
212
213 fn pin_indirect(&self, cid: &Cid, parent: &Cid) -> Result<()> {
215 let key = cid.to_bytes();
216
217 self.pins
218 .entry(key)
219 .and_modify(|info| {
220 info.ref_count += 1;
221 })
222 .or_insert_with(|| {
223 self.stats.increment(PinType::Indirect);
224 PinInfo::new(PinType::Indirect).with_parent(parent)
225 });
226
227 Ok(())
228 }
229
230 pub fn unpin(&self, cid: &Cid) -> Result<bool> {
232 let key = cid.to_bytes();
233
234 let mut removed = false;
235 self.pins.remove_if(&key, |_, info| {
236 if info.ref_count <= 1 {
237 self.stats.decrement(info.pin_type);
238 removed = true;
239 true } else {
241 false }
243 });
244
245 if !removed {
246 if let Some(mut entry) = self.pins.get_mut(&key) {
248 entry.ref_count -= 1;
249 }
250 }
251
252 Ok(removed)
253 }
254
255 pub fn unpin_recursive<F>(&self, cid: &Cid, link_resolver: F) -> Result<usize>
257 where
258 F: Fn(&Cid) -> Result<Vec<Cid>>,
259 {
260 let mut unpinned_count = 0;
261 let mut to_process = vec![*cid];
262 let mut seen = HashSet::new();
263
264 while let Some(current_cid) = to_process.pop() {
265 if !seen.insert(current_cid) {
266 continue;
267 }
268
269 if self.unpin(¤t_cid)? {
270 unpinned_count += 1;
271 }
272
273 if let Ok(links) = link_resolver(¤t_cid) {
275 to_process.extend(links);
276 }
277 }
278
279 Ok(unpinned_count)
280 }
281
282 pub fn is_pinned(&self, cid: &Cid) -> bool {
284 self.pins.contains_key(&cid.to_bytes())
285 }
286
287 pub fn get_pin_info(&self, cid: &Cid) -> Option<PinInfo> {
289 self.pins.get(&cid.to_bytes()).map(|r| r.clone())
290 }
291
292 pub fn list_pins(&self) -> Result<Vec<(Cid, PinInfo)>> {
294 let mut result = Vec::new();
295 for entry in self.pins.iter() {
296 let cid = Cid::try_from(entry.key().clone())
297 .map_err(|e| Error::Cid(format!("Invalid CID: {e}")))?;
298 result.push((cid, entry.value().clone()));
299 }
300 Ok(result)
301 }
302
303 pub fn list_pins_by_type(&self, pin_type: PinType) -> Result<Vec<Cid>> {
305 let mut result = Vec::new();
306 for entry in self.pins.iter() {
307 if entry.value().pin_type == pin_type {
308 let cid = Cid::try_from(entry.key().clone())
309 .map_err(|e| Error::Cid(format!("Invalid CID: {e}")))?;
310 result.push(cid);
311 }
312 }
313 Ok(result)
314 }
315
316 pub fn stats(&self) -> PinStatsSnapshot {
318 self.stats.snapshot()
319 }
320
321 pub fn save_to_file(&self, path: &Path) -> Result<()> {
323 let pins: HashMap<Vec<u8>, PinInfo> = self
324 .pins
325 .iter()
326 .map(|r| (r.key().clone(), r.value().clone()))
327 .collect();
328
329 let data = oxicode::serde::encode_to_vec(&pins, oxicode::config::standard())
330 .map_err(|e| Error::Serialization(format!("Failed to serialize pins: {e}")))?;
331
332 std::fs::write(path, data)
333 .map_err(|e| Error::Storage(format!("Failed to write pins: {e}")))?;
334
335 Ok(())
336 }
337
338 pub fn load_from_file(path: &Path) -> Result<Self> {
340 let data =
341 std::fs::read(path).map_err(|e| Error::Storage(format!("Failed to read pins: {e}")))?;
342
343 let pins: HashMap<Vec<u8>, PinInfo> =
344 oxicode::serde::decode_owned_from_slice(&data, oxicode::config::standard())
345 .map(|(v, _)| v)
346 .map_err(|e| Error::Deserialization(format!("Failed to deserialize pins: {e}")))?;
347
348 let manager = Self::new();
349
350 for (key, info) in pins {
351 manager.stats.increment(info.pin_type);
352 manager.pins.insert(key, info);
353 }
354
355 Ok(manager)
356 }
357
358 pub fn clear(&self) {
360 self.pins.clear();
361 self.stats.total_pins.store(0, Ordering::Relaxed);
362 self.stats.direct_pins.store(0, Ordering::Relaxed);
363 self.stats.recursive_pins.store(0, Ordering::Relaxed);
364 self.stats.indirect_pins.store(0, Ordering::Relaxed);
365 }
366}
367
368impl Default for PinManager {
369 fn default() -> Self {
370 Self::new()
371 }
372}
373
374#[derive(Debug, Clone, Serialize, Deserialize)]
376pub struct PinSet {
377 pub name: String,
379 pub description: Option<String>,
381 pub cids: Vec<Vec<u8>>,
383 pub created_at: u64,
385}
386
387impl PinSet {
388 pub fn new(name: String) -> Self {
390 let now = SystemTime::now()
391 .duration_since(UNIX_EPOCH)
392 .unwrap_or_default()
393 .as_secs();
394
395 Self {
396 name,
397 description: None,
398 cids: Vec::new(),
399 created_at: now,
400 }
401 }
402
403 pub fn add(&mut self, cid: &Cid) {
405 let bytes = cid.to_bytes();
406 if !self.cids.contains(&bytes) {
407 self.cids.push(bytes);
408 }
409 }
410
411 pub fn remove(&mut self, cid: &Cid) {
413 let bytes = cid.to_bytes();
414 self.cids.retain(|c| c != &bytes);
415 }
416
417 pub fn contains(&self, cid: &Cid) -> bool {
419 let bytes = cid.to_bytes();
420 self.cids.contains(&bytes)
421 }
422
423 pub fn list_cids(&self) -> Result<Vec<Cid>> {
425 self.cids
426 .iter()
427 .map(|bytes| {
428 Cid::try_from(bytes.clone()).map_err(|e| Error::Cid(format!("Invalid CID: {e}")))
429 })
430 .collect()
431 }
432
433 pub fn len(&self) -> usize {
435 self.cids.len()
436 }
437
438 pub fn is_empty(&self) -> bool {
440 self.cids.is_empty()
441 }
442}
443
444#[cfg(test)]
445mod tests {
446 use super::*;
447 use bytes::Bytes;
448 use ipfrs_core::Block;
449
450 fn make_test_cid(data: &[u8]) -> Cid {
451 let block = Block::new(Bytes::copy_from_slice(data)).unwrap();
452 *block.cid()
453 }
454
455 #[test]
456 fn test_pin_unpin() {
457 let manager = PinManager::new();
458 let cid = make_test_cid(b"test block");
459
460 manager.pin(&cid).unwrap();
462 assert!(manager.is_pinned(&cid));
463
464 let stats = manager.stats();
466 assert_eq!(stats.total_pins, 1);
467 assert_eq!(stats.direct_pins, 1);
468
469 manager.unpin(&cid).unwrap();
471 assert!(!manager.is_pinned(&cid));
472
473 let stats = manager.stats();
474 assert_eq!(stats.total_pins, 0);
475 }
476
477 #[test]
478 fn test_pin_refcount() {
479 let manager = PinManager::new();
480 let cid = make_test_cid(b"test block");
481
482 manager.pin(&cid).unwrap();
484 manager.pin(&cid).unwrap();
485
486 let info = manager.get_pin_info(&cid).unwrap();
487 assert_eq!(info.ref_count, 2);
488
489 manager.unpin(&cid).unwrap();
491 assert!(manager.is_pinned(&cid));
492
493 manager.unpin(&cid).unwrap();
495 assert!(!manager.is_pinned(&cid));
496 }
497
498 #[test]
499 fn test_list_pins_by_type() {
500 let manager = PinManager::new();
501 let cid1 = make_test_cid(b"block1");
502 let cid2 = make_test_cid(b"block2");
503
504 manager.pin(&cid1).unwrap();
505 manager
506 .pin_with_type(&cid2, PinType::Recursive, None)
507 .unwrap();
508
509 let direct = manager.list_pins_by_type(PinType::Direct).unwrap();
510 assert_eq!(direct.len(), 1);
511 assert_eq!(direct[0], cid1);
512
513 let recursive = manager.list_pins_by_type(PinType::Recursive).unwrap();
514 assert_eq!(recursive.len(), 1);
515 assert_eq!(recursive[0], cid2);
516 }
517
518 #[test]
519 fn test_pin_set() {
520 let mut set = PinSet::new("test".to_string());
521 let cid1 = make_test_cid(b"block1");
522 let cid2 = make_test_cid(b"block2");
523
524 set.add(&cid1);
525 set.add(&cid2);
526 assert_eq!(set.len(), 2);
527 assert!(set.contains(&cid1));
528
529 set.remove(&cid1);
530 assert!(!set.contains(&cid1));
531 assert_eq!(set.len(), 1);
532 }
533}