1use std::collections::{HashMap, HashSet};
14use std::path::{Path, PathBuf};
15use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
16
17use serde::{Deserialize, Serialize};
18use tokio::fs;
19use tokio::io::AsyncWriteExt;
20use tokio::sync::Mutex;
21
22use crate::id::{Blake3Hex, IdentifierError, NodeIdHex, PilotId};
23
/// Errors returned by [`FlatFileStore`] operations.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// An underlying filesystem operation failed.
    #[error("I/O: {0}")]
    Io(#[from] std::io::Error),
    /// A record could not be serialized to, or parsed from, JSON.
    #[error("JSON: {0}")]
    Json(#[from] serde_json::Error),
    /// An identifier (hash or node id) was rejected by the `crate::id` parsers.
    #[error("identifier: {0}")]
    Identifier(#[from] IdentifierError),
    /// An artifact registry record failed validation; the payload names the violated rule.
    #[error("invalid artifact registry record: {0}")]
    InvalidArtifactRecord(&'static str),
    /// One of the in-memory cache locks was poisoned; the payload names the cache.
    #[error("lock poisoned: {0}")]
    PoisonedLock(&'static str),
}
39
/// Origin of an [`IndexRecord`].
///
/// Serialized with `snake_case` variant names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum IndexRecordSource {
    /// Records with this source are tracked in the store's latest-local-publish cache.
    LocalPublish,
    /// Records with this source are exposed through the store's discovery event feed.
    RemoteAnnouncement,
}
49
/// One line of the store's `index.ndjson` file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct IndexRecord {
    /// Where this record came from.
    pub source: IndexRecordSource,
    /// Hash of the IGC payload (NOTE(review): meaning inferred from the field name).
    /// Together with `node_id` it keys the latest-local-publish lookup.
    pub igc_hash: Blake3Hex,
    /// Hash of the metadata payload (NOTE(review): meaning inferred from the field name).
    /// Together with `node_id` it forms the store's deduplication key.
    pub meta_hash: Blake3Hex,
    /// Node the record is associated with.
    pub node_id: NodeIdHex,
    /// Opaque ticket string for the IGC payload; the store never interprets it.
    pub igc_ticket: String,
    /// Opaque ticket string for the metadata payload; the store never interprets it.
    pub meta_ticket: String,
    /// Timestamp string. The store neither parses nor validates it; the tests in this
    /// file use RFC 3339 UTC values such as `2026-03-22T12:00:00Z`.
    pub recorded_at: String,
}
71
/// Publication mode stored in an [`ArtifactRegistryRecord`].
///
/// Serialized with `snake_case` variant names. Record validation treats `Protected`
/// specially: it requires `protected_hash`, while the other two modes forbid it and
/// forbid the protected-artifact flags.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum PublicationMode {
    /// Must not carry a `protected_hash` or protected-artifact flags.
    Public,
    /// Must carry a `protected_hash`.
    Protected,
    /// Must not carry a `protected_hash` or protected-artifact flags.
    Private,
}
82
/// One line of the store's `artifacts.ndjson` file.
///
/// Records are keyed by `raw_igc_hash`; when several records share a key the most
/// recently appended one is the one returned by lookups.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ArtifactRegistryRecord {
    /// Registry key for this artifact.
    pub raw_igc_hash: Blake3Hex,
    /// Optional pilot identifier.
    pub pilot_id: Option<PilotId>,
    /// Publication mode; constrains `protected_hash` and the `has_protected_*` flags.
    pub publication_mode: PublicationMode,
    /// Required when `publication_mode` is `Protected`, rejected otherwise.
    pub protected_hash: Option<Blake3Hex>,
    /// NOTE(review): presumably whether the raw IGC blob is held locally; not validated here.
    pub has_raw_igc: bool,
    /// Only allowed to be `true` in `Protected` mode.
    pub has_protected_sanitized_igc: bool,
    /// Only allowed to be `true` in `Protected` mode.
    pub has_protected_raw_companion: bool,
    /// Node ids associated with serving this artifact; validation rejects duplicates.
    pub serving_node_ids: Vec<NodeIdHex>,
    /// Optional flag; defaults to `None` when the field is absent from the JSON line.
    #[serde(default)]
    pub g_record_present: Option<bool>,
    /// Timestamp string; the store neither parses nor validates it.
    pub recorded_at: String,
}
112
/// Flat-file store rooted at a single directory.
///
/// On disk it uses `blobs/<first two hex chars>/<hash>` for content-addressed blobs,
/// `index.ndjson` for index records, `artifacts.ndjson` for artifact registry records
/// and `node.key` for the 32-byte node key. The caches below mirror the two NDJSON
/// files in memory and are rebuilt by `init`.
pub struct FlatFileStore {
    /// Root directory of the store.
    root: PathBuf,
    /// `(meta_hash, node_id)` of every index record.
    dedup_cache: RwLock<HashSet<(Blake3Hex, NodeIdHex)>>,
    /// `meta_hash` of every index record.
    meta_hash_cache: RwLock<HashSet<Blake3Hex>>,
    /// Most recently appended `LocalPublish` record per `(igc_hash, node_id)`.
    latest_local_publish_cache: RwLock<HashMap<(Blake3Hex, NodeIdHex), IndexRecord>>,
    /// Every index record, in append order.
    index_records_cache: RwLock<Vec<IndexRecord>>,
    /// Non-`LocalPublish` index records paired with their position in the index.
    discovery_events_cache: RwLock<Vec<(u64, IndexRecord)>>,
    /// Most recently appended artifact registry record per `raw_igc_hash`.
    artifact_registry_cache: RwLock<HashMap<Blake3Hex, ArtifactRegistryRecord>>,
    /// Every artifact registry record paired with its position in the registry file.
    artifact_registry_events_cache: RwLock<Vec<(u64, ArtifactRegistryRecord)>>,
    /// Async mutex held while appending to the index so appends are serialized.
    append_lock: Mutex<()>,
}
140
/// `(hash, node id)` pair: `(meta_hash, node_id)` in the dedup set and
/// `(igc_hash, node_id)` in the latest-local-publish map.
type DedupKey = (Blake3Hex, NodeIdHex);
/// Latest `LocalPublish` index record per `(igc_hash, node_id)`.
type LatestLocalPublishMap = HashMap<DedupKey, IndexRecord>;
/// Latest artifact registry record per `raw_igc_hash`.
type ArtifactRegistryMap = HashMap<Blake3Hex, ArtifactRegistryRecord>;
144
145impl FlatFileStore {
146 pub fn open(root: impl Into<PathBuf>) -> Self {
150 Self {
151 root: root.into(),
152 dedup_cache: RwLock::new(HashSet::new()),
153 meta_hash_cache: RwLock::new(HashSet::new()),
154 latest_local_publish_cache: RwLock::new(HashMap::new()),
155 index_records_cache: RwLock::new(Vec::new()),
156 discovery_events_cache: RwLock::new(Vec::new()),
157 artifact_registry_cache: RwLock::new(HashMap::new()),
158 artifact_registry_events_cache: RwLock::new(Vec::new()),
159 append_lock: Mutex::new(()),
160 }
161 }
162
163 pub async fn init(&self) -> Result<(), StoreError> {
166 fs::create_dir_all(self.blobs_dir()).await?;
167 self.reload_cache()?;
168 Ok(())
169 }
170
171 fn reload_cache(&self) -> Result<(), StoreError> {
173 let mut dedup = self
174 .dedup_cache
175 .write()
176 .map_err(|_| StoreError::PoisonedLock("dedup_cache"))?;
177 let mut metas = self
178 .meta_hash_cache
179 .write()
180 .map_err(|_| StoreError::PoisonedLock("meta_hash_cache"))?;
181 let mut latest_local = self
182 .latest_local_publish_cache
183 .write()
184 .map_err(|_| StoreError::PoisonedLock("latest_local_publish_cache"))?;
185 let mut index_records = self
186 .index_records_cache
187 .write()
188 .map_err(|_| StoreError::PoisonedLock("index_records_cache"))?;
189 let mut discovery_events = self
190 .discovery_events_cache
191 .write()
192 .map_err(|_| StoreError::PoisonedLock("discovery_events_cache"))?;
193 let mut artifact_registry = self
194 .artifact_registry_cache
195 .write()
196 .map_err(|_| StoreError::PoisonedLock("artifact_registry_cache"))?;
197 let mut artifact_registry_events = self
198 .artifact_registry_events_cache
199 .write()
200 .map_err(|_| StoreError::PoisonedLock("artifact_registry_events_cache"))?;
201 dedup.clear();
202 metas.clear();
203 latest_local.clear();
204 index_records.clear();
205 discovery_events.clear();
206 artifact_registry.clear();
207 artifact_registry_events.clear();
208 for (seq, record) in self.iter_index_file()?.enumerate() {
209 let r = record?;
210 dedup.insert((r.meta_hash.clone(), r.node_id.clone()));
211 metas.insert(r.meta_hash.clone());
212 if r.source == IndexRecordSource::LocalPublish {
213 latest_local.insert((r.igc_hash.clone(), r.node_id.clone()), r.clone());
214 } else {
215 discovery_events.push((seq as u64, r.clone()));
216 }
217 index_records.push(r);
218 }
219 for (seq, record) in self.iter_artifact_registry_file()?.enumerate() {
220 let record = record?;
221 validate_artifact_registry_record(&record)?;
222 artifact_registry_events.push((seq as u64, record.clone()));
223 artifact_registry.insert(record.raw_igc_hash.clone(), record);
224 }
225 Ok(())
226 }
227
228 fn blobs_dir(&self) -> PathBuf {
231 self.root.join("blobs")
232 }
233
234 fn blob_path(&self, blake3_hex: &Blake3Hex) -> PathBuf {
235 self.blobs_dir()
236 .join(&blake3_hex.as_str()[..2])
237 .join(blake3_hex.as_str())
238 }
239
240 fn index_path(&self) -> PathBuf {
241 self.root.join("index.ndjson")
242 }
243
244 fn artifact_registry_path(&self) -> PathBuf {
245 self.root.join("artifacts.ndjson")
246 }
247
248 fn key_path(&self) -> PathBuf {
249 self.root.join("node.key")
250 }
251
252 pub fn resolve_path(&self, blake3_hex: &str) -> Result<Option<PathBuf>, StoreError> {
258 let blake3_hex = Blake3Hex::parse(blake3_hex)?;
259 let path = self.blob_path(&blake3_hex);
260 Ok(if path.exists() { Some(path) } else { None })
261 }
262
263 pub async fn put(&self, bytes: &[u8]) -> Result<Blake3Hex, StoreError> {
268 let hex = Blake3Hex::from_hash(blake3::hash(bytes));
269 let path = self.blob_path(&hex);
270
271 if let Some(parent) = path.parent() {
272 fs::create_dir_all(parent).await?;
273 }
274 match fs::OpenOptions::new()
275 .create_new(true)
276 .write(true)
277 .open(&path)
278 .await
279 {
280 Ok(mut file) => {
281 file.write_all(bytes).await?;
282 file.flush().await?;
283 }
284 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
285 Err(e) => return Err(StoreError::Io(e)),
286 }
287 Ok(hex)
288 }
289
290 pub async fn get(&self, blake3_hex: &str) -> Result<Option<Vec<u8>>, StoreError> {
292 let blake3_hex = Blake3Hex::parse(blake3_hex)?;
293 let path = self.blob_path(&blake3_hex);
294 match fs::read(&path).await {
295 Ok(bytes) => Ok(Some(bytes)),
296 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
297 Err(e) => Err(StoreError::Io(e)),
298 }
299 }
300
301 pub fn contains(&self, blake3_hex: &str) -> Result<bool, StoreError> {
303 let blake3_hex = Blake3Hex::parse(blake3_hex)?;
304 Ok(self.blob_path(&blake3_hex).exists())
305 }
306
307 pub async fn delete_blob(&self, blake3_hex: &Blake3Hex) -> Result<bool, StoreError> {
313 let path = self.blob_path(blake3_hex);
314 match fs::remove_file(&path).await {
315 Ok(()) => {
316 if let Some(parent) = path.parent() {
317 let _ = fs::remove_dir(parent).await;
318 }
319 Ok(true)
320 }
321 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
322 Err(e) => Err(StoreError::Io(e)),
323 }
324 }
325
326 pub async fn append_index(&self, record: &IndexRecord) -> Result<(), StoreError> {
332 let _append_guard = self.append_lock.lock().await;
333 self.append_index_unlocked(record).await
334 }
335
336 pub async fn append_index_if_absent(&self, record: &IndexRecord) -> Result<bool, StoreError> {
341 let _append_guard = self.append_lock.lock().await;
342 if self
343 .dedup_read()?
344 .contains(&(record.meta_hash.clone(), record.node_id.clone()))
345 {
346 return Ok(false);
347 }
348 self.append_index_unlocked(record).await?;
349 Ok(true)
350 }
351
352 async fn append_index_unlocked(&self, record: &IndexRecord) -> Result<(), StoreError> {
353 let mut line = serde_json::to_string(record)?;
354 line.push('\n');
355
356 let mut file = fs::OpenOptions::new()
357 .create(true)
358 .append(true)
359 .open(self.index_path())
360 .await?;
361 file.write_all(line.as_bytes()).await?;
362 file.flush().await?;
363
364 self.dedup_write()?
366 .insert((record.meta_hash.clone(), record.node_id.clone()));
367 self.meta_hash_write()?.insert(record.meta_hash.clone());
368 if record.source == IndexRecordSource::LocalPublish {
369 self.latest_local_publish_write()?.insert(
370 (record.igc_hash.clone(), record.node_id.clone()),
371 record.clone(),
372 );
373 } else {
374 let seq = self.index_records_read()?.len() as u64;
375 self.discovery_events_write()?.push((seq, record.clone()));
376 }
377 self.index_records_write()?.push(record.clone());
378
379 Ok(())
380 }
381
382 pub fn iter_index(
384 &self,
385 ) -> Result<impl Iterator<Item = Result<IndexRecord, StoreError>>, StoreError> {
386 let records = self.index_records_read()?.clone();
387 Ok(Box::new(records.into_iter().map(Ok))
388 as Box<
389 dyn Iterator<Item = Result<IndexRecord, StoreError>>,
390 >)
391 }
392
393 fn iter_index_file(
395 &self,
396 ) -> Result<impl Iterator<Item = Result<IndexRecord, StoreError>>, StoreError> {
397 use std::io::{BufRead, BufReader};
398
399 let path = self.index_path();
400 if !path.exists() {
402 let v: Vec<Result<IndexRecord, StoreError>> = Vec::new();
403 return Ok(Box::new(v.into_iter())
404 as Box<dyn Iterator<Item = Result<IndexRecord, StoreError>>>);
405 }
406
407 let file = std::fs::File::open(&path).map_err(StoreError::Io)?;
408 let reader = BufReader::new(file);
409 Ok(Box::new(reader.lines().map(|line| {
410 let line = line.map_err(StoreError::Io)?;
411 serde_json::from_str::<IndexRecord>(&line).map_err(StoreError::Json)
412 }))
413 as Box<
414 dyn Iterator<Item = Result<IndexRecord, StoreError>>,
415 >)
416 }
417
418 fn iter_artifact_registry_file(
420 &self,
421 ) -> Result<impl Iterator<Item = Result<ArtifactRegistryRecord, StoreError>>, StoreError> {
422 use std::io::{BufRead, BufReader};
423
424 let path = self.artifact_registry_path();
425 if !path.exists() {
426 let v: Vec<Result<ArtifactRegistryRecord, StoreError>> = Vec::new();
427 return Ok(Box::new(v.into_iter())
428 as Box<
429 dyn Iterator<Item = Result<ArtifactRegistryRecord, StoreError>>,
430 >);
431 }
432
433 let file = std::fs::File::open(&path).map_err(StoreError::Io)?;
434 let reader = BufReader::new(file);
435 Ok(Box::new(reader.lines().map(|line| {
436 let line = line.map_err(StoreError::Io)?;
437 serde_json::from_str::<ArtifactRegistryRecord>(&line).map_err(StoreError::Json)
438 }))
439 as Box<
440 dyn Iterator<Item = Result<ArtifactRegistryRecord, StoreError>>,
441 >)
442 }
443
444 pub fn has_index_record(&self, meta_hash: &str, node_id: &str) -> Result<bool, StoreError> {
448 let meta_hash = Blake3Hex::parse(meta_hash)?;
449 let node_id = NodeIdHex::parse(node_id)?;
450 Ok(self.dedup_read()?.contains(&(meta_hash, node_id)))
451 }
452
453 pub fn has_meta_hash(&self, meta_hash: &str) -> Result<bool, StoreError> {
457 let meta_hash = Blake3Hex::parse(meta_hash)?;
458 Ok(self.meta_hash_read()?.contains(&meta_hash))
459 }
460
461 pub fn discovery_events_since(
469 &self,
470 since_seq: u64,
471 ) -> Result<Vec<(u64, IndexRecord)>, StoreError> {
472 let events = self.discovery_events_read()?;
473 let start = events.partition_point(|(seq, _)| *seq < since_seq);
474 Ok(events[start..].to_vec())
475 }
476
477 pub fn latest_local_publish(
479 &self,
480 igc_hash: &Blake3Hex,
481 node_id: &NodeIdHex,
482 ) -> Result<Option<IndexRecord>, StoreError> {
483 Ok(self
484 .latest_local_publish_read()?
485 .get(&(igc_hash.clone(), node_id.clone()))
486 .cloned())
487 }
488
489 pub async fn append_artifact_registry_record(
494 &self,
495 record: &ArtifactRegistryRecord,
496 ) -> Result<(), StoreError> {
497 validate_artifact_registry_record(record)?;
498
499 let mut line = serde_json::to_string(record)?;
500 line.push('\n');
501
502 let mut file = fs::OpenOptions::new()
503 .create(true)
504 .append(true)
505 .open(self.artifact_registry_path())
506 .await?;
507 file.write_all(line.as_bytes()).await?;
508 file.flush().await?;
509
510 self.artifact_registry_write()?
511 .insert(record.raw_igc_hash.clone(), record.clone());
512 let seq = self.artifact_registry_events_read()?.len() as u64;
513 self.artifact_registry_events_write()?
514 .push((seq, record.clone()));
515
516 Ok(())
517 }
518
519 pub fn artifact_registry_record(
521 &self,
522 raw_igc_hash: &Blake3Hex,
523 ) -> Result<Option<ArtifactRegistryRecord>, StoreError> {
524 Ok(self.artifact_registry_read()?.get(raw_igc_hash).cloned())
525 }
526
527 pub fn artifact_registry_records(&self) -> Result<Vec<ArtifactRegistryRecord>, StoreError> {
529 let mut records: Vec<_> = self.artifact_registry_read()?.values().cloned().collect();
530 records.sort_by(|left, right| left.raw_igc_hash.cmp(&right.raw_igc_hash));
531 Ok(records)
532 }
533
534 pub fn artifact_registry_events_since(
539 &self,
540 from_seq: u64,
541 ) -> Result<Vec<(u64, ArtifactRegistryRecord)>, StoreError> {
542 let events = self.artifact_registry_events_read()?;
543 let start = events.partition_point(|(seq, _)| *seq < from_seq);
544 Ok(events[start..].to_vec())
545 }
546
547 pub fn latest_artifact_registry_event_seq(&self) -> Result<u64, StoreError> {
551 Ok(self
552 .artifact_registry_events_read()?
553 .last()
554 .map(|(seq, _)| *seq)
555 .unwrap_or(0))
556 }
557
558 pub fn latest_artifact_registry_event_seq_for(
561 &self,
562 raw_igc_hash: &Blake3Hex,
563 ) -> Result<Option<u64>, StoreError> {
564 Ok(self
565 .artifact_registry_events_read()?
566 .iter()
567 .rev()
568 .find_map(|(seq, record)| (&record.raw_igc_hash == raw_igc_hash).then_some(*seq)))
569 }
570
571 pub fn load_key_bytes(&self) -> Result<Option<[u8; 32]>, StoreError> {
576 use std::io::Read;
577 let path = self.key_path();
578 if !path.exists() {
579 return Ok(None);
580 }
581 let mut bytes = [0u8; 32];
582 std::fs::File::open(&path)
583 .and_then(|mut f| f.read_exact(&mut bytes))
584 .map_err(StoreError::Io)?;
585 Ok(Some(bytes))
586 }
587
588 pub fn save_key_bytes(&self, bytes: &[u8; 32]) -> Result<(), StoreError> {
590 write_key_file(&self.key_path(), bytes)
591 }
592
593 fn dedup_read(&self) -> Result<RwLockReadGuard<'_, HashSet<DedupKey>>, StoreError> {
594 self.dedup_cache
595 .read()
596 .map_err(|_| StoreError::PoisonedLock("dedup_cache"))
597 }
598
599 fn dedup_write(&self) -> Result<RwLockWriteGuard<'_, HashSet<DedupKey>>, StoreError> {
600 self.dedup_cache
601 .write()
602 .map_err(|_| StoreError::PoisonedLock("dedup_cache"))
603 }
604
605 fn meta_hash_read(&self) -> Result<RwLockReadGuard<'_, HashSet<Blake3Hex>>, StoreError> {
606 self.meta_hash_cache
607 .read()
608 .map_err(|_| StoreError::PoisonedLock("meta_hash_cache"))
609 }
610
611 fn meta_hash_write(&self) -> Result<RwLockWriteGuard<'_, HashSet<Blake3Hex>>, StoreError> {
612 self.meta_hash_cache
613 .write()
614 .map_err(|_| StoreError::PoisonedLock("meta_hash_cache"))
615 }
616
617 fn latest_local_publish_read(
618 &self,
619 ) -> Result<RwLockReadGuard<'_, LatestLocalPublishMap>, StoreError> {
620 self.latest_local_publish_cache
621 .read()
622 .map_err(|_| StoreError::PoisonedLock("latest_local_publish_cache"))
623 }
624
625 fn latest_local_publish_write(
626 &self,
627 ) -> Result<RwLockWriteGuard<'_, LatestLocalPublishMap>, StoreError> {
628 self.latest_local_publish_cache
629 .write()
630 .map_err(|_| StoreError::PoisonedLock("latest_local_publish_cache"))
631 }
632
633 fn index_records_read(&self) -> Result<RwLockReadGuard<'_, Vec<IndexRecord>>, StoreError> {
634 self.index_records_cache
635 .read()
636 .map_err(|_| StoreError::PoisonedLock("index_records_cache"))
637 }
638
639 fn index_records_write(&self) -> Result<RwLockWriteGuard<'_, Vec<IndexRecord>>, StoreError> {
640 self.index_records_cache
641 .write()
642 .map_err(|_| StoreError::PoisonedLock("index_records_cache"))
643 }
644
645 fn discovery_events_read(
646 &self,
647 ) -> Result<RwLockReadGuard<'_, Vec<(u64, IndexRecord)>>, StoreError> {
648 self.discovery_events_cache
649 .read()
650 .map_err(|_| StoreError::PoisonedLock("discovery_events_cache"))
651 }
652
653 fn discovery_events_write(
654 &self,
655 ) -> Result<RwLockWriteGuard<'_, Vec<(u64, IndexRecord)>>, StoreError> {
656 self.discovery_events_cache
657 .write()
658 .map_err(|_| StoreError::PoisonedLock("discovery_events_cache"))
659 }
660
661 fn artifact_registry_read(
662 &self,
663 ) -> Result<RwLockReadGuard<'_, ArtifactRegistryMap>, StoreError> {
664 self.artifact_registry_cache
665 .read()
666 .map_err(|_| StoreError::PoisonedLock("artifact_registry_cache"))
667 }
668
669 fn artifact_registry_write(
670 &self,
671 ) -> Result<RwLockWriteGuard<'_, ArtifactRegistryMap>, StoreError> {
672 self.artifact_registry_cache
673 .write()
674 .map_err(|_| StoreError::PoisonedLock("artifact_registry_cache"))
675 }
676
677 fn artifact_registry_events_read(
678 &self,
679 ) -> Result<RwLockReadGuard<'_, Vec<(u64, ArtifactRegistryRecord)>>, StoreError> {
680 self.artifact_registry_events_cache
681 .read()
682 .map_err(|_| StoreError::PoisonedLock("artifact_registry_events_cache"))
683 }
684
685 fn artifact_registry_events_write(
686 &self,
687 ) -> Result<RwLockWriteGuard<'_, Vec<(u64, ArtifactRegistryRecord)>>, StoreError> {
688 self.artifact_registry_events_cache
689 .write()
690 .map_err(|_| StoreError::PoisonedLock("artifact_registry_events_cache"))
691 }
692}
693
694fn validate_artifact_registry_record(record: &ArtifactRegistryRecord) -> Result<(), StoreError> {
695 match record.publication_mode {
696 PublicationMode::Protected => {
697 if record.protected_hash.is_none() {
698 return Err(StoreError::InvalidArtifactRecord(
699 "protected mode requires protected_hash",
700 ));
701 }
702 }
703 PublicationMode::Public | PublicationMode::Private => {
704 if record.protected_hash.is_some() {
705 return Err(StoreError::InvalidArtifactRecord(
706 "protected_hash is only valid in protected mode",
707 ));
708 }
709 if record.has_protected_sanitized_igc || record.has_protected_raw_companion {
710 return Err(StoreError::InvalidArtifactRecord(
711 "protected artifacts are only valid in protected mode",
712 ));
713 }
714 }
715 }
716
717 let unique_serving_nodes: HashSet<_> = record.serving_node_ids.iter().collect();
718 if unique_serving_nodes.len() != record.serving_node_ids.len() {
719 return Err(StoreError::InvalidArtifactRecord(
720 "serving_node_ids must not contain duplicates",
721 ));
722 }
723
724 Ok(())
725}
726
727#[cfg(unix)]
730fn write_key_file(path: &Path, bytes: &[u8; 32]) -> Result<(), StoreError> {
731 use std::io::Write;
732 use std::os::unix::fs::OpenOptionsExt;
733
734 let mut file = std::fs::OpenOptions::new()
735 .create(true)
736 .truncate(true)
737 .write(true)
738 .mode(0o600)
739 .open(path)
740 .map_err(StoreError::Io)?;
741 file.write_all(bytes).map_err(StoreError::Io)?;
742 Ok(())
743}
744
/// Writes the 32-byte key to `path`, creating or truncating the file.
///
/// This variant is compiled on non-Unix targets and does not set any file permissions.
///
/// # Errors
/// Returns [`StoreError::Io`] if the file cannot be created or written.
#[cfg(not(unix))]
fn write_key_file(path: &Path, bytes: &[u8; 32]) -> Result<(), StoreError> {
    use std::io::Write;
    std::fs::File::create(path)
        .and_then(|mut key_file| key_file.write_all(bytes))
        .map_err(StoreError::Io)
}
752
#[cfg(test)]
mod tests {
    use super::*;
    use crate::id::{Blake3Hex, IdentifierError, NodeIdHex, PilotId};

    /// Opens and initializes a store inside a fresh temporary directory.
    ///
    /// The `TempDir` is returned so the directory stays alive for the whole test.
    async fn temp_store() -> (FlatFileStore, tempfile::TempDir) {
        let scratch = tempfile::tempdir().unwrap();
        let store = FlatFileStore::open(scratch.path());
        store.init().await.unwrap();
        (store, scratch)
    }

    /// 64 copies of `ch`, the textual length of the identifiers used below.
    fn repeated(ch: char) -> String {
        std::iter::repeat(ch).take(64).collect()
    }

    fn hash(ch: char) -> Blake3Hex {
        Blake3Hex::parse(repeated(ch)).unwrap()
    }

    fn node_id(ch: char) -> NodeIdHex {
        NodeIdHex::parse(repeated(ch)).unwrap()
    }

    fn pilot_id(ch: char) -> PilotId {
        let digits = repeated(ch);
        PilotId::parse(format!("{}{digits}", PilotId::PREFIX)).unwrap()
    }

    /// Index record with the fixed identifiers `igc = a…`, `meta = b…`, `node = c…`.
    fn index_record(
        source: IndexRecordSource,
        igc_ticket: &str,
        meta_ticket: &str,
        recorded_at: &str,
    ) -> IndexRecord {
        IndexRecord {
            source,
            igc_hash: hash('a'),
            meta_hash: hash('b'),
            node_id: node_id('c'),
            igc_ticket: igc_ticket.to_owned(),
            meta_ticket: meta_ticket.to_owned(),
            recorded_at: recorded_at.to_owned(),
        }
    }

    /// Registry record with a raw IGC only: no pilot, no protected hash or artifacts.
    fn unprotected_artifact(
        raw: char,
        mode: PublicationMode,
        serving: Vec<NodeIdHex>,
        recorded_at: &str,
    ) -> ArtifactRegistryRecord {
        ArtifactRegistryRecord {
            raw_igc_hash: hash(raw),
            pilot_id: None,
            publication_mode: mode,
            protected_hash: None,
            has_raw_igc: true,
            has_protected_sanitized_igc: false,
            has_protected_raw_companion: false,
            serving_node_ids: serving,
            g_record_present: None,
            recorded_at: recorded_at.to_owned(),
        }
    }

    #[tokio::test]
    async fn put_get_round_trip() {
        let (store, _dir) = temp_store().await;
        let data = b"hello igc-net";
        let hex = store.put(data).await.unwrap();
        assert_eq!(hex.len(), 64);
        let got = store.get(&hex).await.unwrap().unwrap();
        assert_eq!(got, data);
    }

    #[tokio::test]
    async fn put_is_idempotent() {
        let (store, _dir) = temp_store().await;
        let data = b"same content";
        let first = store.put(data).await.unwrap();
        let second = store.put(data).await.unwrap();
        assert_eq!(first, second);
    }

    #[tokio::test]
    async fn contains_false_before_put_true_after() {
        let (store, _dir) = temp_store().await;
        let data = b"check contains";
        let hex = Blake3Hex::from_hash(blake3::hash(data));
        assert!(!store.contains(&hex).unwrap());
        store.put(data).await.unwrap();
        assert!(store.contains(&hex).unwrap());
    }

    #[tokio::test]
    async fn delete_blob_removes_local_plaintext_and_is_idempotent() {
        let (store, _dir) = temp_store().await;
        let data = b"restricted plaintext";
        let hex = store.put(data).await.unwrap();

        assert!(store.contains(&hex).unwrap());
        assert!(store.delete_blob(&hex).await.unwrap());
        assert!(!store.contains(&hex).unwrap());
        assert!(store.get(&hex).await.unwrap().is_none());
        // A second delete finds nothing and reports `false`.
        assert!(!store.delete_blob(&hex).await.unwrap());
    }

    #[tokio::test]
    async fn get_missing_returns_none() {
        let (store, _dir) = temp_store().await;
        let hex = hash('a');
        let result = store.get(&hex).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn invalid_hash_is_rejected_by_lookup_apis() {
        let (store, _dir) = temp_store().await;
        assert!(matches!(
            store.contains("bad-hash"),
            Err(StoreError::Identifier(IdentifierError::Blake3Hex(_)))
        ));
        assert!(matches!(
            store.resolve_path("bad-hash"),
            Err(StoreError::Identifier(IdentifierError::Blake3Hex(_)))
        ));
        assert!(matches!(
            store.get("bad-hash").await,
            Err(StoreError::Identifier(IdentifierError::Blake3Hex(_)))
        ));
    }

    #[tokio::test]
    async fn index_round_trip() {
        let (store, _dir) = temp_store().await;
        let rec = index_record(
            IndexRecordSource::LocalPublish,
            "igc_ticket",
            "meta_ticket",
            "2026-03-22T12:00:00Z",
        );
        // `append_index` does not deduplicate, so both appends are kept.
        store.append_index(&rec).await.unwrap();
        store.append_index(&rec).await.unwrap();

        let records: Vec<_> = store.iter_index().unwrap().collect();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].as_ref().unwrap().igc_hash, hash('a'));
    }

    #[tokio::test]
    async fn has_index_record_uses_meta_hash_and_node_id() {
        let (store, _dir) = temp_store().await;
        let announcement = index_record(
            IndexRecordSource::RemoteAnnouncement,
            "igc_ticket_1",
            "meta_ticket_1",
            "2026-03-22T12:00:00Z",
        );
        store.append_index(&announcement).await.unwrap();

        let meta = "b".repeat(64);
        let indexed_node = "c".repeat(64);
        let other_node = "d".repeat(64);
        assert!(store.has_index_record(&meta, &indexed_node).unwrap());
        assert!(!store.has_index_record(&meta, &other_node).unwrap());
        assert!(store.has_meta_hash(&meta).unwrap());
    }

    #[tokio::test]
    async fn latest_local_publish_returns_last_matching_record() {
        let (store, _dir) = temp_store().await;
        for recorded_at in ["2026-03-22T12:00:00Z", "2026-03-22T12:05:00Z"] {
            let publish = index_record(
                IndexRecordSource::LocalPublish,
                &format!("igc_ticket_{recorded_at}"),
                &format!("meta_ticket_{recorded_at}"),
                recorded_at,
            );
            store.append_index(&publish).await.unwrap();
        }

        let latest = store
            .latest_local_publish(&hash('a'), &node_id('c'))
            .unwrap()
            .unwrap();
        assert_eq!(latest.recorded_at, "2026-03-22T12:05:00Z");
    }

    #[tokio::test]
    async fn iter_index_on_empty_store_returns_empty() {
        let (store, _dir) = temp_store().await;
        let records: Vec<_> = store.iter_index().unwrap().collect();
        assert!(records.is_empty());
    }

    #[tokio::test]
    async fn artifact_registry_round_trip_and_reload() {
        let (store, dir) = temp_store().await;

        let record = ArtifactRegistryRecord {
            pilot_id: Some(pilot_id('b')),
            protected_hash: Some(hash('c')),
            has_protected_sanitized_igc: true,
            has_protected_raw_companion: true,
            ..unprotected_artifact(
                'a',
                PublicationMode::Protected,
                vec![node_id('d')],
                "2026-04-28T12:00:00Z",
            )
        };
        store
            .append_artifact_registry_record(&record)
            .await
            .unwrap();
        assert_eq!(
            store.artifact_registry_record(&hash('a')).unwrap(),
            Some(record.clone())
        );

        // A second store over the same directory must see the record after `init`.
        let reopened = FlatFileStore::open(dir.path());
        reopened.init().await.unwrap();
        assert_eq!(
            reopened.artifact_registry_record(&hash('a')).unwrap(),
            Some(record)
        );
    }

    #[tokio::test]
    async fn artifact_registry_events_are_durable_append_order_cursor() {
        let (store, dir) = temp_store().await;

        let first = unprotected_artifact(
            'a',
            PublicationMode::Public,
            vec![node_id('b')],
            "2026-05-01T09:00:00Z",
        );
        let second = unprotected_artifact(
            'c',
            PublicationMode::Private,
            vec![node_id('d')],
            "2026-05-01T09:01:00Z",
        );

        store.append_artifact_registry_record(&first).await.unwrap();
        store
            .append_artifact_registry_record(&second)
            .await
            .unwrap();
        assert_eq!(store.latest_artifact_registry_event_seq().unwrap(), 1);
        assert_eq!(
            store
                .latest_artifact_registry_event_seq_for(&first.raw_igc_hash)
                .unwrap(),
            Some(0)
        );
        assert_eq!(
            store.artifact_registry_events_since(1).unwrap(),
            vec![(1, second.clone())]
        );

        // Sequence numbers must be the same after reloading from disk.
        let reopened = FlatFileStore::open(dir.path());
        reopened.init().await.unwrap();
        assert_eq!(reopened.latest_artifact_registry_event_seq().unwrap(), 1);
        assert_eq!(
            reopened.artifact_registry_events_since(0).unwrap(),
            vec![(0, first), (1, second)]
        );
    }

    #[tokio::test]
    async fn artifact_registry_latest_record_wins() {
        let (store, _dir) = temp_store().await;

        let original = unprotected_artifact(
            'a',
            PublicationMode::Private,
            vec![node_id('b')],
            "2026-04-28T12:00:00Z",
        );
        store
            .append_artifact_registry_record(&original)
            .await
            .unwrap();

        let replacement = ArtifactRegistryRecord {
            pilot_id: Some(pilot_id('c')),
            ..unprotected_artifact(
                'a',
                PublicationMode::Public,
                vec![node_id('b'), node_id('d')],
                "2026-04-28T12:01:00Z",
            )
        };
        store
            .append_artifact_registry_record(&replacement)
            .await
            .unwrap();

        let latest = store.artifact_registry_record(&hash('a')).unwrap().unwrap();
        assert_eq!(latest.publication_mode, PublicationMode::Public);
        assert_eq!(latest.pilot_id, Some(pilot_id('c')));
        assert_eq!(latest.serving_node_ids, vec![node_id('b'), node_id('d')]);
    }

    #[tokio::test]
    async fn artifact_registry_validates_mode_specific_fields() {
        let (store, _dir) = temp_store().await;

        let protected_without_hash = ArtifactRegistryRecord {
            has_raw_igc: false,
            has_protected_sanitized_igc: true,
            ..unprotected_artifact(
                'a',
                PublicationMode::Protected,
                Vec::new(),
                "2026-04-28T12:00:00Z",
            )
        };
        assert!(matches!(
            store
                .append_artifact_registry_record(&protected_without_hash)
                .await,
            Err(StoreError::InvalidArtifactRecord(
                "protected mode requires protected_hash"
            ))
        ));

        let public_with_protected_state = ArtifactRegistryRecord {
            protected_hash: Some(hash('b')),
            ..unprotected_artifact(
                'a',
                PublicationMode::Public,
                Vec::new(),
                "2026-04-28T12:00:00Z",
            )
        };
        assert!(matches!(
            store
                .append_artifact_registry_record(&public_with_protected_state)
                .await,
            Err(StoreError::InvalidArtifactRecord(
                "protected_hash is only valid in protected mode"
            ))
        ));
    }

    #[tokio::test]
    async fn key_persistence() {
        let (store, _dir) = temp_store().await;
        assert!(store.load_key_bytes().unwrap().is_none());

        let key = [42u8; 32];
        store.save_key_bytes(&key).unwrap();

        let loaded = store.load_key_bytes().unwrap().unwrap();
        assert_eq!(loaded, key);
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn key_file_has_mode_0600() {
        use std::os::unix::fs::PermissionsExt;
        let (store, dir) = temp_store().await;
        store.save_key_bytes(&[0u8; 32]).unwrap();
        let key_file = dir.path().join("node.key");
        let mode = std::fs::metadata(key_file).unwrap().permissions().mode() & 0o777;
        assert_eq!(mode, 0o600, "node.key must have mode 0600, got {mode:o}");
    }
}