Skip to main content

rns_net/
announce_cache.rs

1//! Announce cache for disk persistence.
2//!
3//! Caches announce packets to disk for path request responses.
4//! File format matches Python: msgpack `[raw_bytes, interface_name_or_nil]`.
5//! Filename: hex-encoded packet_hash (64 chars).
6//!
7//! Python reference: Transport.py:2334-2402
8
9use std::fs;
10use std::io;
11use std::path::PathBuf;
12
13use rns_core::msgpack::{self, Value};
14
15/// Announce cache backed by filesystem.
16pub struct AnnounceCache {
17    base_path: PathBuf,
18}
19
20impl AnnounceCache {
21    /// Create an announce cache at the given directory.
22    /// The directory must already exist (created by `ensure_storage_dirs`).
23    pub fn new(base_path: PathBuf) -> Self {
24        AnnounceCache { base_path }
25    }
26
27    /// Store a cached announce to disk.
28    ///
29    /// `packet_hash`: 32-byte packet hash (used as filename)
30    /// `raw`: raw announce bytes (pre-hop-increment)
31    /// `interface_name`: optional interface name string
32    pub fn store(
33        &self,
34        packet_hash: &[u8; 32],
35        raw: &[u8],
36        interface_name: Option<&str>,
37    ) -> io::Result<()> {
38        let filename = hex_encode(packet_hash);
39        let path = self.base_path.join(&filename);
40
41        let iface_val = match interface_name {
42            Some(name) => Value::Str(name.into()),
43            None => Value::Nil,
44        };
45        let data = msgpack::pack(&Value::Array(vec![Value::Bin(raw.to_vec()), iface_val]));
46
47        fs::write(path, data)
48    }
49
50    /// Retrieve a cached announce from disk.
51    ///
52    /// Returns `(raw_bytes, interface_name_or_none)`.
53    pub fn get(&self, packet_hash: &[u8; 32]) -> io::Result<Option<(Vec<u8>, Option<String>)>> {
54        let filename = hex_encode(packet_hash);
55        let path = self.base_path.join(&filename);
56
57        if !path.is_file() {
58            return Ok(None);
59        }
60
61        let data = fs::read(&path)?;
62        let (value, _) = msgpack::unpack(&data).map_err(|e| {
63            io::Error::new(io::ErrorKind::InvalidData, format!("msgpack error: {}", e))
64        })?;
65
66        let arr = value
67            .as_array()
68            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Expected msgpack array"))?;
69
70        if arr.is_empty() {
71            return Ok(None);
72        }
73
74        let raw = arr[0]
75            .as_bin()
76            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Expected bin raw bytes"))?;
77
78        let iface_name = if arr.len() > 1 {
79            arr[1].as_str().map(|s| s.to_string())
80        } else {
81            None
82        };
83
84        Ok(Some((raw.to_vec(), iface_name)))
85    }
86
87    /// Open a directory iterator for incremental cleanup.
88    pub fn entries(&self) -> io::Result<fs::ReadDir> {
89        fs::read_dir(&self.base_path)
90    }
91
92    /// Remove cached announces whose packet hashes are not in the active set.
93    ///
94    /// `active_hashes`: set of packet hashes that should be kept.
95    /// `batch_limit`: maximum number of files to process per call (0 = unlimited).
96    /// Returns `(removed_count, finished)` where `finished` is true if all files
97    /// were processed (no more work to do).
98    pub fn clean(
99        &self,
100        active_hashes: &[[u8; 32]],
101        batch_limit: usize,
102    ) -> io::Result<(usize, bool)> {
103        let mut entries = match fs::read_dir(&self.base_path) {
104            Ok(entries) => entries,
105            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok((0, true)),
106            Err(e) => return Err(e),
107        };
108        self.clean_batch(active_hashes, &mut entries, batch_limit)
109    }
110
111    /// Incrementally remove stale cached announces from an existing directory iterator.
112    ///
113    /// `entries` keeps iteration state across calls, allowing batched cleanup to
114    /// make forward progress through large directories.
115    pub fn clean_batch(
116        &self,
117        active_hashes: &[[u8; 32]],
118        entries: &mut fs::ReadDir,
119        batch_limit: usize,
120    ) -> io::Result<(usize, bool)> {
121        use std::collections::HashSet;
122
123        let active_set: HashSet<[u8; 32]> = active_hashes.iter().copied().collect();
124        let mut removed = 0;
125        let mut processed = 0;
126
127        loop {
128            if batch_limit > 0 && processed >= batch_limit {
129                return Ok((removed, false));
130            }
131
132            let Some(entry) = entries.next() else {
133                return Ok((removed, true));
134            };
135
136            let entry = entry?;
137            let path = entry.path();
138            if !path.is_file() {
139                continue;
140            }
141            processed += 1;
142
143            let filename = match path.file_name().and_then(|n| n.to_str()) {
144                Some(n) => n,
145                None => continue,
146            };
147
148            match hex_decode(filename) {
149                Some(hash) => {
150                    if !active_set.contains(&hash) {
151                        let _ = fs::remove_file(&path);
152                        removed += 1;
153                    }
154                }
155                None => {
156                    let _ = fs::remove_file(&path);
157                    removed += 1;
158                }
159            }
160        }
161    }
162
163    /// Get the base path for testing.
164    #[cfg(test)]
165    pub fn base_path(&self) -> &std::path::Path {
166        &self.base_path
167    }
168}
169
170/// Encode 32 bytes as 64-char lowercase hex string.
171fn hex_encode(bytes: &[u8; 32]) -> String {
172    let mut s = String::with_capacity(64);
173    for b in bytes {
174        s.push(HEX_CHARS[(b >> 4) as usize]);
175        s.push(HEX_CHARS[(b & 0x0f) as usize]);
176    }
177    s
178}
179
180const HEX_CHARS: [char; 16] = [
181    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f',
182];
183
184/// Decode a 64-char hex string back to 32 bytes.
185fn hex_decode(s: &str) -> Option<[u8; 32]> {
186    if s.len() != 64 {
187        return None;
188    }
189    let mut result = [0u8; 32];
190    for (i, chunk) in s.as_bytes().chunks(2).enumerate() {
191        let high = hex_nibble(chunk[0])?;
192        let low = hex_nibble(chunk[1])?;
193        result[i] = (high << 4) | low;
194    }
195    Some(result)
196}
197
198fn hex_nibble(c: u8) -> Option<u8> {
199    match c {
200        b'0'..=b'9' => Some(c - b'0'),
201        b'a'..=b'f' => Some(c - b'a' + 10),
202        b'A'..=b'F' => Some(c - b'A' + 10),
203        _ => None,
204    }
205}
206
207#[cfg(test)]
208mod tests {
209    use super::*;
210    use std::sync::atomic::{AtomicU64, Ordering};
211
212    static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);
213
214    fn temp_dir() -> PathBuf {
215        let id = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
216        let dir =
217            std::env::temp_dir().join(format!("rns-announce-cache-{}-{}", std::process::id(), id,));
218        let _ = fs::remove_dir_all(&dir);
219        fs::create_dir_all(&dir).unwrap();
220        dir
221    }
222
223    #[test]
224    fn test_hex_encode_decode_roundtrip() {
225        let hash = [0xAB; 32];
226        let encoded = hex_encode(&hash);
227        assert_eq!(encoded.len(), 64);
228        assert_eq!(encoded.len(), 64);
229        // All bytes are 0xAB so hex is "ab" repeated 32 times
230        assert!(encoded.chars().all(|c| c == 'a' || c == 'b'));
231        let decoded = hex_decode(&encoded).unwrap();
232        assert_eq!(decoded, hash);
233    }
234
235    #[test]
236    fn test_hex_decode_invalid() {
237        assert!(hex_decode("too_short").is_none());
238        assert!(hex_decode(&"zz".repeat(32)).is_none());
239    }
240
241    #[test]
242    fn test_store_and_get_roundtrip() {
243        let dir = temp_dir();
244        let cache = AnnounceCache::new(dir.clone());
245
246        let hash = [0x42; 32];
247        let raw = vec![0x01, 0x02, 0x03, 0x04, 0x05];
248        cache.store(&hash, &raw, Some("TestInterface")).unwrap();
249
250        let result = cache.get(&hash).unwrap();
251        assert!(result.is_some());
252        let (got_raw, got_name) = result.unwrap();
253        assert_eq!(got_raw, raw);
254        assert_eq!(got_name, Some("TestInterface".to_string()));
255
256        let _ = fs::remove_dir_all(&dir);
257    }
258
259    #[test]
260    fn test_store_with_nil_interface() {
261        let dir = temp_dir();
262        let cache = AnnounceCache::new(dir.clone());
263
264        let hash = [0x55; 32];
265        let raw = vec![0xAA, 0xBB];
266        cache.store(&hash, &raw, None).unwrap();
267
268        let result = cache.get(&hash).unwrap();
269        assert!(result.is_some());
270        let (got_raw, got_name) = result.unwrap();
271        assert_eq!(got_raw, raw);
272        assert_eq!(got_name, None);
273
274        let _ = fs::remove_dir_all(&dir);
275    }
276
277    #[test]
278    fn test_get_nonexistent() {
279        let dir = temp_dir();
280        let cache = AnnounceCache::new(dir.clone());
281
282        let hash = [0xFF; 32];
283        let result = cache.get(&hash).unwrap();
284        assert!(result.is_none());
285
286        let _ = fs::remove_dir_all(&dir);
287    }
288
289    #[test]
290    fn test_clean_removes_stale() {
291        let dir = temp_dir();
292        let cache = AnnounceCache::new(dir.clone());
293
294        let hash1 = [0x11; 32];
295        let hash2 = [0x22; 32];
296        let hash3 = [0x33; 32];
297
298        cache.store(&hash1, &[0x01], None).unwrap();
299        cache.store(&hash2, &[0x02], None).unwrap();
300        cache.store(&hash3, &[0x03], None).unwrap();
301
302        // Keep only hash2
303        let (removed, finished) = cache.clean(&[hash2], 0).unwrap();
304        assert_eq!(removed, 2);
305        assert!(finished);
306
307        // hash2 should still be there
308        assert!(cache.get(&hash2).unwrap().is_some());
309        // hash1 and hash3 should be gone
310        assert!(cache.get(&hash1).unwrap().is_none());
311        assert!(cache.get(&hash3).unwrap().is_none());
312
313        let _ = fs::remove_dir_all(&dir);
314    }
315
316    #[test]
317    fn test_clean_empty_dir() {
318        let dir = temp_dir();
319        let cache = AnnounceCache::new(dir.clone());
320
321        let (removed, finished) = cache.clean(&[], 0).unwrap();
322        assert_eq!(removed, 0);
323        assert!(finished);
324
325        let _ = fs::remove_dir_all(&dir);
326    }
327
328    #[test]
329    fn test_clean_batch_progresses_across_calls() {
330        let dir = temp_dir();
331        let cache = AnnounceCache::new(dir.clone());
332
333        let keep = [0x10; 32];
334        let stale1 = [0x20; 32];
335        let stale2 = [0x30; 32];
336
337        cache.store(&keep, &[0x01], None).unwrap();
338        cache.store(&stale1, &[0x02], None).unwrap();
339        cache.store(&stale2, &[0x03], None).unwrap();
340
341        let mut entries = cache.entries().unwrap();
342
343        let mut removed_total = 0;
344        let mut finished = false;
345        for _ in 0..4 {
346            let (removed, done) = cache.clean_batch(&[keep], &mut entries, 1).unwrap();
347            removed_total += removed;
348            if done {
349                finished = true;
350                break;
351            }
352        }
353
354        assert!(finished);
355        assert_eq!(removed_total, 2);
356        assert!(cache.get(&keep).unwrap().is_some());
357        assert!(cache.get(&stale1).unwrap().is_none());
358        assert!(cache.get(&stale2).unwrap().is_none());
359
360        let _ = fs::remove_dir_all(&dir);
361    }
362
363    #[test]
364    fn test_store_overwrite() {
365        let dir = temp_dir();
366        let cache = AnnounceCache::new(dir.clone());
367
368        let hash = [0x77; 32];
369        cache.store(&hash, &[0x01], Some("iface1")).unwrap();
370        cache.store(&hash, &[0x02, 0x03], Some("iface2")).unwrap();
371
372        let result = cache.get(&hash).unwrap().unwrap();
373        assert_eq!(result.0, vec![0x02, 0x03]);
374        assert_eq!(result.1, Some("iface2".to_string()));
375
376        let _ = fs::remove_dir_all(&dir);
377    }
378}