1use std::collections::{HashMap, HashSet};
13use std::path::{Path, PathBuf};
14use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
15
16use serde::{Deserialize, Serialize};
17use tokio::fs;
18use tokio::io::AsyncWriteExt;
19use tokio::sync::Mutex;
20
21use crate::id::{Blake3Hex, IdentifierError, NodeIdHex};
22
/// Errors produced by [`FlatFileStore`] operations.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Underlying filesystem operation failed.
    #[error("I/O: {0}")]
    Io(#[from] std::io::Error),
    /// An index record could not be serialized/deserialized as JSON.
    #[error("JSON: {0}")]
    Json(#[from] serde_json::Error),
    /// A caller-supplied hash or node-id string failed validation.
    #[error("identifier: {0}")]
    Identifier(#[from] IdentifierError),
    /// A cache `RwLock` was poisoned by a panicking writer; the payload
    /// names the affected cache for diagnostics.
    #[error("lock poisoned: {0}")]
    PoisonedLock(&'static str),
}
36
/// Origin of an [`IndexRecord`]: produced by this node's own publish, or
/// learned from a remote peer's announcement.
///
/// Serialized in snake_case (`local_publish` / `remote_announcement`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum IndexRecordSource {
    LocalPublish,
    RemoteAnnouncement,
}
46
/// One entry of the append-only `index.ndjson` file (one JSON object per
/// line).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct IndexRecord {
    /// Whether this record came from a local publish or a remote announcement.
    pub source: IndexRecordSource,
    /// Blake3 hash (hex) identifying the IGC content.
    pub igc_hash: Blake3Hex,
    /// Blake3 hash (hex) of the associated metadata; together with
    /// `node_id` this forms the record's dedup key.
    pub meta_hash: Blake3Hex,
    /// Node that published or announced the record.
    pub node_id: NodeIdHex,
    /// Opaque ticket string for retrieving the IGC content — format is not
    /// defined in this file; presumably validated by the caller (TODO confirm).
    pub igc_ticket: String,
    /// Opaque ticket string for retrieving the metadata; same caveat as
    /// `igc_ticket`.
    pub meta_ticket: String,
    /// Timestamp string; not parsed or validated here (tests use RFC 3339,
    /// e.g. "2026-03-22T12:00:00Z").
    pub recorded_at: String,
}
68
69pub struct FlatFileStore {
78 root: PathBuf,
79 dedup_cache: RwLock<HashSet<(Blake3Hex, NodeIdHex)>>,
81 meta_hash_cache: RwLock<HashSet<Blake3Hex>>,
83 latest_local_publish_cache: RwLock<HashMap<(Blake3Hex, NodeIdHex), IndexRecord>>,
85 index_records_cache: RwLock<Vec<IndexRecord>>,
87 discovery_events_cache: RwLock<Vec<(u64, IndexRecord)>>,
89 append_lock: Mutex<()>,
91}
92
/// (blake3 hash, node id) pair used as a cache key — for the dedup set the
/// hash is the record's `meta_hash`; for the latest-publish map it is the
/// `igc_hash`.
type DedupKey = (Blake3Hex, NodeIdHex);
/// Latest local-publish record per (igc_hash, node_id) pair.
type LatestLocalPublishMap = HashMap<DedupKey, IndexRecord>;
95
96impl FlatFileStore {
97 pub fn open(root: impl Into<PathBuf>) -> Self {
101 Self {
102 root: root.into(),
103 dedup_cache: RwLock::new(HashSet::new()),
104 meta_hash_cache: RwLock::new(HashSet::new()),
105 latest_local_publish_cache: RwLock::new(HashMap::new()),
106 index_records_cache: RwLock::new(Vec::new()),
107 discovery_events_cache: RwLock::new(Vec::new()),
108 append_lock: Mutex::new(()),
109 }
110 }
111
112 pub async fn init(&self) -> Result<(), StoreError> {
115 fs::create_dir_all(self.blobs_dir()).await?;
116 self.reload_cache()?;
117 Ok(())
118 }
119
120 fn reload_cache(&self) -> Result<(), StoreError> {
122 let mut dedup = self.dedup_cache.write().map_err(|_| StoreError::PoisonedLock("dedup_cache"))?;
123 let mut metas = self.meta_hash_cache.write().map_err(|_| StoreError::PoisonedLock("meta_hash_cache"))?;
124 let mut latest_local = self
125 .latest_local_publish_cache
126 .write()
127 .map_err(|_| StoreError::PoisonedLock("latest_local_publish_cache"))?;
128 let mut index_records = self
129 .index_records_cache
130 .write()
131 .map_err(|_| StoreError::PoisonedLock("index_records_cache"))?;
132 let mut discovery_events = self
133 .discovery_events_cache
134 .write()
135 .map_err(|_| StoreError::PoisonedLock("discovery_events_cache"))?;
136 dedup.clear();
137 metas.clear();
138 latest_local.clear();
139 index_records.clear();
140 discovery_events.clear();
141 for (seq, record) in self.iter_index_file()?.enumerate() {
142 let r = record?;
143 dedup.insert((r.meta_hash.clone(), r.node_id.clone()));
144 metas.insert(r.meta_hash.clone());
145 if r.source == IndexRecordSource::LocalPublish {
146 latest_local.insert((r.igc_hash.clone(), r.node_id.clone()), r.clone());
147 } else {
148 discovery_events.push((seq as u64, r.clone()));
149 }
150 index_records.push(r);
151 }
152 Ok(())
153 }
154
155 fn blobs_dir(&self) -> PathBuf {
158 self.root.join("blobs")
159 }
160
161 fn blob_path(&self, blake3_hex: &Blake3Hex) -> PathBuf {
162 self.blobs_dir()
163 .join(&blake3_hex.as_str()[..2])
164 .join(blake3_hex.as_str())
165 }
166
167 fn index_path(&self) -> PathBuf {
168 self.root.join("index.ndjson")
169 }
170
171 fn key_path(&self) -> PathBuf {
172 self.root.join("node.key")
173 }
174
175 pub fn resolve_path(&self, blake3_hex: &str) -> Result<Option<PathBuf>, StoreError> {
181 let blake3_hex = Blake3Hex::parse(blake3_hex)?;
182 let path = self.blob_path(&blake3_hex);
183 Ok(if path.exists() { Some(path) } else { None })
184 }
185
186 pub async fn put(&self, bytes: &[u8]) -> Result<Blake3Hex, StoreError> {
191 let hex = Blake3Hex::from_hash(blake3::hash(bytes));
192 let path = self.blob_path(&hex);
193
194 if let Some(parent) = path.parent() {
195 fs::create_dir_all(parent).await?;
196 }
197 match fs::OpenOptions::new()
198 .create_new(true)
199 .write(true)
200 .open(&path)
201 .await
202 {
203 Ok(mut file) => {
204 file.write_all(bytes).await?;
205 file.flush().await?;
206 }
207 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
208 Err(e) => return Err(StoreError::Io(e)),
209 }
210 Ok(hex)
211 }
212
213 pub async fn get(&self, blake3_hex: &str) -> Result<Option<Vec<u8>>, StoreError> {
215 let blake3_hex = Blake3Hex::parse(blake3_hex)?;
216 let path = self.blob_path(&blake3_hex);
217 match fs::read(&path).await {
218 Ok(bytes) => Ok(Some(bytes)),
219 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
220 Err(e) => Err(StoreError::Io(e)),
221 }
222 }
223
224 pub fn contains(&self, blake3_hex: &str) -> Result<bool, StoreError> {
226 let blake3_hex = Blake3Hex::parse(blake3_hex)?;
227 Ok(self.blob_path(&blake3_hex).exists())
228 }
229
230 pub async fn append_index(&self, record: &IndexRecord) -> Result<(), StoreError> {
236 let _append_guard = self.append_lock.lock().await;
237 self.append_index_unlocked(record).await
238 }
239
240 pub async fn append_index_if_absent(&self, record: &IndexRecord) -> Result<bool, StoreError> {
245 let _append_guard = self.append_lock.lock().await;
246 if self
247 .dedup_read()?
248 .contains(&(record.meta_hash.clone(), record.node_id.clone()))
249 {
250 return Ok(false);
251 }
252 self.append_index_unlocked(record).await?;
253 Ok(true)
254 }
255
256 async fn append_index_unlocked(&self, record: &IndexRecord) -> Result<(), StoreError> {
257 let mut line = serde_json::to_string(record)?;
258 line.push('\n');
259
260 let mut file = fs::OpenOptions::new()
261 .create(true)
262 .append(true)
263 .open(self.index_path())
264 .await?;
265 file.write_all(line.as_bytes()).await?;
266 file.flush().await?;
267
268 self.dedup_write()?
270 .insert((record.meta_hash.clone(), record.node_id.clone()));
271 self.meta_hash_write()?.insert(record.meta_hash.clone());
272 if record.source == IndexRecordSource::LocalPublish {
273 self.latest_local_publish_write()?.insert(
274 (record.igc_hash.clone(), record.node_id.clone()),
275 record.clone(),
276 );
277 } else {
278 let seq = self.index_records_read()?.len() as u64;
279 self.discovery_events_write()?.push((seq, record.clone()));
280 }
281 self.index_records_write()?.push(record.clone());
282
283 Ok(())
284 }
285
286 pub fn iter_index(
288 &self,
289 ) -> Result<impl Iterator<Item = Result<IndexRecord, StoreError>>, StoreError> {
290 let records = self.index_records_read()?.clone();
291 Ok(Box::new(records.into_iter().map(Ok))
292 as Box<dyn Iterator<Item = Result<IndexRecord, StoreError>>>)
293 }
294
295 fn iter_index_file(
297 &self,
298 ) -> Result<impl Iterator<Item = Result<IndexRecord, StoreError>>, StoreError> {
299 use std::io::{BufRead, BufReader};
300
301 let path = self.index_path();
302 if !path.exists() {
304 let v: Vec<Result<IndexRecord, StoreError>> = Vec::new();
305 return Ok(Box::new(v.into_iter())
306 as Box<dyn Iterator<Item = Result<IndexRecord, StoreError>>>);
307 }
308
309 let file = std::fs::File::open(&path).map_err(StoreError::Io)?;
310 let reader = BufReader::new(file);
311 Ok(Box::new(reader.lines().map(|line| {
312 let line = line.map_err(StoreError::Io)?;
313 serde_json::from_str::<IndexRecord>(&line).map_err(StoreError::Json)
314 }))
315 as Box<
316 dyn Iterator<Item = Result<IndexRecord, StoreError>>,
317 >)
318 }
319
320 pub fn has_index_record(&self, meta_hash: &str, node_id: &str) -> Result<bool, StoreError> {
324 let meta_hash = Blake3Hex::parse(meta_hash)?;
325 let node_id = NodeIdHex::parse(node_id)?;
326 Ok(self.dedup_read()?.contains(&(meta_hash, node_id)))
327 }
328
329 pub fn has_meta_hash(&self, meta_hash: &str) -> Result<bool, StoreError> {
333 let meta_hash = Blake3Hex::parse(meta_hash)?;
334 Ok(self.meta_hash_read()?.contains(&meta_hash))
335 }
336
337 pub fn discovery_events_since(
345 &self,
346 since_seq: u64,
347 ) -> Result<Vec<(u64, IndexRecord)>, StoreError> {
348 let events = self.discovery_events_read()?;
349 let start = events.partition_point(|(seq, _)| *seq < since_seq);
350 Ok(events[start..].to_vec())
351 }
352
353 pub fn latest_local_publish(
355 &self,
356 igc_hash: &Blake3Hex,
357 node_id: &NodeIdHex,
358 ) -> Result<Option<IndexRecord>, StoreError> {
359 Ok(self
360 .latest_local_publish_read()?
361 .get(&(igc_hash.clone(), node_id.clone()))
362 .cloned())
363 }
364
365 pub fn load_key_bytes(&self) -> Result<Option<[u8; 32]>, StoreError> {
370 use std::io::Read;
371 let path = self.key_path();
372 if !path.exists() {
373 return Ok(None);
374 }
375 let mut bytes = [0u8; 32];
376 std::fs::File::open(&path)
377 .and_then(|mut f| f.read_exact(&mut bytes))
378 .map_err(StoreError::Io)?;
379 Ok(Some(bytes))
380 }
381
382 pub fn save_key_bytes(&self, bytes: &[u8; 32]) -> Result<(), StoreError> {
384 write_key_file(&self.key_path(), bytes)
385 }
386
387 fn dedup_read(&self) -> Result<RwLockReadGuard<'_, HashSet<DedupKey>>, StoreError> {
388 self.dedup_cache
389 .read()
390 .map_err(|_| StoreError::PoisonedLock("dedup_cache"))
391 }
392
393 fn dedup_write(&self) -> Result<RwLockWriteGuard<'_, HashSet<DedupKey>>, StoreError> {
394 self.dedup_cache
395 .write()
396 .map_err(|_| StoreError::PoisonedLock("dedup_cache"))
397 }
398
399 fn meta_hash_read(&self) -> Result<RwLockReadGuard<'_, HashSet<Blake3Hex>>, StoreError> {
400 self.meta_hash_cache
401 .read()
402 .map_err(|_| StoreError::PoisonedLock("meta_hash_cache"))
403 }
404
405 fn meta_hash_write(&self) -> Result<RwLockWriteGuard<'_, HashSet<Blake3Hex>>, StoreError> {
406 self.meta_hash_cache
407 .write()
408 .map_err(|_| StoreError::PoisonedLock("meta_hash_cache"))
409 }
410
411 fn latest_local_publish_read(
412 &self,
413 ) -> Result<RwLockReadGuard<'_, LatestLocalPublishMap>, StoreError> {
414 self.latest_local_publish_cache
415 .read()
416 .map_err(|_| StoreError::PoisonedLock("latest_local_publish_cache"))
417 }
418
419 fn latest_local_publish_write(
420 &self,
421 ) -> Result<RwLockWriteGuard<'_, LatestLocalPublishMap>, StoreError> {
422 self.latest_local_publish_cache
423 .write()
424 .map_err(|_| StoreError::PoisonedLock("latest_local_publish_cache"))
425 }
426
427 fn index_records_read(&self) -> Result<RwLockReadGuard<'_, Vec<IndexRecord>>, StoreError> {
428 self.index_records_cache
429 .read()
430 .map_err(|_| StoreError::PoisonedLock("index_records_cache"))
431 }
432
433 fn index_records_write(&self) -> Result<RwLockWriteGuard<'_, Vec<IndexRecord>>, StoreError> {
434 self.index_records_cache
435 .write()
436 .map_err(|_| StoreError::PoisonedLock("index_records_cache"))
437 }
438
439 fn discovery_events_read(
440 &self,
441 ) -> Result<RwLockReadGuard<'_, Vec<(u64, IndexRecord)>>, StoreError> {
442 self.discovery_events_cache
443 .read()
444 .map_err(|_| StoreError::PoisonedLock("discovery_events_cache"))
445 }
446
447 fn discovery_events_write(
448 &self,
449 ) -> Result<RwLockWriteGuard<'_, Vec<(u64, IndexRecord)>>, StoreError> {
450 self.discovery_events_cache
451 .write()
452 .map_err(|_| StoreError::PoisonedLock("discovery_events_cache"))
453 }
454}
455
456#[cfg(unix)]
459fn write_key_file(path: &Path, bytes: &[u8; 32]) -> Result<(), StoreError> {
460 use std::io::Write;
461 use std::os::unix::fs::OpenOptionsExt;
462
463 let mut file = std::fs::OpenOptions::new()
464 .create(true)
465 .truncate(true)
466 .write(true)
467 .mode(0o600)
468 .open(path)
469 .map_err(StoreError::Io)?;
470 file.write_all(bytes).map_err(StoreError::Io)?;
471 Ok(())
472}
473
/// Fallback key writer for non-Unix targets, where Unix file modes do not
/// apply.
#[cfg(not(unix))]
fn write_key_file(path: &Path, bytes: &[u8; 32]) -> Result<(), StoreError> {
    // `fs::write` performs the same create/truncate/write-all sequence the
    // hand-rolled version did.
    std::fs::write(path, bytes).map_err(StoreError::Io)
}
481
#[cfg(test)]
mod tests {
    use super::*;
    use crate::id::{Blake3Hex, IdentifierError, NodeIdHex};

    // Builds an initialized store in a fresh temp directory. The TempDir
    // guard is returned so the directory outlives the test body.
    async fn temp_store() -> (FlatFileStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = FlatFileStore::open(dir.path());
        store.init().await.unwrap();
        (store, dir)
    }

    // 64-char repetition of `ch`, parsed as a blake3 hex hash. Only valid
    // for hex digits ('a', 'b', ...).
    fn hash(ch: char) -> Blake3Hex {
        Blake3Hex::parse(ch.to_string().repeat(64)).unwrap()
    }

    // 64-char repetition of `ch`, parsed as a node id.
    fn node_id(ch: char) -> NodeIdHex {
        NodeIdHex::parse(ch.to_string().repeat(64)).unwrap()
    }

    #[tokio::test]
    async fn put_get_round_trip() {
        let (store, _dir) = temp_store().await;
        let data = b"hello igc-net";
        let hex = store.put(data).await.unwrap();
        assert_eq!(hex.len(), 64);
        let got = store.get(&hex).await.unwrap().unwrap();
        assert_eq!(got, data);
    }

    // Content addressing: identical bytes must map to the same hash.
    #[tokio::test]
    async fn put_is_idempotent() {
        let (store, _dir) = temp_store().await;
        let data = b"same content";
        let h1 = store.put(data).await.unwrap();
        let h2 = store.put(data).await.unwrap();
        assert_eq!(h1, h2);
    }

    #[tokio::test]
    async fn contains_false_before_put_true_after() {
        let (store, _dir) = temp_store().await;
        let data = b"check contains";
        let hex = Blake3Hex::from_hash(blake3::hash(data));
        assert!(!store.contains(&hex).unwrap());
        store.put(data).await.unwrap();
        assert!(store.contains(&hex).unwrap());
    }

    #[tokio::test]
    async fn get_missing_returns_none() {
        let (store, _dir) = temp_store().await;
        let hex = hash('a');
        let result = store.get(&hex).await.unwrap();
        assert!(result.is_none());
    }

    // All string-keyed lookup APIs must validate the hash before touching
    // the filesystem.
    #[tokio::test]
    async fn invalid_hash_is_rejected_by_lookup_apis() {
        let (store, _dir) = temp_store().await;
        assert!(matches!(
            store.contains("bad-hash"),
            Err(StoreError::Identifier(IdentifierError::Blake3Hex(_)))
        ));
        assert!(matches!(
            store.resolve_path("bad-hash"),
            Err(StoreError::Identifier(IdentifierError::Blake3Hex(_)))
        ));
        assert!(matches!(
            store.get("bad-hash").await,
            Err(StoreError::Identifier(IdentifierError::Blake3Hex(_)))
        ));
    }

    // `append_index` does not deduplicate, so two appends of the same
    // record yield two entries.
    #[tokio::test]
    async fn index_round_trip() {
        let (store, _dir) = temp_store().await;
        let rec = IndexRecord {
            source: IndexRecordSource::LocalPublish,
            igc_hash: hash('a'),
            meta_hash: hash('b'),
            node_id: node_id('c'),
            igc_ticket: "igc_ticket".to_string(),
            meta_ticket: "meta_ticket".to_string(),
            recorded_at: "2026-03-22T12:00:00Z".to_string(),
        };
        store.append_index(&rec).await.unwrap();
        store.append_index(&rec).await.unwrap();

        let records: Vec<_> = store.iter_index().unwrap().collect();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].as_ref().unwrap().igc_hash, hash('a'));
    }

    // The dedup key is (meta_hash, node_id) — NOT igc_hash.
    #[tokio::test]
    async fn has_index_record_uses_meta_hash_and_node_id() {
        let (store, _dir) = temp_store().await;
        store
            .append_index(&IndexRecord {
                source: IndexRecordSource::RemoteAnnouncement,
                igc_hash: hash('a'),
                meta_hash: hash('b'),
                node_id: node_id('c'),
                igc_ticket: "igc_ticket_1".to_string(),
                meta_ticket: "meta_ticket_1".to_string(),
                recorded_at: "2026-03-22T12:00:00Z".to_string(),
            })
            .await
            .unwrap();

        assert!(
            store
                .has_index_record(&"b".repeat(64), &"c".repeat(64))
                .unwrap()
        );
        assert!(
            !store
                .has_index_record(&"b".repeat(64), &"d".repeat(64))
                .unwrap()
        );
        assert!(store.has_meta_hash(&"b".repeat(64)).unwrap());
    }

    // Two publishes with the same (igc_hash, node_id): the later append
    // must win in the latest-local-publish cache.
    #[tokio::test]
    async fn latest_local_publish_returns_last_matching_record() {
        let (store, _dir) = temp_store().await;
        for recorded_at in ["2026-03-22T12:00:00Z", "2026-03-22T12:05:00Z"] {
            store
                .append_index(&IndexRecord {
                    source: IndexRecordSource::LocalPublish,
                    igc_hash: hash('a'),
                    meta_hash: hash('b'),
                    node_id: node_id('c'),
                    igc_ticket: format!("igc_ticket_{recorded_at}"),
                    meta_ticket: format!("meta_ticket_{recorded_at}"),
                    recorded_at: recorded_at.to_string(),
                })
                .await
                .unwrap();
        }

        let latest = store
            .latest_local_publish(&hash('a'), &node_id('c'))
            .unwrap()
            .unwrap();
        assert_eq!(latest.recorded_at, "2026-03-22T12:05:00Z");
    }

    #[tokio::test]
    async fn iter_index_on_empty_store_returns_empty() {
        let (store, _dir) = temp_store().await;
        let records: Vec<_> = store.iter_index().unwrap().collect();
        assert!(records.is_empty());
    }

    #[tokio::test]
    async fn key_persistence() {
        let (store, _dir) = temp_store().await;
        assert!(store.load_key_bytes().unwrap().is_none());

        let key = [42u8; 32];
        store.save_key_bytes(&key).unwrap();

        let loaded = store.load_key_bytes().unwrap().unwrap();
        assert_eq!(loaded, key);
    }

    // The node key must never be group/world readable.
    #[cfg(unix)]
    #[tokio::test]
    async fn key_file_has_mode_0600() {
        use std::os::unix::fs::PermissionsExt;
        let (store, dir) = temp_store().await;
        store.save_key_bytes(&[0u8; 32]).unwrap();
        let meta = std::fs::metadata(dir.path().join("node.key")).unwrap();
        let mode = meta.permissions().mode() & 0o777;
        assert_eq!(mode, 0o600, "node.key must have mode 0600, got {mode:o}");
    }
}