// igc_net/store.rs
1//! Content-addressed flat-file blob store.
2//!
3//! Layout uses a simple local BLAKE3-addressed store:
4//!
5//! ```text
6//! {root}/
7//!   blobs/<first-2-blake3-hex>/<full-64-char-blake3-hex>   ← raw blob bytes
8//!   index.ndjson                                            ← append-only flight index
9//!   node.key                                                ← 32-byte Ed25519 secret key
10//! ```
11
12use std::collections::{HashMap, HashSet};
13use std::path::{Path, PathBuf};
14use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
15
16use serde::{Deserialize, Serialize};
17use tokio::fs;
18use tokio::io::AsyncWriteExt;
19use tokio::sync::Mutex;
20
21use crate::id::{Blake3Hex, IdentifierError, NodeIdHex};
22
23// ── Error type ────────────────────────────────────────────────────────────────
24
/// Errors returned by [`FlatFileStore`] operations.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Underlying filesystem failure (blob, index, or key-file I/O).
    #[error("I/O: {0}")]
    Io(#[from] std::io::Error),
    /// An `index.ndjson` line failed to serialize or deserialize.
    #[error("JSON: {0}")]
    Json(#[from] serde_json::Error),
    /// A hash or node-id string failed validation in `crate::id`.
    #[error("identifier: {0}")]
    Identifier(#[from] IdentifierError),
    /// An in-memory cache `RwLock` was poisoned by a panicking holder;
    /// the payload names which cache was affected.
    #[error("lock poisoned: {0}")]
    PoisonedLock(&'static str),
}
36
37// ── IndexRecord ───────────────────────────────────────────────────────────────
38
/// Origin of an index record.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum IndexRecordSource {
    /// Record created by this node publishing a flight locally.
    LocalPublish,
    /// Record created by ingesting an announcement from a remote node.
    RemoteAnnouncement,
}
46
/// One line in `index.ndjson`.
///
/// Records are append-only. When multiple records describe the same
/// `(meta_hash, node_id)` pair, the latest record is authoritative.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct IndexRecord {
    /// Whether this record was created by a local publish or a remote announce.
    pub source: IndexRecordSource,
    /// 64-char BLAKE3 hex of the raw IGC file.
    pub igc_hash: Blake3Hex,
    /// 64-char BLAKE3 hex of the metadata JSON blob.
    ///
    /// Together with `node_id` this forms the protocol's dedup key.
    pub meta_hash: Blake3Hex,
    /// Serving node identity for this announcement.
    pub node_id: NodeIdHex,
    /// Latest known ticket for the IGC blob from this serving node.
    pub igc_ticket: String,
    /// Latest known ticket for the metadata blob from this serving node.
    pub meta_ticket: String,
    /// RFC 3339 UTC timestamp of when this node first published the flight.
    /// Stored as a string; no parsing or ordering is performed on it here.
    pub recorded_at: String,
}
68
69// ── FlatFileStore ─────────────────────────────────────────────────────────────
70
/// Content-addressed flat-file blob store keyed by BLAKE3.
///
/// An in-memory cache of `(meta_hash, node_id)` pairs and known `meta_hash`
/// values is maintained to avoid O(n) linear scans of `index.ndjson` on the
/// indexer hot path.  The cache is populated during [`init`] and updated by
/// [`append_index`].
pub struct FlatFileStore {
    /// Store root; all paths below are derived from it.
    root: PathBuf,
    /// Cached `(meta_hash, node_id)` pairs — dedup key per the protocol spec.
    dedup_cache: RwLock<HashSet<(Blake3Hex, NodeIdHex)>>,
    /// Cached set of known `meta_hash` values.
    meta_hash_cache: RwLock<HashSet<Blake3Hex>>,
    /// Cached latest local publish record per `(igc_hash, node_id)`.
    /// Note the key uses `igc_hash`, unlike the dedup cache's `meta_hash`.
    latest_local_publish_cache: RwLock<HashMap<(Blake3Hex, NodeIdHex), IndexRecord>>,
    /// Cached in-order copy of all index records (one entry per file line).
    index_records_cache: RwLock<Vec<IndexRecord>>,
    /// Cached remote discovery events paired with their line sequence number.
    discovery_events_cache: RwLock<Vec<(u64, IndexRecord)>>,
    /// Serializes index file appends and dedup checks that must be atomic.
    /// A tokio `Mutex` because it is held across `.await` points in appends.
    append_lock: Mutex<()>,
}
92
/// Hash + node-id pair; used both as the dedup key (`meta_hash`, node) and
/// the latest-local-publish key (`igc_hash`, node) — same shape, different hash.
type DedupKey = (Blake3Hex, NodeIdHex);
/// Map from `(igc_hash, node_id)` to the most recent local publish record.
type LatestLocalPublishMap = HashMap<DedupKey, IndexRecord>;
95
96impl FlatFileStore {
97    /// Open (or create) a store rooted at `root`.
98    ///
99    /// Directories are created lazily by [`init`].
100    pub fn open(root: impl Into<PathBuf>) -> Self {
101        Self {
102            root: root.into(),
103            dedup_cache: RwLock::new(HashSet::new()),
104            meta_hash_cache: RwLock::new(HashSet::new()),
105            latest_local_publish_cache: RwLock::new(HashMap::new()),
106            index_records_cache: RwLock::new(Vec::new()),
107            discovery_events_cache: RwLock::new(Vec::new()),
108            append_lock: Mutex::new(()),
109        }
110    }
111
112    /// Create the required directory structure and populate the in-memory
113    /// dedup cache from any existing `index.ndjson`.
114    pub async fn init(&self) -> Result<(), StoreError> {
115        fs::create_dir_all(self.blobs_dir()).await?;
116        self.reload_cache()?;
117        Ok(())
118    }
119
120    /// Rebuild the in-memory caches from `index.ndjson`.
121    fn reload_cache(&self) -> Result<(), StoreError> {
122        let mut dedup = self.dedup_cache.write().map_err(|_| StoreError::PoisonedLock("dedup_cache"))?;
123        let mut metas = self.meta_hash_cache.write().map_err(|_| StoreError::PoisonedLock("meta_hash_cache"))?;
124        let mut latest_local = self
125            .latest_local_publish_cache
126            .write()
127            .map_err(|_| StoreError::PoisonedLock("latest_local_publish_cache"))?;
128        let mut index_records = self
129            .index_records_cache
130            .write()
131            .map_err(|_| StoreError::PoisonedLock("index_records_cache"))?;
132        let mut discovery_events = self
133            .discovery_events_cache
134            .write()
135            .map_err(|_| StoreError::PoisonedLock("discovery_events_cache"))?;
136        dedup.clear();
137        metas.clear();
138        latest_local.clear();
139        index_records.clear();
140        discovery_events.clear();
141        for (seq, record) in self.iter_index_file()?.enumerate() {
142            let r = record?;
143            dedup.insert((r.meta_hash.clone(), r.node_id.clone()));
144            metas.insert(r.meta_hash.clone());
145            if r.source == IndexRecordSource::LocalPublish {
146                latest_local.insert((r.igc_hash.clone(), r.node_id.clone()), r.clone());
147            } else {
148                discovery_events.push((seq as u64, r.clone()));
149            }
150            index_records.push(r);
151        }
152        Ok(())
153    }
154
155    // ── Internal path helpers ─────────────────────────────────────────────────
156
157    fn blobs_dir(&self) -> PathBuf {
158        self.root.join("blobs")
159    }
160
161    fn blob_path(&self, blake3_hex: &Blake3Hex) -> PathBuf {
162        self.blobs_dir()
163            .join(&blake3_hex.as_str()[..2])
164            .join(blake3_hex.as_str())
165    }
166
167    fn index_path(&self) -> PathBuf {
168        self.root.join("index.ndjson")
169    }
170
171    fn key_path(&self) -> PathBuf {
172        self.root.join("node.key")
173    }
174
175    // ── Blob operations ───────────────────────────────────────────────────────
176
177    /// Return the filesystem path for a blob without reading it.
178    ///
179    /// Returns `Some(path)` if the blob exists locally, `None` otherwise.
180    pub fn resolve_path(&self, blake3_hex: &str) -> Result<Option<PathBuf>, StoreError> {
181        let blake3_hex = Blake3Hex::parse(blake3_hex)?;
182        let path = self.blob_path(&blake3_hex);
183        Ok(if path.exists() { Some(path) } else { None })
184    }
185
186    /// Hash `bytes` with BLAKE3 and store under `blobs/`.
187    ///
188    /// Returns the 64-char hex key.  Idempotent: if the blob already exists
189    /// the write is skipped (content-addressable deduplication).
190    pub async fn put(&self, bytes: &[u8]) -> Result<Blake3Hex, StoreError> {
191        let hex = Blake3Hex::from_hash(blake3::hash(bytes));
192        let path = self.blob_path(&hex);
193
194        if let Some(parent) = path.parent() {
195            fs::create_dir_all(parent).await?;
196        }
197        match fs::OpenOptions::new()
198            .create_new(true)
199            .write(true)
200            .open(&path)
201            .await
202        {
203            Ok(mut file) => {
204                file.write_all(bytes).await?;
205                file.flush().await?;
206            }
207            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
208            Err(e) => return Err(StoreError::Io(e)),
209        }
210        Ok(hex)
211    }
212
213    /// Read a blob by its 64-char BLAKE3 hex key.  Returns `None` if not found.
214    pub async fn get(&self, blake3_hex: &str) -> Result<Option<Vec<u8>>, StoreError> {
215        let blake3_hex = Blake3Hex::parse(blake3_hex)?;
216        let path = self.blob_path(&blake3_hex);
217        match fs::read(&path).await {
218            Ok(bytes) => Ok(Some(bytes)),
219            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
220            Err(e) => Err(StoreError::Io(e)),
221        }
222    }
223
224    /// Check existence without reading the full blob.
225    pub fn contains(&self, blake3_hex: &str) -> Result<bool, StoreError> {
226        let blake3_hex = Blake3Hex::parse(blake3_hex)?;
227        Ok(self.blob_path(&blake3_hex).exists())
228    }
229
230    // ── Index operations ──────────────────────────────────────────────────────
231
232    /// Append one record to `index.ndjson` (one JSON object per line).
233    ///
234    /// Also updates the in-memory dedup and meta_hash caches.
235    pub async fn append_index(&self, record: &IndexRecord) -> Result<(), StoreError> {
236        let _append_guard = self.append_lock.lock().await;
237        self.append_index_unlocked(record).await
238    }
239
240    /// Append one record only if the `(meta_hash, node_id)` pair is absent.
241    ///
242    /// Returns `true` when a new record was appended, `false` when the record
243    /// was already present in the dedup cache.
244    pub async fn append_index_if_absent(&self, record: &IndexRecord) -> Result<bool, StoreError> {
245        let _append_guard = self.append_lock.lock().await;
246        if self
247            .dedup_read()?
248            .contains(&(record.meta_hash.clone(), record.node_id.clone()))
249        {
250            return Ok(false);
251        }
252        self.append_index_unlocked(record).await?;
253        Ok(true)
254    }
255
256    async fn append_index_unlocked(&self, record: &IndexRecord) -> Result<(), StoreError> {
257        let mut line = serde_json::to_string(record)?;
258        line.push('\n');
259
260        let mut file = fs::OpenOptions::new()
261            .create(true)
262            .append(true)
263            .open(self.index_path())
264            .await?;
265        file.write_all(line.as_bytes()).await?;
266        file.flush().await?;
267
268        // Update in-memory caches.
269        self.dedup_write()?
270            .insert((record.meta_hash.clone(), record.node_id.clone()));
271        self.meta_hash_write()?.insert(record.meta_hash.clone());
272        if record.source == IndexRecordSource::LocalPublish {
273            self.latest_local_publish_write()?.insert(
274                (record.igc_hash.clone(), record.node_id.clone()),
275                record.clone(),
276            );
277        } else {
278            let seq = self.index_records_read()?.len() as u64;
279            self.discovery_events_write()?.push((seq, record.clone()));
280        }
281        self.index_records_write()?.push(record.clone());
282
283        Ok(())
284    }
285
286    /// Iterate all records from the in-memory index cache.
287    pub fn iter_index(
288        &self,
289    ) -> Result<impl Iterator<Item = Result<IndexRecord, StoreError>>, StoreError> {
290        let records = self.index_records_read()?.clone();
291        Ok(Box::new(records.into_iter().map(Ok))
292            as Box<dyn Iterator<Item = Result<IndexRecord, StoreError>>>)
293    }
294
295    /// Iterate all records in `index.ndjson` (synchronous, for startup only).
296    fn iter_index_file(
297        &self,
298    ) -> Result<impl Iterator<Item = Result<IndexRecord, StoreError>>, StoreError> {
299        use std::io::{BufRead, BufReader};
300
301        let path = self.index_path();
302        // Return empty iterator if the index file does not exist yet.
303        if !path.exists() {
304            let v: Vec<Result<IndexRecord, StoreError>> = Vec::new();
305            return Ok(Box::new(v.into_iter())
306                as Box<dyn Iterator<Item = Result<IndexRecord, StoreError>>>);
307        }
308
309        let file = std::fs::File::open(&path).map_err(StoreError::Io)?;
310        let reader = BufReader::new(file);
311        Ok(Box::new(reader.lines().map(|line| {
312            let line = line.map_err(StoreError::Io)?;
313            serde_json::from_str::<IndexRecord>(&line).map_err(StoreError::Json)
314        }))
315            as Box<
316                dyn Iterator<Item = Result<IndexRecord, StoreError>>,
317            >)
318    }
319
320    /// True if the exact `(meta_hash, node_id)` pair is already recorded.
321    ///
322    /// Uses the in-memory dedup cache — O(1) after [`init`].
323    pub fn has_index_record(&self, meta_hash: &str, node_id: &str) -> Result<bool, StoreError> {
324        let meta_hash = Blake3Hex::parse(meta_hash)?;
325        let node_id = NodeIdHex::parse(node_id)?;
326        Ok(self.dedup_read()?.contains(&(meta_hash, node_id)))
327    }
328
329    /// True if any record is known for this metadata blob.
330    ///
331    /// Uses the in-memory meta_hash cache — O(1) after [`init`].
332    pub fn has_meta_hash(&self, meta_hash: &str) -> Result<bool, StoreError> {
333        let meta_hash = Blake3Hex::parse(meta_hash)?;
334        Ok(self.meta_hash_read()?.contains(&meta_hash))
335    }
336
337    /// Return all `RemoteAnnouncement` index records at or after position `since_seq`.
338    ///
339    /// `since_seq` is a 0-based line number in `index.ndjson`.  The discovery
340    /// worker persists the last processed seq and resumes from there on restart,
341    /// providing at-least-once delivery across restarts.
342    ///
343    /// Returns `(seq, record)` pairs ordered by ascending seq.
344    pub fn discovery_events_since(
345        &self,
346        since_seq: u64,
347    ) -> Result<Vec<(u64, IndexRecord)>, StoreError> {
348        let events = self.discovery_events_read()?;
349        let start = events.partition_point(|(seq, _)| *seq < since_seq);
350        Ok(events[start..].to_vec())
351    }
352
353    /// Return the latest local publish record for an IGC hash from this node.
354    pub fn latest_local_publish(
355        &self,
356        igc_hash: &Blake3Hex,
357        node_id: &NodeIdHex,
358    ) -> Result<Option<IndexRecord>, StoreError> {
359        Ok(self
360            .latest_local_publish_read()?
361            .get(&(igc_hash.clone(), node_id.clone()))
362            .cloned())
363    }
364
365    // ── Key management ────────────────────────────────────────────────────────
366
367    /// Load the raw 32-byte secret key from `node.key`, or return `None` if
368    /// the file does not exist.
369    pub fn load_key_bytes(&self) -> Result<Option<[u8; 32]>, StoreError> {
370        use std::io::Read;
371        let path = self.key_path();
372        if !path.exists() {
373            return Ok(None);
374        }
375        let mut bytes = [0u8; 32];
376        std::fs::File::open(&path)
377            .and_then(|mut f| f.read_exact(&mut bytes))
378            .map_err(StoreError::Io)?;
379        Ok(Some(bytes))
380    }
381
382    /// Persist a 32-byte secret key to `node.key` with mode 0600.
383    pub fn save_key_bytes(&self, bytes: &[u8; 32]) -> Result<(), StoreError> {
384        write_key_file(&self.key_path(), bytes)
385    }
386
387    fn dedup_read(&self) -> Result<RwLockReadGuard<'_, HashSet<DedupKey>>, StoreError> {
388        self.dedup_cache
389            .read()
390            .map_err(|_| StoreError::PoisonedLock("dedup_cache"))
391    }
392
393    fn dedup_write(&self) -> Result<RwLockWriteGuard<'_, HashSet<DedupKey>>, StoreError> {
394        self.dedup_cache
395            .write()
396            .map_err(|_| StoreError::PoisonedLock("dedup_cache"))
397    }
398
399    fn meta_hash_read(&self) -> Result<RwLockReadGuard<'_, HashSet<Blake3Hex>>, StoreError> {
400        self.meta_hash_cache
401            .read()
402            .map_err(|_| StoreError::PoisonedLock("meta_hash_cache"))
403    }
404
405    fn meta_hash_write(&self) -> Result<RwLockWriteGuard<'_, HashSet<Blake3Hex>>, StoreError> {
406        self.meta_hash_cache
407            .write()
408            .map_err(|_| StoreError::PoisonedLock("meta_hash_cache"))
409    }
410
411    fn latest_local_publish_read(
412        &self,
413    ) -> Result<RwLockReadGuard<'_, LatestLocalPublishMap>, StoreError> {
414        self.latest_local_publish_cache
415            .read()
416            .map_err(|_| StoreError::PoisonedLock("latest_local_publish_cache"))
417    }
418
419    fn latest_local_publish_write(
420        &self,
421    ) -> Result<RwLockWriteGuard<'_, LatestLocalPublishMap>, StoreError> {
422        self.latest_local_publish_cache
423            .write()
424            .map_err(|_| StoreError::PoisonedLock("latest_local_publish_cache"))
425    }
426
427    fn index_records_read(&self) -> Result<RwLockReadGuard<'_, Vec<IndexRecord>>, StoreError> {
428        self.index_records_cache
429            .read()
430            .map_err(|_| StoreError::PoisonedLock("index_records_cache"))
431    }
432
433    fn index_records_write(&self) -> Result<RwLockWriteGuard<'_, Vec<IndexRecord>>, StoreError> {
434        self.index_records_cache
435            .write()
436            .map_err(|_| StoreError::PoisonedLock("index_records_cache"))
437    }
438
439    fn discovery_events_read(
440        &self,
441    ) -> Result<RwLockReadGuard<'_, Vec<(u64, IndexRecord)>>, StoreError> {
442        self.discovery_events_cache
443            .read()
444            .map_err(|_| StoreError::PoisonedLock("discovery_events_cache"))
445    }
446
447    fn discovery_events_write(
448        &self,
449    ) -> Result<RwLockWriteGuard<'_, Vec<(u64, IndexRecord)>>, StoreError> {
450        self.discovery_events_cache
451            .write()
452            .map_err(|_| StoreError::PoisonedLock("discovery_events_cache"))
453    }
454}
455
456// ── Platform helpers ──────────────────────────────────────────────────────────
457
458#[cfg(unix)]
459fn write_key_file(path: &Path, bytes: &[u8; 32]) -> Result<(), StoreError> {
460    use std::io::Write;
461    use std::os::unix::fs::OpenOptionsExt;
462
463    let mut file = std::fs::OpenOptions::new()
464        .create(true)
465        .truncate(true)
466        .write(true)
467        .mode(0o600)
468        .open(path)
469        .map_err(StoreError::Io)?;
470    file.write_all(bytes).map_err(StoreError::Io)?;
471    Ok(())
472}
473
/// Write `bytes` to `path`, creating or truncating the file.
///
/// NOTE(review): on non-unix targets no permission bits are applied, so the
/// key file inherits the platform default ACLs — the 0600 guarantee in
/// [`FlatFileStore::save_key_bytes`]'s doc only holds on unix.
#[cfg(not(unix))]
fn write_key_file(path: &Path, bytes: &[u8; 32]) -> Result<(), StoreError> {
    use std::io::Write;
    let mut file = std::fs::File::create(path).map_err(StoreError::Io)?;
    file.write_all(bytes).map_err(StoreError::Io)?;
    Ok(())
}
481
482// ── Tests ─────────────────────────────────────────────────────────────────────
483
#[cfg(test)]
mod tests {
    use super::*;
    use crate::id::{Blake3Hex, IdentifierError, NodeIdHex};

    /// Build an initialized store in a throwaway directory.  The `TempDir`
    /// guard is returned so the directory outlives the test body.
    async fn temp_store() -> (FlatFileStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = FlatFileStore::open(dir.path());
        store.init().await.unwrap();
        (store, dir)
    }

    /// 64-char hash fixture made by repeating one hex character.
    fn hash(ch: char) -> Blake3Hex {
        Blake3Hex::parse(ch.to_string().repeat(64)).unwrap()
    }

    /// 64-char node-id fixture made by repeating one hex character.
    fn node_id(ch: char) -> NodeIdHex {
        NodeIdHex::parse(ch.to_string().repeat(64)).unwrap()
    }

    #[tokio::test]
    async fn put_get_round_trip() {
        let (store, _dir) = temp_store().await;
        let data = b"hello igc-net";
        let hex = store.put(data).await.unwrap();
        assert_eq!(hex.len(), 64);
        let got = store.get(&hex).await.unwrap().unwrap();
        assert_eq!(got, data);
    }

    #[tokio::test]
    async fn put_is_idempotent() {
        let (store, _dir) = temp_store().await;
        let data = b"same content";
        let h1 = store.put(data).await.unwrap();
        let h2 = store.put(data).await.unwrap();
        // Content addressing: identical bytes must map to the same key.
        assert_eq!(h1, h2);
    }

    #[tokio::test]
    async fn contains_false_before_put_true_after() {
        let (store, _dir) = temp_store().await;
        let data = b"check contains";
        let hex = Blake3Hex::from_hash(blake3::hash(data));
        assert!(!store.contains(&hex).unwrap());
        store.put(data).await.unwrap();
        assert!(store.contains(&hex).unwrap());
    }

    #[tokio::test]
    async fn get_missing_returns_none() {
        let (store, _dir) = temp_store().await;
        let hex = hash('a');
        let result = store.get(&hex).await.unwrap();
        assert!(result.is_none());
    }

    /// Every hash-keyed lookup API must reject malformed hex up front.
    #[tokio::test]
    async fn invalid_hash_is_rejected_by_lookup_apis() {
        let (store, _dir) = temp_store().await;
        assert!(matches!(
            store.contains("bad-hash"),
            Err(StoreError::Identifier(IdentifierError::Blake3Hex(_)))
        ));
        assert!(matches!(
            store.resolve_path("bad-hash"),
            Err(StoreError::Identifier(IdentifierError::Blake3Hex(_)))
        ));
        assert!(matches!(
            store.get("bad-hash").await,
            Err(StoreError::Identifier(IdentifierError::Blake3Hex(_)))
        ));
    }

    /// `append_index` is unconditional: appending the same record twice must
    /// yield two lines (dedup lives in `append_index_if_absent`).
    #[tokio::test]
    async fn index_round_trip() {
        let (store, _dir) = temp_store().await;
        let rec = IndexRecord {
            source: IndexRecordSource::LocalPublish,
            igc_hash: hash('a'),
            meta_hash: hash('b'),
            node_id: node_id('c'),
            igc_ticket: "igc_ticket".to_string(),
            meta_ticket: "meta_ticket".to_string(),
            recorded_at: "2026-03-22T12:00:00Z".to_string(),
        };
        store.append_index(&rec).await.unwrap();
        store.append_index(&rec).await.unwrap();

        let records: Vec<_> = store.iter_index().unwrap().collect();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].as_ref().unwrap().igc_hash, hash('a'));
    }

    #[tokio::test]
    async fn has_index_record_uses_meta_hash_and_node_id() {
        let (store, _dir) = temp_store().await;
        store
            .append_index(&IndexRecord {
                source: IndexRecordSource::RemoteAnnouncement,
                igc_hash: hash('a'),
                meta_hash: hash('b'),
                node_id: node_id('c'),
                igc_ticket: "igc_ticket_1".to_string(),
                meta_ticket: "meta_ticket_1".to_string(),
                recorded_at: "2026-03-22T12:00:00Z".to_string(),
            })
            .await
            .unwrap();

        // Same meta_hash + same node → present.
        assert!(
            store
                .has_index_record(&"b".repeat(64), &"c".repeat(64))
                .unwrap()
        );
        // Same meta_hash + different node → absent (pair is the key).
        assert!(
            !store
                .has_index_record(&"b".repeat(64), &"d".repeat(64))
                .unwrap()
        );
        assert!(store.has_meta_hash(&"b".repeat(64)).unwrap());
    }

    /// When two local publishes share `(igc_hash, node_id)`, the later one
    /// must win in the latest-local-publish cache.
    #[tokio::test]
    async fn latest_local_publish_returns_last_matching_record() {
        let (store, _dir) = temp_store().await;
        for recorded_at in ["2026-03-22T12:00:00Z", "2026-03-22T12:05:00Z"] {
            store
                .append_index(&IndexRecord {
                    source: IndexRecordSource::LocalPublish,
                    igc_hash: hash('a'),
                    meta_hash: hash('b'),
                    node_id: node_id('c'),
                    igc_ticket: format!("igc_ticket_{recorded_at}"),
                    meta_ticket: format!("meta_ticket_{recorded_at}"),
                    recorded_at: recorded_at.to_string(),
                })
                .await
                .unwrap();
        }

        let latest = store
            .latest_local_publish(&hash('a'), &node_id('c'))
            .unwrap()
            .unwrap();
        assert_eq!(latest.recorded_at, "2026-03-22T12:05:00Z");
    }

    #[tokio::test]
    async fn iter_index_on_empty_store_returns_empty() {
        let (store, _dir) = temp_store().await;
        let records: Vec<_> = store.iter_index().unwrap().collect();
        assert!(records.is_empty());
    }

    #[tokio::test]
    async fn key_persistence() {
        let (store, _dir) = temp_store().await;
        assert!(store.load_key_bytes().unwrap().is_none());

        let key = [42u8; 32];
        store.save_key_bytes(&key).unwrap();

        let loaded = store.load_key_bytes().unwrap().unwrap();
        assert_eq!(loaded, key);
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn key_file_has_mode_0600() {
        use std::os::unix::fs::PermissionsExt;
        let (store, dir) = temp_store().await;
        store.save_key_bytes(&[0u8; 32]).unwrap();
        let meta = std::fs::metadata(dir.path().join("node.key")).unwrap();
        // Mask to the permission bits; ignore file-type bits.
        let mode = meta.permissions().mode() & 0o777;
        assert_eq!(mode, 0o600, "node.key must have mode 0600, got {mode:o}");
    }
}