Skip to main content

igc_net/
store.rs

1//! Content-addressed flat-file blob store.
2//!
3//! Layout uses a simple local BLAKE3-addressed store:
4//!
5//! ```text
6//! {root}/
7//!   blobs/<first-2-blake3-hex>/<full-64-char-blake3-hex>   ← raw blob bytes
8//!   index.ndjson                                            ← append-only flight index
9//!   artifacts.ndjson                                        ← append-only artifact registry
10//!   node.key                                                ← 32-byte Ed25519 secret key
11//! ```
12
13use std::collections::{HashMap, HashSet};
14use std::path::{Path, PathBuf};
15use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
16
17use serde::{Deserialize, Serialize};
18use tokio::fs;
19use tokio::io::AsyncWriteExt;
20use tokio::sync::Mutex;
21
22use crate::id::{Blake3Hex, IdentifierError, NodeIdHex, PilotId};
23
24// ── Error type ────────────────────────────────────────────────────────────────
25
/// Errors produced by the flat-file store.
///
/// The `#[from]` variants let `?` convert the underlying error types
/// directly into a `StoreError`.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// A filesystem operation (blob, index, registry or key file) failed.
    #[error("I/O: {0}")]
    Io(#[from] std::io::Error),
    /// An NDJSON record could not be serialized or parsed.
    #[error("JSON: {0}")]
    Json(#[from] serde_json::Error),
    /// A caller-supplied hash or node identifier failed to parse.
    #[error("identifier: {0}")]
    Identifier(#[from] IdentifierError),
    /// An artifact registry record violated a consistency rule; the payload
    /// is a static description of the rule that was broken.
    #[error("invalid artifact registry record: {0}")]
    InvalidArtifactRecord(&'static str),
    /// One of the in-memory cache locks was poisoned; the payload names the
    /// cache field whose lock could not be acquired.
    #[error("lock poisoned: {0}")]
    PoisonedLock(&'static str),
}
39
40// ── IndexRecord ───────────────────────────────────────────────────────────────
41
/// Origin of an index record.
///
/// Serialized in `snake_case` (`"local_publish"`, `"remote_announcement"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum IndexRecordSource {
    /// Record created by a publish on this node; tracked in the
    /// latest-local-publish cache.
    LocalPublish,
    /// Record learned from a remote announce; tracked as a discovery event.
    RemoteAnnouncement,
}
49
/// One line in `index.ndjson`.
///
/// Records are append-only. When multiple records describe the same
/// `(meta_hash, node_id)` pair, the latest record is authoritative.
///
/// The store deduplicates on `(meta_hash, node_id)` and keeps the latest
/// local publish per `(igc_hash, node_id)`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct IndexRecord {
    /// Whether this record was created by a local publish or a remote announce.
    pub source: IndexRecordSource,
    /// 64-char BLAKE3 hex of the raw IGC file.
    pub igc_hash: Blake3Hex,
    /// 64-char BLAKE3 hex of the metadata JSON blob.
    pub meta_hash: Blake3Hex,
    /// Serving node identity for this announcement.
    pub node_id: NodeIdHex,
    /// Latest known ticket for the IGC blob from this serving node.
    pub igc_ticket: String,
    /// Latest known ticket for the metadata blob from this serving node.
    pub meta_ticket: String,
    /// RFC 3339 UTC timestamp of when this node first published the flight.
    // NOTE(review): stored as a plain `String`; the format is not validated
    // by this module.
    pub recorded_at: String,
}
71
72// ── Artifact registry ────────────────────────────────────────────────────────
73
/// Effective publication mode known to this node for an artifact identity.
///
/// Serialized in `snake_case`. Registry validation only allows a
/// `protected_hash` and protected-artifact flags in [`PublicationMode::Protected`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum PublicationMode {
    /// No protected hash or protected artifacts may be recorded.
    Public,
    /// A `protected_hash` is mandatory in this mode.
    Protected,
    /// No protected hash or protected artifacts may be recorded.
    Private,
}
82
/// One line in `artifacts.ndjson`.
///
/// This is the sidecar-facing artifact registry. It records the latest known
/// service state needed by RPC handlers without changing the existing
/// data-plane `index.ndjson` format in the same step.
///
/// Records are validated both when appended and when reloaded from disk:
/// `protected_hash` must be present exactly in protected mode, the
/// `has_protected_*` flags must be `false` outside protected mode, and
/// `serving_node_ids` must not contain duplicates.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ArtifactRegistryRecord {
    /// Canonical flight identity.
    pub raw_igc_hash: Blake3Hex,
    /// Accepted or locally asserted pilot owner, when known.
    pub pilot_id: Option<PilotId>,
    /// Current effective mode as understood by this node.
    pub publication_mode: PublicationMode,
    /// Sanitized artifact hash. Present only in protected mode.
    pub protected_hash: Option<Blake3Hex>,
    /// Whether this node has raw IGC bytes available locally.
    pub has_raw_igc: bool,
    /// Whether this node has protected sanitized IGC bytes available locally.
    pub has_protected_sanitized_igc: bool,
    /// Whether this node has protected raw companion bytes available locally.
    pub has_protected_raw_companion: bool,
    /// Serving nodes currently known for this flight identity.
    pub serving_node_ids: Vec<NodeIdHex>,
    /// Whether the raw IGC bytes are known to contain at least one G-record.
    ///
    /// `#[serde(default)]` makes a line without this field deserialize as `None`.
    #[serde(default)]
    pub g_record_present: Option<bool>,
    /// Local event timestamp for this registry update.
    pub recorded_at: String,
}
112
113// ── FlatFileStore ─────────────────────────────────────────────────────────────
114
/// Content-addressed flat-file blob store keyed by BLAKE3.
///
/// An in-memory cache of `(meta_hash, node_id)` pairs and known `meta_hash`
/// values is maintained to avoid O(n) linear scans of `index.ndjson` on the
/// indexer hot path.  The cache is populated during [`init`] and updated by
/// [`append_index`].
///
/// Each cache sits behind its own `std::sync::RwLock`; a poisoned lock is
/// reported as [`StoreError::PoisonedLock`] rather than panicking.
pub struct FlatFileStore {
    /// Root directory; `blobs/`, `index.ndjson`, `artifacts.ndjson` and
    /// `node.key` are resolved relative to it.
    root: PathBuf,
    /// Cached `(meta_hash, node_id)` pairs — dedup key per the protocol spec.
    dedup_cache: RwLock<HashSet<(Blake3Hex, NodeIdHex)>>,
    /// Cached set of known `meta_hash` values.
    meta_hash_cache: RwLock<HashSet<Blake3Hex>>,
    /// Cached latest local publish record per `(igc_hash, node_id)`.
    latest_local_publish_cache: RwLock<HashMap<(Blake3Hex, NodeIdHex), IndexRecord>>,
    /// Cached in-order copy of all index records.
    index_records_cache: RwLock<Vec<IndexRecord>>,
    /// Cached remote discovery events paired with their line sequence number.
    discovery_events_cache: RwLock<Vec<(u64, IndexRecord)>>,
    /// Cached latest artifact registry record per `raw_igc_hash`.
    artifact_registry_cache: RwLock<HashMap<Blake3Hex, ArtifactRegistryRecord>>,
    /// Cached artifact registry events paired with their append sequence.
    artifact_registry_events_cache: RwLock<Vec<(u64, ArtifactRegistryRecord)>>,
    /// Serializes index file appends and dedup checks that must be atomic.
    ///
    /// This is a `tokio::sync::Mutex`, so its guard may be held across the
    /// `.await` points of the file append.
    append_lock: Mutex<()>,
}
140
/// `(hash, node_id)` pair used as a cache key: `(meta_hash, node_id)` in the
/// dedup set and `(igc_hash, node_id)` in the latest-local-publish map.
type DedupKey = (Blake3Hex, NodeIdHex);
/// Latest local publish record per `(igc_hash, node_id)`.
type LatestLocalPublishMap = HashMap<DedupKey, IndexRecord>;
/// Latest artifact registry record per `raw_igc_hash`.
type ArtifactRegistryMap = HashMap<Blake3Hex, ArtifactRegistryRecord>;
144
145impl FlatFileStore {
146    /// Open (or create) a store rooted at `root`.
147    ///
148    /// Directories are created lazily by [`init`].
149    pub fn open(root: impl Into<PathBuf>) -> Self {
150        Self {
151            root: root.into(),
152            dedup_cache: RwLock::new(HashSet::new()),
153            meta_hash_cache: RwLock::new(HashSet::new()),
154            latest_local_publish_cache: RwLock::new(HashMap::new()),
155            index_records_cache: RwLock::new(Vec::new()),
156            discovery_events_cache: RwLock::new(Vec::new()),
157            artifact_registry_cache: RwLock::new(HashMap::new()),
158            artifact_registry_events_cache: RwLock::new(Vec::new()),
159            append_lock: Mutex::new(()),
160        }
161    }
162
163    /// Create the required directory structure and populate the in-memory
164    /// dedup cache from any existing `index.ndjson`.
165    pub async fn init(&self) -> Result<(), StoreError> {
166        fs::create_dir_all(self.blobs_dir()).await?;
167        self.reload_cache()?;
168        Ok(())
169    }
170
171    /// Rebuild the in-memory caches from `index.ndjson`.
172    fn reload_cache(&self) -> Result<(), StoreError> {
173        let mut dedup = self
174            .dedup_cache
175            .write()
176            .map_err(|_| StoreError::PoisonedLock("dedup_cache"))?;
177        let mut metas = self
178            .meta_hash_cache
179            .write()
180            .map_err(|_| StoreError::PoisonedLock("meta_hash_cache"))?;
181        let mut latest_local = self
182            .latest_local_publish_cache
183            .write()
184            .map_err(|_| StoreError::PoisonedLock("latest_local_publish_cache"))?;
185        let mut index_records = self
186            .index_records_cache
187            .write()
188            .map_err(|_| StoreError::PoisonedLock("index_records_cache"))?;
189        let mut discovery_events = self
190            .discovery_events_cache
191            .write()
192            .map_err(|_| StoreError::PoisonedLock("discovery_events_cache"))?;
193        let mut artifact_registry = self
194            .artifact_registry_cache
195            .write()
196            .map_err(|_| StoreError::PoisonedLock("artifact_registry_cache"))?;
197        let mut artifact_registry_events = self
198            .artifact_registry_events_cache
199            .write()
200            .map_err(|_| StoreError::PoisonedLock("artifact_registry_events_cache"))?;
201        dedup.clear();
202        metas.clear();
203        latest_local.clear();
204        index_records.clear();
205        discovery_events.clear();
206        artifact_registry.clear();
207        artifact_registry_events.clear();
208        for (seq, record) in self.iter_index_file()?.enumerate() {
209            let r = record?;
210            dedup.insert((r.meta_hash.clone(), r.node_id.clone()));
211            metas.insert(r.meta_hash.clone());
212            if r.source == IndexRecordSource::LocalPublish {
213                latest_local.insert((r.igc_hash.clone(), r.node_id.clone()), r.clone());
214            } else {
215                discovery_events.push((seq as u64, r.clone()));
216            }
217            index_records.push(r);
218        }
219        for (seq, record) in self.iter_artifact_registry_file()?.enumerate() {
220            let record = record?;
221            validate_artifact_registry_record(&record)?;
222            artifact_registry_events.push((seq as u64, record.clone()));
223            artifact_registry.insert(record.raw_igc_hash.clone(), record);
224        }
225        Ok(())
226    }
227
228    // ── Internal path helpers ─────────────────────────────────────────────────
229
230    fn blobs_dir(&self) -> PathBuf {
231        self.root.join("blobs")
232    }
233
234    fn blob_path(&self, blake3_hex: &Blake3Hex) -> PathBuf {
235        self.blobs_dir()
236            .join(&blake3_hex.as_str()[..2])
237            .join(blake3_hex.as_str())
238    }
239
240    fn index_path(&self) -> PathBuf {
241        self.root.join("index.ndjson")
242    }
243
244    fn artifact_registry_path(&self) -> PathBuf {
245        self.root.join("artifacts.ndjson")
246    }
247
248    fn key_path(&self) -> PathBuf {
249        self.root.join("node.key")
250    }
251
252    // ── Blob operations ───────────────────────────────────────────────────────
253
254    /// Return the filesystem path for a blob without reading it.
255    ///
256    /// Returns `Some(path)` if the blob exists locally, `None` otherwise.
257    pub fn resolve_path(&self, blake3_hex: &str) -> Result<Option<PathBuf>, StoreError> {
258        let blake3_hex = Blake3Hex::parse(blake3_hex)?;
259        let path = self.blob_path(&blake3_hex);
260        Ok(if path.exists() { Some(path) } else { None })
261    }
262
263    /// Hash `bytes` with BLAKE3 and store under `blobs/`.
264    ///
265    /// Returns the 64-char hex key.  Idempotent: if the blob already exists
266    /// the write is skipped (content-addressable deduplication).
267    pub async fn put(&self, bytes: &[u8]) -> Result<Blake3Hex, StoreError> {
268        let hex = Blake3Hex::from_hash(blake3::hash(bytes));
269        let path = self.blob_path(&hex);
270
271        if let Some(parent) = path.parent() {
272            fs::create_dir_all(parent).await?;
273        }
274        match fs::OpenOptions::new()
275            .create_new(true)
276            .write(true)
277            .open(&path)
278            .await
279        {
280            Ok(mut file) => {
281                file.write_all(bytes).await?;
282                file.flush().await?;
283            }
284            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
285            Err(e) => return Err(StoreError::Io(e)),
286        }
287        Ok(hex)
288    }
289
290    /// Read a blob by its 64-char BLAKE3 hex key.  Returns `None` if not found.
291    pub async fn get(&self, blake3_hex: &str) -> Result<Option<Vec<u8>>, StoreError> {
292        let blake3_hex = Blake3Hex::parse(blake3_hex)?;
293        let path = self.blob_path(&blake3_hex);
294        match fs::read(&path).await {
295            Ok(bytes) => Ok(Some(bytes)),
296            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
297            Err(e) => Err(StoreError::Io(e)),
298        }
299    }
300
301    /// Check existence without reading the full blob.
302    pub fn contains(&self, blake3_hex: &str) -> Result<bool, StoreError> {
303        let blake3_hex = Blake3Hex::parse(blake3_hex)?;
304        Ok(self.blob_path(&blake3_hex).exists())
305    }
306
307    /// Delete a locally stored blob by BLAKE3 hash.
308    ///
309    /// Missing blobs are treated as already deleted. This only affects the
310    /// flat-file blob store; callers that also publish through iroh-blobs must
311    /// separately stop advertising or serving that artifact class.
312    pub async fn delete_blob(&self, blake3_hex: &Blake3Hex) -> Result<bool, StoreError> {
313        let path = self.blob_path(blake3_hex);
314        match fs::remove_file(&path).await {
315            Ok(()) => {
316                if let Some(parent) = path.parent() {
317                    let _ = fs::remove_dir(parent).await;
318                }
319                Ok(true)
320            }
321            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
322            Err(e) => Err(StoreError::Io(e)),
323        }
324    }
325
326    // ── Index operations ──────────────────────────────────────────────────────
327
328    /// Append one record to `index.ndjson` (one JSON object per line).
329    ///
330    /// Also updates the in-memory dedup and meta_hash caches.
331    pub async fn append_index(&self, record: &IndexRecord) -> Result<(), StoreError> {
332        let _append_guard = self.append_lock.lock().await;
333        self.append_index_unlocked(record).await
334    }
335
336    /// Append one record only if the `(meta_hash, node_id)` pair is absent.
337    ///
338    /// Returns `true` when a new record was appended, `false` when the record
339    /// was already present in the dedup cache.
340    pub async fn append_index_if_absent(&self, record: &IndexRecord) -> Result<bool, StoreError> {
341        let _append_guard = self.append_lock.lock().await;
342        if self
343            .dedup_read()?
344            .contains(&(record.meta_hash.clone(), record.node_id.clone()))
345        {
346            return Ok(false);
347        }
348        self.append_index_unlocked(record).await?;
349        Ok(true)
350    }
351
352    async fn append_index_unlocked(&self, record: &IndexRecord) -> Result<(), StoreError> {
353        let mut line = serde_json::to_string(record)?;
354        line.push('\n');
355
356        let mut file = fs::OpenOptions::new()
357            .create(true)
358            .append(true)
359            .open(self.index_path())
360            .await?;
361        file.write_all(line.as_bytes()).await?;
362        file.flush().await?;
363
364        // Update in-memory caches.
365        self.dedup_write()?
366            .insert((record.meta_hash.clone(), record.node_id.clone()));
367        self.meta_hash_write()?.insert(record.meta_hash.clone());
368        if record.source == IndexRecordSource::LocalPublish {
369            self.latest_local_publish_write()?.insert(
370                (record.igc_hash.clone(), record.node_id.clone()),
371                record.clone(),
372            );
373        } else {
374            let seq = self.index_records_read()?.len() as u64;
375            self.discovery_events_write()?.push((seq, record.clone()));
376        }
377        self.index_records_write()?.push(record.clone());
378
379        Ok(())
380    }
381
382    /// Iterate all records from the in-memory index cache.
383    pub fn iter_index(
384        &self,
385    ) -> Result<impl Iterator<Item = Result<IndexRecord, StoreError>>, StoreError> {
386        let records = self.index_records_read()?.clone();
387        Ok(Box::new(records.into_iter().map(Ok))
388            as Box<
389                dyn Iterator<Item = Result<IndexRecord, StoreError>>,
390            >)
391    }
392
393    /// Iterate all records in `index.ndjson` (synchronous, for startup only).
394    fn iter_index_file(
395        &self,
396    ) -> Result<impl Iterator<Item = Result<IndexRecord, StoreError>>, StoreError> {
397        use std::io::{BufRead, BufReader};
398
399        let path = self.index_path();
400        // Return empty iterator if the index file does not exist yet.
401        if !path.exists() {
402            let v: Vec<Result<IndexRecord, StoreError>> = Vec::new();
403            return Ok(Box::new(v.into_iter())
404                as Box<dyn Iterator<Item = Result<IndexRecord, StoreError>>>);
405        }
406
407        let file = std::fs::File::open(&path).map_err(StoreError::Io)?;
408        let reader = BufReader::new(file);
409        Ok(Box::new(reader.lines().map(|line| {
410            let line = line.map_err(StoreError::Io)?;
411            serde_json::from_str::<IndexRecord>(&line).map_err(StoreError::Json)
412        }))
413            as Box<
414                dyn Iterator<Item = Result<IndexRecord, StoreError>>,
415            >)
416    }
417
418    /// Iterate all records in `artifacts.ndjson` (synchronous, for startup only).
419    fn iter_artifact_registry_file(
420        &self,
421    ) -> Result<impl Iterator<Item = Result<ArtifactRegistryRecord, StoreError>>, StoreError> {
422        use std::io::{BufRead, BufReader};
423
424        let path = self.artifact_registry_path();
425        if !path.exists() {
426            let v: Vec<Result<ArtifactRegistryRecord, StoreError>> = Vec::new();
427            return Ok(Box::new(v.into_iter())
428                as Box<
429                    dyn Iterator<Item = Result<ArtifactRegistryRecord, StoreError>>,
430                >);
431        }
432
433        let file = std::fs::File::open(&path).map_err(StoreError::Io)?;
434        let reader = BufReader::new(file);
435        Ok(Box::new(reader.lines().map(|line| {
436            let line = line.map_err(StoreError::Io)?;
437            serde_json::from_str::<ArtifactRegistryRecord>(&line).map_err(StoreError::Json)
438        }))
439            as Box<
440                dyn Iterator<Item = Result<ArtifactRegistryRecord, StoreError>>,
441            >)
442    }
443
444    /// True if the exact `(meta_hash, node_id)` pair is already recorded.
445    ///
446    /// Uses the in-memory dedup cache — O(1) after [`init`].
447    pub fn has_index_record(&self, meta_hash: &str, node_id: &str) -> Result<bool, StoreError> {
448        let meta_hash = Blake3Hex::parse(meta_hash)?;
449        let node_id = NodeIdHex::parse(node_id)?;
450        Ok(self.dedup_read()?.contains(&(meta_hash, node_id)))
451    }
452
453    /// True if any record is known for this metadata blob.
454    ///
455    /// Uses the in-memory meta_hash cache — O(1) after [`init`].
456    pub fn has_meta_hash(&self, meta_hash: &str) -> Result<bool, StoreError> {
457        let meta_hash = Blake3Hex::parse(meta_hash)?;
458        Ok(self.meta_hash_read()?.contains(&meta_hash))
459    }
460
461    /// Return all `RemoteAnnouncement` index records at or after position `since_seq`.
462    ///
463    /// `since_seq` is a 0-based line number in `index.ndjson`.  The discovery
464    /// worker persists the last processed seq and resumes from there on restart,
465    /// providing at-least-once delivery across restarts.
466    ///
467    /// Returns `(seq, record)` pairs ordered by ascending seq.
468    pub fn discovery_events_since(
469        &self,
470        since_seq: u64,
471    ) -> Result<Vec<(u64, IndexRecord)>, StoreError> {
472        let events = self.discovery_events_read()?;
473        let start = events.partition_point(|(seq, _)| *seq < since_seq);
474        Ok(events[start..].to_vec())
475    }
476
477    /// Return the latest local publish record for an IGC hash from this node.
478    pub fn latest_local_publish(
479        &self,
480        igc_hash: &Blake3Hex,
481        node_id: &NodeIdHex,
482    ) -> Result<Option<IndexRecord>, StoreError> {
483        Ok(self
484            .latest_local_publish_read()?
485            .get(&(igc_hash.clone(), node_id.clone()))
486            .cloned())
487    }
488
489    // ── Artifact registry operations ─────────────────────────────────────────
490
491    /// Append one artifact registry record and make it the latest state for its
492    /// `raw_igc_hash`.
493    pub async fn append_artifact_registry_record(
494        &self,
495        record: &ArtifactRegistryRecord,
496    ) -> Result<(), StoreError> {
497        validate_artifact_registry_record(record)?;
498
499        let mut line = serde_json::to_string(record)?;
500        line.push('\n');
501
502        let mut file = fs::OpenOptions::new()
503            .create(true)
504            .append(true)
505            .open(self.artifact_registry_path())
506            .await?;
507        file.write_all(line.as_bytes()).await?;
508        file.flush().await?;
509
510        self.artifact_registry_write()?
511            .insert(record.raw_igc_hash.clone(), record.clone());
512        let seq = self.artifact_registry_events_read()?.len() as u64;
513        self.artifact_registry_events_write()?
514            .push((seq, record.clone()));
515
516        Ok(())
517    }
518
519    /// Return the latest artifact registry state for `raw_igc_hash`.
520    pub fn artifact_registry_record(
521        &self,
522        raw_igc_hash: &Blake3Hex,
523    ) -> Result<Option<ArtifactRegistryRecord>, StoreError> {
524        Ok(self.artifact_registry_read()?.get(raw_igc_hash).cloned())
525    }
526
527    /// Return all latest artifact registry records ordered by `raw_igc_hash`.
528    pub fn artifact_registry_records(&self) -> Result<Vec<ArtifactRegistryRecord>, StoreError> {
529        let mut records: Vec<_> = self.artifact_registry_read()?.values().cloned().collect();
530        records.sort_by(|left, right| left.raw_igc_hash.cmp(&right.raw_igc_hash));
531        Ok(records)
532    }
533
534    /// Return all artifact registry events at or after `from_seq`.
535    ///
536    /// The sequence is scoped to this store and is the 0-based line number in
537    /// `artifacts.ndjson`.
538    pub fn artifact_registry_events_since(
539        &self,
540        from_seq: u64,
541    ) -> Result<Vec<(u64, ArtifactRegistryRecord)>, StoreError> {
542        let events = self.artifact_registry_events_read()?;
543        let start = events.partition_point(|(seq, _)| *seq < from_seq);
544        Ok(events[start..].to_vec())
545    }
546
547    /// Return the latest artifact registry event sequence, or `0` for an empty
548    /// registry. `0` is both the first valid sequence and the empty watermark;
549    /// callers that need exact emptiness should inspect the event list.
550    pub fn latest_artifact_registry_event_seq(&self) -> Result<u64, StoreError> {
551        Ok(self
552            .artifact_registry_events_read()?
553            .last()
554            .map(|(seq, _)| *seq)
555            .unwrap_or(0))
556    }
557
558    /// Return the latest artifact registry event sequence for one
559    /// `raw_igc_hash`, if any.
560    pub fn latest_artifact_registry_event_seq_for(
561        &self,
562        raw_igc_hash: &Blake3Hex,
563    ) -> Result<Option<u64>, StoreError> {
564        Ok(self
565            .artifact_registry_events_read()?
566            .iter()
567            .rev()
568            .find_map(|(seq, record)| (&record.raw_igc_hash == raw_igc_hash).then_some(*seq)))
569    }
570
571    // ── Key management ────────────────────────────────────────────────────────
572
573    /// Load the raw 32-byte secret key from `node.key`, or return `None` if
574    /// the file does not exist.
575    pub fn load_key_bytes(&self) -> Result<Option<[u8; 32]>, StoreError> {
576        use std::io::Read;
577        let path = self.key_path();
578        if !path.exists() {
579            return Ok(None);
580        }
581        let mut bytes = [0u8; 32];
582        std::fs::File::open(&path)
583            .and_then(|mut f| f.read_exact(&mut bytes))
584            .map_err(StoreError::Io)?;
585        Ok(Some(bytes))
586    }
587
588    /// Persist a 32-byte secret key to `node.key` with mode 0600.
589    pub fn save_key_bytes(&self, bytes: &[u8; 32]) -> Result<(), StoreError> {
590        write_key_file(&self.key_path(), bytes)
591    }
592
593    fn dedup_read(&self) -> Result<RwLockReadGuard<'_, HashSet<DedupKey>>, StoreError> {
594        self.dedup_cache
595            .read()
596            .map_err(|_| StoreError::PoisonedLock("dedup_cache"))
597    }
598
599    fn dedup_write(&self) -> Result<RwLockWriteGuard<'_, HashSet<DedupKey>>, StoreError> {
600        self.dedup_cache
601            .write()
602            .map_err(|_| StoreError::PoisonedLock("dedup_cache"))
603    }
604
605    fn meta_hash_read(&self) -> Result<RwLockReadGuard<'_, HashSet<Blake3Hex>>, StoreError> {
606        self.meta_hash_cache
607            .read()
608            .map_err(|_| StoreError::PoisonedLock("meta_hash_cache"))
609    }
610
611    fn meta_hash_write(&self) -> Result<RwLockWriteGuard<'_, HashSet<Blake3Hex>>, StoreError> {
612        self.meta_hash_cache
613            .write()
614            .map_err(|_| StoreError::PoisonedLock("meta_hash_cache"))
615    }
616
617    fn latest_local_publish_read(
618        &self,
619    ) -> Result<RwLockReadGuard<'_, LatestLocalPublishMap>, StoreError> {
620        self.latest_local_publish_cache
621            .read()
622            .map_err(|_| StoreError::PoisonedLock("latest_local_publish_cache"))
623    }
624
625    fn latest_local_publish_write(
626        &self,
627    ) -> Result<RwLockWriteGuard<'_, LatestLocalPublishMap>, StoreError> {
628        self.latest_local_publish_cache
629            .write()
630            .map_err(|_| StoreError::PoisonedLock("latest_local_publish_cache"))
631    }
632
633    fn index_records_read(&self) -> Result<RwLockReadGuard<'_, Vec<IndexRecord>>, StoreError> {
634        self.index_records_cache
635            .read()
636            .map_err(|_| StoreError::PoisonedLock("index_records_cache"))
637    }
638
639    fn index_records_write(&self) -> Result<RwLockWriteGuard<'_, Vec<IndexRecord>>, StoreError> {
640        self.index_records_cache
641            .write()
642            .map_err(|_| StoreError::PoisonedLock("index_records_cache"))
643    }
644
645    fn discovery_events_read(
646        &self,
647    ) -> Result<RwLockReadGuard<'_, Vec<(u64, IndexRecord)>>, StoreError> {
648        self.discovery_events_cache
649            .read()
650            .map_err(|_| StoreError::PoisonedLock("discovery_events_cache"))
651    }
652
653    fn discovery_events_write(
654        &self,
655    ) -> Result<RwLockWriteGuard<'_, Vec<(u64, IndexRecord)>>, StoreError> {
656        self.discovery_events_cache
657            .write()
658            .map_err(|_| StoreError::PoisonedLock("discovery_events_cache"))
659    }
660
661    fn artifact_registry_read(
662        &self,
663    ) -> Result<RwLockReadGuard<'_, ArtifactRegistryMap>, StoreError> {
664        self.artifact_registry_cache
665            .read()
666            .map_err(|_| StoreError::PoisonedLock("artifact_registry_cache"))
667    }
668
669    fn artifact_registry_write(
670        &self,
671    ) -> Result<RwLockWriteGuard<'_, ArtifactRegistryMap>, StoreError> {
672        self.artifact_registry_cache
673            .write()
674            .map_err(|_| StoreError::PoisonedLock("artifact_registry_cache"))
675    }
676
677    fn artifact_registry_events_read(
678        &self,
679    ) -> Result<RwLockReadGuard<'_, Vec<(u64, ArtifactRegistryRecord)>>, StoreError> {
680        self.artifact_registry_events_cache
681            .read()
682            .map_err(|_| StoreError::PoisonedLock("artifact_registry_events_cache"))
683    }
684
685    fn artifact_registry_events_write(
686        &self,
687    ) -> Result<RwLockWriteGuard<'_, Vec<(u64, ArtifactRegistryRecord)>>, StoreError> {
688        self.artifact_registry_events_cache
689            .write()
690            .map_err(|_| StoreError::PoisonedLock("artifact_registry_events_cache"))
691    }
692}
693
694fn validate_artifact_registry_record(record: &ArtifactRegistryRecord) -> Result<(), StoreError> {
695    match record.publication_mode {
696        PublicationMode::Protected => {
697            if record.protected_hash.is_none() {
698                return Err(StoreError::InvalidArtifactRecord(
699                    "protected mode requires protected_hash",
700                ));
701            }
702        }
703        PublicationMode::Public | PublicationMode::Private => {
704            if record.protected_hash.is_some() {
705                return Err(StoreError::InvalidArtifactRecord(
706                    "protected_hash is only valid in protected mode",
707                ));
708            }
709            if record.has_protected_sanitized_igc || record.has_protected_raw_companion {
710                return Err(StoreError::InvalidArtifactRecord(
711                    "protected artifacts are only valid in protected mode",
712                ));
713            }
714        }
715    }
716
717    let unique_serving_nodes: HashSet<_> = record.serving_node_ids.iter().collect();
718    if unique_serving_nodes.len() != record.serving_node_ids.len() {
719        return Err(StoreError::InvalidArtifactRecord(
720            "serving_node_ids must not contain duplicates",
721        ));
722    }
723
724    Ok(())
725}
726
727// ── Platform helpers ──────────────────────────────────────────────────────────
728
729#[cfg(unix)]
730fn write_key_file(path: &Path, bytes: &[u8; 32]) -> Result<(), StoreError> {
731    use std::io::Write;
732    use std::os::unix::fs::OpenOptionsExt;
733
734    let mut file = std::fs::OpenOptions::new()
735        .create(true)
736        .truncate(true)
737        .write(true)
738        .mode(0o600)
739        .open(path)
740        .map_err(StoreError::Io)?;
741    file.write_all(bytes).map_err(StoreError::Io)?;
742    Ok(())
743}
744
/// Write the 32-byte secret key to `path` on non-Unix platforms.
///
/// The file is created (or truncated) and the bytes written; no permission
/// bits are set on this path.
#[cfg(not(unix))]
fn write_key_file(path: &Path, bytes: &[u8; 32]) -> Result<(), StoreError> {
    use std::io::Write;
    std::fs::File::create(path)
        .and_then(|mut key_file| key_file.write_all(bytes))
        .map_err(StoreError::Io)
}
752
753// ── Tests ─────────────────────────────────────────────────────────────────────
754
#[cfg(test)]
mod tests {
    //! Tests for the flat-file store: blob put/get/delete, the flight index,
    //! the artifact registry, and node key persistence.

    use super::*;
    use crate::id::{Blake3Hex, IdentifierError, NodeIdHex, PilotId};

    // ── Fixtures ──────────────────────────────────────────────────────────────

    /// Opens and initialises a store rooted in a fresh temporary directory.
    ///
    /// The directory handle is returned as well so tests can inspect or
    /// reopen the same root.
    async fn temp_store() -> (FlatFileStore, tempfile::TempDir) {
        let tmp = tempfile::tempdir().unwrap();
        let store = FlatFileStore::open(tmp.path());
        store.init().await.unwrap();
        (store, tmp)
    }

    /// Opens and initialises another store handle over an existing `root`.
    async fn reopen(root: &std::path::Path) -> FlatFileStore {
        let store = FlatFileStore::open(root);
        store.init().await.unwrap();
        store
    }

    /// Returns `ch` repeated 64 times.
    fn hex64(ch: char) -> String {
        std::iter::repeat(ch).take(64).collect()
    }

    /// Blob hash fixture made of 64 copies of `ch`.
    fn hash(ch: char) -> Blake3Hex {
        Blake3Hex::parse(hex64(ch)).unwrap()
    }

    /// Node id fixture made of 64 copies of `ch`.
    fn node_id(ch: char) -> NodeIdHex {
        NodeIdHex::parse(hex64(ch)).unwrap()
    }

    /// Pilot id fixture: the pilot prefix followed by 64 copies of `ch`.
    fn pilot_id(ch: char) -> PilotId {
        let suffix = hex64(ch);
        PilotId::parse(format!("{}{}", PilotId::PREFIX, suffix)).unwrap()
    }

    /// Index record fixture with igc hash `a…`, meta hash `b…` and node `c…`.
    fn index_record(
        source: IndexRecordSource,
        igc_ticket: &str,
        meta_ticket: &str,
        recorded_at: &str,
    ) -> IndexRecord {
        IndexRecord {
            source,
            igc_hash: hash('a'),
            meta_hash: hash('b'),
            node_id: node_id('c'),
            igc_ticket: igc_ticket.to_string(),
            meta_ticket: meta_ticket.to_string(),
            recorded_at: recorded_at.to_string(),
        }
    }

    /// Artifact record fixture holding only a raw IGC: no pilot, no protected
    /// state. Tests override individual fields with struct-update syntax.
    fn artifact_record(
        raw: char,
        mode: PublicationMode,
        serving: &[char],
        recorded_at: &str,
    ) -> ArtifactRegistryRecord {
        ArtifactRegistryRecord {
            raw_igc_hash: hash(raw),
            pilot_id: None,
            publication_mode: mode,
            protected_hash: None,
            has_raw_igc: true,
            has_protected_sanitized_igc: false,
            has_protected_raw_companion: false,
            serving_node_ids: serving.iter().map(|&ch| node_id(ch)).collect(),
            g_record_present: None,
            recorded_at: recorded_at.to_string(),
        }
    }

    // ── Blobs ─────────────────────────────────────────────────────────────────

    #[tokio::test]
    async fn put_get_round_trip() {
        let (store, _dir) = temp_store().await;
        let payload = b"hello igc-net";

        let digest = store.put(payload).await.unwrap();
        assert_eq!(digest.len(), 64);

        let fetched = store.get(&digest).await.unwrap().unwrap();
        assert_eq!(fetched, payload);
    }

    #[tokio::test]
    async fn put_is_idempotent() {
        let (store, _dir) = temp_store().await;
        let payload = b"same content";

        let first = store.put(payload).await.unwrap();
        let second = store.put(payload).await.unwrap();

        assert_eq!(first, second);
    }

    #[tokio::test]
    async fn contains_false_before_put_true_after() {
        let (store, _dir) = temp_store().await;
        let payload = b"check contains";
        let digest = Blake3Hex::from_hash(blake3::hash(payload));

        assert!(!store.contains(&digest).unwrap());
        store.put(payload).await.unwrap();
        assert!(store.contains(&digest).unwrap());
    }

    #[tokio::test]
    async fn delete_blob_removes_local_plaintext_and_is_idempotent() {
        let (store, _dir) = temp_store().await;
        let digest = store.put(b"restricted plaintext").await.unwrap();
        assert!(store.contains(&digest).unwrap());

        // First delete reports that something was removed.
        assert!(store.delete_blob(&digest).await.unwrap());
        assert!(!store.contains(&digest).unwrap());
        assert!(store.get(&digest).await.unwrap().is_none());

        // Second delete is a no-op and reports so.
        assert!(!store.delete_blob(&digest).await.unwrap());
    }

    #[tokio::test]
    async fn get_missing_returns_none() {
        let (store, _dir) = temp_store().await;
        let absent = hash('a');

        let looked_up = store.get(&absent).await.unwrap();

        assert!(looked_up.is_none());
    }

    #[tokio::test]
    async fn invalid_hash_is_rejected_by_lookup_apis() {
        const BAD: &str = "bad-hash";
        let (store, _dir) = temp_store().await;

        assert!(matches!(
            store.contains(BAD),
            Err(StoreError::Identifier(IdentifierError::Blake3Hex(_)))
        ));
        assert!(matches!(
            store.resolve_path(BAD),
            Err(StoreError::Identifier(IdentifierError::Blake3Hex(_)))
        ));
        assert!(matches!(
            store.get(BAD).await,
            Err(StoreError::Identifier(IdentifierError::Blake3Hex(_)))
        ));
    }

    // ── Flight index ──────────────────────────────────────────────────────────

    #[tokio::test]
    async fn index_round_trip() {
        let (store, _dir) = temp_store().await;
        let record = index_record(
            IndexRecordSource::LocalPublish,
            "igc_ticket",
            "meta_ticket",
            "2026-03-22T12:00:00Z",
        );

        // Appending the same record twice must yield two index lines.
        for _ in 0..2 {
            store.append_index(&record).await.unwrap();
        }

        let records = store.iter_index().unwrap().collect::<Vec<_>>();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].as_ref().unwrap().igc_hash, hash('a'));
    }

    #[tokio::test]
    async fn has_index_record_uses_meta_hash_and_node_id() {
        let (store, _dir) = temp_store().await;
        let announced = index_record(
            IndexRecordSource::RemoteAnnouncement,
            "igc_ticket_1",
            "meta_ticket_1",
            "2026-03-22T12:00:00Z",
        );
        store.append_index(&announced).await.unwrap();

        let meta = hex64('b');
        assert!(store.has_index_record(&meta, &hex64('c')).unwrap());
        assert!(!store.has_index_record(&meta, &hex64('d')).unwrap());
        assert!(store.has_meta_hash(&meta).unwrap());
    }

    #[tokio::test]
    async fn latest_local_publish_returns_last_matching_record() {
        let (store, _dir) = temp_store().await;
        let stamps = ["2026-03-22T12:00:00Z", "2026-03-22T12:05:00Z"];

        for recorded_at in stamps {
            let record = index_record(
                IndexRecordSource::LocalPublish,
                &format!("igc_ticket_{recorded_at}"),
                &format!("meta_ticket_{recorded_at}"),
                recorded_at,
            );
            store.append_index(&record).await.unwrap();
        }

        let latest = store
            .latest_local_publish(&hash('a'), &node_id('c'))
            .unwrap()
            .unwrap();
        assert_eq!(latest.recorded_at, "2026-03-22T12:05:00Z");
    }

    #[tokio::test]
    async fn iter_index_on_empty_store_returns_empty() {
        let (store, _dir) = temp_store().await;

        let records = store.iter_index().unwrap().collect::<Vec<_>>();

        assert!(records.is_empty());
    }

    // ── Artifact registry ─────────────────────────────────────────────────────

    #[tokio::test]
    async fn artifact_registry_round_trip_and_reload() {
        let (store, dir) = temp_store().await;
        let record = ArtifactRegistryRecord {
            pilot_id: Some(pilot_id('b')),
            protected_hash: Some(hash('c')),
            has_protected_sanitized_igc: true,
            has_protected_raw_companion: true,
            ..artifact_record(
                'a',
                PublicationMode::Protected,
                &['d'],
                "2026-04-28T12:00:00Z",
            )
        };

        store
            .append_artifact_registry_record(&record)
            .await
            .unwrap();
        let stored = store.artifact_registry_record(&hash('a')).unwrap();
        assert_eq!(stored, Some(record.clone()));

        // A second handle over the same root must see the same record.
        let reopened = reopen(dir.path()).await;
        let reloaded = reopened.artifact_registry_record(&hash('a')).unwrap();
        assert_eq!(reloaded, Some(record));
    }

    #[tokio::test]
    async fn artifact_registry_events_are_durable_append_order_cursor() {
        let (store, dir) = temp_store().await;
        let first = artifact_record(
            'a',
            PublicationMode::Public,
            &['b'],
            "2026-05-01T09:00:00Z",
        );
        let second = artifact_record(
            'c',
            PublicationMode::Private,
            &['d'],
            "2026-05-01T09:01:00Z",
        );

        for record in [&first, &second] {
            store.append_artifact_registry_record(record).await.unwrap();
        }

        assert_eq!(store.latest_artifact_registry_event_seq().unwrap(), 1);
        assert_eq!(
            store
                .latest_artifact_registry_event_seq_for(&first.raw_igc_hash)
                .unwrap(),
            Some(0)
        );
        assert_eq!(
            store.artifact_registry_events_since(1).unwrap(),
            vec![(1, second.clone())]
        );

        // Sequence numbers and order must be the same after reopening.
        let reopened = reopen(dir.path()).await;
        assert_eq!(reopened.latest_artifact_registry_event_seq().unwrap(), 1);
        assert_eq!(
            reopened.artifact_registry_events_since(0).unwrap(),
            vec![(0, first), (1, second)]
        );
    }

    #[tokio::test]
    async fn artifact_registry_latest_record_wins() {
        let (store, _dir) = temp_store().await;
        let initial = artifact_record(
            'a',
            PublicationMode::Private,
            &['b'],
            "2026-04-28T12:00:00Z",
        );
        let update = ArtifactRegistryRecord {
            pilot_id: Some(pilot_id('c')),
            ..artifact_record(
                'a',
                PublicationMode::Public,
                &['b', 'd'],
                "2026-04-28T12:01:00Z",
            )
        };

        store
            .append_artifact_registry_record(&initial)
            .await
            .unwrap();
        store.append_artifact_registry_record(&update).await.unwrap();

        let latest = store.artifact_registry_record(&hash('a')).unwrap().unwrap();
        assert_eq!(latest.publication_mode, PublicationMode::Public);
        assert_eq!(latest.pilot_id, Some(pilot_id('c')));
        assert_eq!(latest.serving_node_ids, vec![node_id('b'), node_id('d')]);
    }

    #[tokio::test]
    async fn artifact_registry_validates_mode_specific_fields() {
        let (store, _dir) = temp_store().await;

        // Protected mode, but no protected hash supplied.
        let protected_without_hash = ArtifactRegistryRecord {
            has_raw_igc: false,
            has_protected_sanitized_igc: true,
            ..artifact_record(
                'a',
                PublicationMode::Protected,
                &[],
                "2026-04-28T12:00:00Z",
            )
        };
        let rejected = store
            .append_artifact_registry_record(&protected_without_hash)
            .await;
        assert!(matches!(
            rejected,
            Err(StoreError::InvalidArtifactRecord(
                "protected mode requires protected_hash"
            ))
        ));

        // Public mode carrying a protected hash.
        let public_with_protected_state = ArtifactRegistryRecord {
            protected_hash: Some(hash('b')),
            ..artifact_record('a', PublicationMode::Public, &[], "2026-04-28T12:00:00Z")
        };
        let rejected = store
            .append_artifact_registry_record(&public_with_protected_state)
            .await;
        assert!(matches!(
            rejected,
            Err(StoreError::InvalidArtifactRecord(
                "protected_hash is only valid in protected mode"
            ))
        ));
    }

    // ── Node key ──────────────────────────────────────────────────────────────

    #[tokio::test]
    async fn key_persistence() {
        let (store, _dir) = temp_store().await;
        let before = store.load_key_bytes().unwrap();
        assert!(before.is_none());

        let secret = [42u8; 32];
        store.save_key_bytes(&secret).unwrap();

        let after = store.load_key_bytes().unwrap().unwrap();
        assert_eq!(after, secret);
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn key_file_has_mode_0600() {
        use std::os::unix::fs::PermissionsExt;

        let (store, dir) = temp_store().await;
        store.save_key_bytes(&[0u8; 32]).unwrap();

        let key_path = dir.path().join("node.key");
        let permissions = std::fs::metadata(key_path).unwrap().permissions();
        // Mask off the file-type bits; only the permission bits are compared.
        let mode = permissions.mode() & 0o777;
        assert_eq!(mode, 0o600, "node.key must have mode 0600, got {mode:o}");
    }
}