1use crate::error::FaucetError;
17use async_trait::async_trait;
18use serde_json::Value;
19use std::collections::HashMap;
20use std::path::{Path, PathBuf};
21use tokio::io::AsyncWriteExt;
22use tokio::sync::Mutex;
23
24#[async_trait]
30pub trait StateStore: Send + Sync {
31 async fn get(&self, key: &str) -> Result<Option<Value>, FaucetError>;
33
34 async fn put(&self, key: &str, value: &Value) -> Result<(), FaucetError>;
39
40 async fn delete(&self, key: &str) -> Result<(), FaucetError>;
42
43 async fn check(
50 &self,
51 _ctx: &crate::check::CheckContext,
52 ) -> Result<crate::check::CheckReport, FaucetError> {
53 Ok(crate::check::CheckReport::not_implemented())
54 }
55}
56
/// State key used by `FileStateStore`'s sentinel probe (`sentinel_roundtrip`),
/// which writes a small JSON object under this key, reads it back, and then
/// deletes it.
///
/// NOTE(review): the probe overwrites and then removes whatever is stored under
/// this key, so pipelines must never use it for real state.
pub const DOCTOR_SENTINEL_KEY: &str = "faucet_doctor_probe";
60
61pub fn validate_state_key(key: &str) -> Result<(), FaucetError> {
65 if key.is_empty() {
66 return Err(FaucetError::State("state key must not be empty".into()));
67 }
68 if key.len() > 256 {
69 return Err(FaucetError::State(format!(
70 "state key '{key}' exceeds 256 characters"
71 )));
72 }
73 for (i, c) in key.char_indices() {
74 let ok = c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | ':' | '.');
75 if !ok {
76 return Err(FaucetError::State(format!(
77 "state key '{key}' contains illegal character {c:?} at byte {i}"
78 )));
79 }
80 }
81 if key == "." || key == ".." || key.starts_with('.') {
82 return Err(FaucetError::State(format!(
83 "state key '{key}' must not begin with a dot"
84 )));
85 }
86 Ok(())
87}
88
89#[derive(Default)]
93pub struct MemoryStateStore {
94 inner: Mutex<HashMap<String, Value>>,
95}
96
97impl MemoryStateStore {
98 pub fn new() -> Self {
100 Self::default()
101 }
102}
103
104#[async_trait]
105impl StateStore for MemoryStateStore {
106 async fn get(&self, key: &str) -> Result<Option<Value>, FaucetError> {
107 validate_state_key(key)?;
108 Ok(self.inner.lock().await.get(key).cloned())
109 }
110
111 async fn put(&self, key: &str, value: &Value) -> Result<(), FaucetError> {
112 validate_state_key(key)?;
113 self.inner
114 .lock()
115 .await
116 .insert(key.to_owned(), value.clone());
117 Ok(())
118 }
119
120 async fn delete(&self, key: &str) -> Result<(), FaucetError> {
121 validate_state_key(key)?;
122 self.inner.lock().await.remove(key);
123 Ok(())
124 }
125
126 async fn check(
127 &self,
128 _ctx: &crate::check::CheckContext,
129 ) -> Result<crate::check::CheckReport, FaucetError> {
130 Ok(crate::check::CheckReport::single(
132 crate::check::Probe::pass("sentinel", std::time::Duration::ZERO),
133 ))
134 }
135}
136
/// Maps a validated state key to a file-name stem that is safe on every
/// platform.
///
/// Only `:` needs escaping (it is illegal in Windows file names); it becomes
/// `%3A`. Because `%` is not a legal key character, distinct keys can never
/// map to the same stem.
fn safe_filename(key: &str) -> String {
    let mut encoded = String::with_capacity(key.len());
    for ch in key.chars() {
        if ch == ':' {
            encoded.push_str("%3A");
        } else {
            encoded.push(ch);
        }
    }
    encoded
}
147
148pub struct FileStateStore {
163 root: PathBuf,
164 write_lock: Mutex<()>,
165}
166
167impl FileStateStore {
168 pub fn new(root: impl Into<PathBuf>) -> Self {
170 Self {
171 root: root.into(),
172 write_lock: Mutex::new(()),
173 }
174 }
175
176 fn entry_path(&self, key: &str) -> PathBuf {
177 self.root.join(format!("{}.json", safe_filename(key)))
178 }
179
180 fn temp_path(&self, key: &str) -> PathBuf {
181 use std::sync::atomic::{AtomicU64, Ordering};
191 static SEQ: AtomicU64 = AtomicU64::new(0);
192 let seq = SEQ.fetch_add(1, Ordering::Relaxed);
193 self.root.join(format!(
194 "{}.{}.{}.json.tmp",
195 safe_filename(key),
196 std::process::id(),
197 seq
198 ))
199 }
200
201 async fn ensure_root(&self) -> Result<(), FaucetError> {
202 tokio::fs::create_dir_all(&self.root).await.map_err(|e| {
203 FaucetError::State(format!(
204 "failed to create state dir {}: {e}",
205 self.root.display()
206 ))
207 })
208 }
209
210 pub fn root(&self) -> &Path {
212 &self.root
213 }
214}
215
216#[async_trait]
217impl StateStore for FileStateStore {
218 async fn get(&self, key: &str) -> Result<Option<Value>, FaucetError> {
219 validate_state_key(key)?;
220 let path = self.entry_path(key);
221 match tokio::fs::read(&path).await {
222 Ok(bytes) => {
223 let value: Value = serde_json::from_slice(&bytes).map_err(|e| {
224 FaucetError::State(format!(
225 "failed to parse state file {}: {e}",
226 path.display()
227 ))
228 })?;
229 Ok(Some(value))
230 }
231 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
232 Err(e) => Err(FaucetError::State(format!(
233 "failed to read state file {}: {e}",
234 path.display()
235 ))),
236 }
237 }
238
239 async fn put(&self, key: &str, value: &Value) -> Result<(), FaucetError> {
240 validate_state_key(key)?;
241 let _guard = self.write_lock.lock().await;
242 self.ensure_root().await?;
243 let bytes = serde_json::to_vec(value).map_err(|e| {
244 FaucetError::State(format!("failed to serialize state for key '{key}': {e}"))
245 })?;
246 let final_path = self.entry_path(key);
247 let tmp_path = self.temp_path(key);
248
249 {
255 let mut file = tokio::fs::File::create(&tmp_path).await.map_err(|e| {
256 FaucetError::State(format!(
257 "failed to create temp state file {}: {e}",
258 tmp_path.display()
259 ))
260 })?;
261 file.write_all(&bytes).await.map_err(|e| {
262 FaucetError::State(format!(
263 "failed to write temp state file {}: {e}",
264 tmp_path.display()
265 ))
266 })?;
267 file.sync_all().await.map_err(|e| {
268 FaucetError::State(format!(
269 "failed to fsync temp state file {}: {e}",
270 tmp_path.display()
271 ))
272 })?;
273 }
274
275 tokio::fs::rename(&tmp_path, &final_path)
276 .await
277 .map_err(|e| {
278 FaucetError::State(format!(
279 "failed to commit state file {}: {e}",
280 final_path.display()
281 ))
282 })?;
283
284 #[cfg(unix)]
290 {
291 let dir = tokio::fs::File::open(&self.root).await.map_err(|e| {
292 FaucetError::State(format!(
293 "failed to open state dir {} for fsync: {e}",
294 self.root.display()
295 ))
296 })?;
297 dir.sync_all().await.map_err(|e| {
298 FaucetError::State(format!(
299 "failed to fsync state dir {}: {e}",
300 self.root.display()
301 ))
302 })?;
303 }
304
305 tracing::debug!(
306 key,
307 path = %final_path.display(),
308 "state file written"
309 );
310 Ok(())
311 }
312
313 async fn delete(&self, key: &str) -> Result<(), FaucetError> {
314 validate_state_key(key)?;
315 let path = self.entry_path(key);
316 match tokio::fs::remove_file(&path).await {
317 Ok(()) => Ok(()),
318 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
319 Err(e) => Err(FaucetError::State(format!(
320 "failed to delete state file {}: {e}",
321 path.display()
322 ))),
323 }
324 }
325
326 async fn check(
327 &self,
328 _ctx: &crate::check::CheckContext,
329 ) -> Result<crate::check::CheckReport, FaucetError> {
330 use crate::check::{CheckReport, Probe};
331 let start = std::time::Instant::now();
335 let probe = match self.sentinel_roundtrip().await {
336 Ok(()) => Probe::pass("sentinel", start.elapsed()),
337 Err(e) => Probe::fail_hint(
338 "sentinel",
339 start.elapsed(),
340 e.to_string(),
341 format!("ensure {} exists and is writable", self.root.display()),
342 ),
343 };
344 Ok(CheckReport::single(probe))
345 }
346}
347
348impl FileStateStore {
349 async fn sentinel_roundtrip(&self) -> Result<(), FaucetError> {
352 let probe = serde_json::json!({ "faucet_doctor": true });
353 self.put(DOCTOR_SENTINEL_KEY, &probe).await?;
354 let got = self.get(DOCTOR_SENTINEL_KEY).await?;
355 let _ = self.delete(DOCTOR_SENTINEL_KEY).await;
357 match got {
358 Some(v) if v == probe => Ok(()),
359 _ => Err(FaucetError::State(
360 "sentinel readback did not match what was written".into(),
361 )),
362 }
363 }
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::Arc;
    use tempfile::TempDir;

    // ----- validate_state_key ------------------------------------------------

    #[test]
    fn rejects_empty_key() {
        let err = validate_state_key("").unwrap_err();
        assert!(matches!(err, FaucetError::State(_)));
    }

    #[test]
    fn rejects_path_traversal_segments() {
        for k in ["../etc/passwd", "a/b", "a\\b", "..", "."] {
            assert!(validate_state_key(k).is_err(), "expected reject for {k:?}");
        }
    }

    #[test]
    fn rejects_leading_dot() {
        assert!(validate_state_key(".hidden").is_err());
    }

    #[test]
    fn rejects_over_long_key() {
        let k = "a".repeat(257);
        assert!(validate_state_key(&k).is_err());
    }

    #[test]
    fn accepts_key_at_exactly_max_length() {
        // 256 is the inclusive upper bound; pin it so an off-by-one can't slip in.
        let k = "a".repeat(256);
        assert!(validate_state_key(&k).is_ok());
    }

    #[test]
    fn rejects_non_ascii_whitespace_and_control_chars() {
        for k in ["café", "a b", "tab\tkey", "nul\0key", "snow☃"] {
            assert!(validate_state_key(k).is_err(), "expected reject for {k:?}");
        }
    }

    #[test]
    fn rejects_percent_so_encoded_filenames_cannot_collide() {
        // safe_filename maps ':' to "%3A"; if '%' were a legal key character,
        // "a:b" and "a%3Ab" would share one state file.
        assert_eq!(safe_filename("a:b"), "a%3Ab");
        assert!(validate_state_key("a%3Ab").is_err());
    }

    #[test]
    fn accepts_typical_keys() {
        for k in [
            "github_issues",
            "pipeline:rest:issues",
            "with.dot",
            "with-dash_and_underscore",
            "lower-Case_99",
        ] {
            validate_state_key(k).unwrap_or_else(|e| panic!("expected ok for {k:?}: {e}"));
        }
    }

    // ----- MemoryStateStore --------------------------------------------------

    #[tokio::test]
    async fn memory_get_returns_none_for_missing_key() {
        let s = MemoryStateStore::new();
        assert!(s.get("nope").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn memory_put_then_get_round_trips() {
        let s = MemoryStateStore::new();
        s.put("k", &json!({"cursor": "abc", "n": 7})).await.unwrap();
        let got = s.get("k").await.unwrap().unwrap();
        assert_eq!(got["cursor"], "abc");
        assert_eq!(got["n"], 7);
    }

    #[tokio::test]
    async fn memory_put_overwrites_previous_value() {
        let s = MemoryStateStore::new();
        s.put("k", &json!(1)).await.unwrap();
        s.put("k", &json!(2)).await.unwrap();
        assert_eq!(s.get("k").await.unwrap().unwrap(), json!(2));
    }

    #[tokio::test]
    async fn memory_delete_makes_get_return_none() {
        let s = MemoryStateStore::new();
        s.put("k", &json!("v")).await.unwrap();
        s.delete("k").await.unwrap();
        assert!(s.get("k").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn memory_delete_missing_key_is_ok() {
        let s = MemoryStateStore::new();
        s.delete("absent").await.unwrap();
    }

    #[tokio::test]
    async fn memory_rejects_invalid_keys() {
        let s = MemoryStateStore::new();
        assert!(s.get("a/b").await.is_err());
        assert!(s.put("a/b", &json!(1)).await.is_err());
        assert!(s.delete("a/b").await.is_err());
    }

    // ----- FileStateStore ----------------------------------------------------

    #[tokio::test]
    async fn file_get_returns_none_for_missing_key() {
        let dir = TempDir::new().unwrap();
        let s = FileStateStore::new(dir.path());
        assert!(s.get("nope").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn file_put_creates_root_directory_lazily() {
        let dir = TempDir::new().unwrap();
        let root = dir.path().join("nested/state");
        let s = FileStateStore::new(&root);
        s.put("k", &json!("v")).await.unwrap();
        assert!(root.is_dir(), "root dir should be created on first put");
    }

    #[tokio::test]
    async fn file_put_then_get_round_trips() {
        let dir = TempDir::new().unwrap();
        let s = FileStateStore::new(dir.path());
        let value = json!({"cursor": "abc", "n": 42, "nested": {"flag": true}});
        s.put("github_issues", &value).await.unwrap();
        let got = s.get("github_issues").await.unwrap().unwrap();
        assert_eq!(got, value);
    }

    #[test]
    fn safe_filename_percent_encodes_colon() {
        assert_eq!(
            safe_filename("pipeline:rest:issues"),
            "pipeline%3Arest%3Aissues"
        );
        assert_eq!(safe_filename("plain_key-1.v2"), "plain_key-1.v2");
    }

    #[tokio::test]
    async fn file_round_trips_colon_keys_with_safe_filename() {
        let dir = TempDir::new().unwrap();
        // Colon keys must round-trip, but no file name on disk may contain ':'
        // (illegal on Windows).
        let s = FileStateStore::new(dir.path());
        let value = json!({"cursor": "z"});
        s.put("pipeline:rest:issues", &value).await.unwrap();
        assert_eq!(s.get("pipeline:rest:issues").await.unwrap().unwrap(), value);
        assert!(dir.path().join("pipeline%3Arest%3Aissues.json").exists());

        let mut has_colon = false;
        for entry in std::fs::read_dir(dir.path()).unwrap() {
            if entry.unwrap().file_name().to_string_lossy().contains(':') {
                has_colon = true;
            }
        }
        assert!(!has_colon, "no state filename may contain ':'");
    }

    /// True if any staging file (`*.json.tmp`) is left in `dir`.
    fn has_tmp_residue(dir: &std::path::Path) -> bool {
        std::fs::read_dir(dir)
            .unwrap()
            .filter_map(|e| e.ok())
            .any(|e| e.file_name().to_string_lossy().ends_with(".json.tmp"))
    }

    #[tokio::test]
    async fn file_put_overwrites_previous_value_atomically() {
        let dir = TempDir::new().unwrap();
        let s = FileStateStore::new(dir.path());
        s.put("k", &json!({"v": 1})).await.unwrap();
        s.put("k", &json!({"v": 2})).await.unwrap();
        assert_eq!(s.get("k").await.unwrap().unwrap(), json!({"v": 2}));
        assert!(!has_tmp_residue(dir.path()), "no temp residue after put");
    }

    #[test]
    fn file_temp_paths_are_unique_per_write() {
        let dir = TempDir::new().unwrap();
        // Two writes of the same key must never share a staging file, while
        // the final path stays stable.
        let s = FileStateStore::new(dir.path());
        let a = s.temp_path("k");
        let b = s.temp_path("k");
        assert_ne!(a, b, "each write must get a distinct temp path");
        assert_eq!(s.entry_path("k"), s.entry_path("k"));
    }

    #[tokio::test]
    async fn file_put_writes_complete_durable_file_with_no_temp_residue() {
        let dir = TempDir::new().unwrap();
        // A payload large enough to need more than a trivial write.
        let s = FileStateStore::new(dir.path());
        let big: Vec<Value> = (0..1_000)
            .map(|i| json!({"i": i, "s": "x".repeat(20)}))
            .collect();
        let value = json!({"cursor": "abc", "rows": big});

        s.put("github_issues", &value).await.unwrap();

        let raw = tokio::fs::read(dir.path().join("github_issues.json"))
            // Read the raw bytes directly, bypassing the store's parser.
            .await
            .expect("state file must exist after put");
        assert!(!raw.is_empty(), "state file must not be zero-length");
        let parsed: Value = serde_json::from_slice(&raw).expect("state file must be valid JSON");
        assert_eq!(parsed, value);

        assert!(!has_tmp_residue(dir.path()), "no temp residue after put");
    }

    #[tokio::test]
    async fn file_delete_removes_file() {
        let dir = TempDir::new().unwrap();
        let s = FileStateStore::new(dir.path());
        s.put("k", &json!("v")).await.unwrap();
        s.delete("k").await.unwrap();
        assert!(s.get("k").await.unwrap().is_none());
        assert!(!dir.path().join("k.json").exists());
    }

    #[tokio::test]
    async fn file_delete_missing_key_is_ok() {
        let dir = TempDir::new().unwrap();
        let s = FileStateStore::new(dir.path());
        s.delete("absent").await.unwrap();
    }

    #[tokio::test]
    async fn file_get_returns_error_for_corrupt_json() {
        let dir = TempDir::new().unwrap();
        let s = FileStateStore::new(dir.path());
        tokio::fs::create_dir_all(dir.path()).await.unwrap();
        tokio::fs::write(dir.path().join("bad.json"), b"not json")
            .await
            .unwrap();
        let err = s.get("bad").await.unwrap_err();
        match err {
            FaucetError::State(msg) => assert!(msg.contains("bad.json")),
            other => panic!("expected State error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn file_concurrent_puts_do_not_corrupt_or_leak_temp() {
        let dir = TempDir::new().unwrap();
        let s = Arc::new(FileStateStore::new(dir.path()));
        let mut handles = vec![];
        for i in 0..50 {
            let s = Arc::clone(&s);
            handles.push(tokio::spawn(async move {
                s.put("k", &json!({"i": i})).await.unwrap();
            }));
        }
        for h in handles {
            h.await.unwrap();
        }
        let got = s.get("k").await.unwrap().unwrap();
        // Last writer wins; any of the written values is acceptable.
        let i = got["i"].as_i64().unwrap();
        assert!((0..50).contains(&i));
        assert!(
            // Every staging file was either renamed into place or never created.
            !has_tmp_residue(dir.path()),
            "no temp residue after concurrent puts"
        );
    }

    #[tokio::test]
    async fn file_store_works_through_trait_object() {
        let dir = TempDir::new().unwrap();
        let s: Box<dyn StateStore> = Box::new(FileStateStore::new(dir.path()));
        s.put("k", &json!(1)).await.unwrap();
        assert_eq!(s.get("k").await.unwrap().unwrap(), json!(1));
    }

    // ----- check() -----------------------------------------------------------

    #[tokio::test]
    async fn memory_check_passes() {
        let s = MemoryStateStore::new();
        let report = s
            .check(&crate::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 0);
        assert!(
            report
                .probes
                .iter()
                .all(|p| matches!(p.status, crate::check::ProbeStatus::Pass))
        );
    }

    #[tokio::test]
    async fn file_check_passes_for_writable_root() {
        let dir = TempDir::new().unwrap();
        let s = FileStateStore::new(dir.path());
        let report = s
            .check(&crate::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 0, "writable root should pass");
        // The sentinel must have been deleted again.
        let leftovers: Vec<_> = std::fs::read_dir(dir.path()).unwrap().collect();
        assert!(leftovers.is_empty(), "check() must not leave files behind");
    }

    #[tokio::test]
    async fn file_check_fails_when_root_unusable() {
        let dir = TempDir::new().unwrap();
        // A regular file sits where a parent directory is expected, so the
        // root can never be created.
        let file = dir.path().join("not_a_dir");
        std::fs::write(&file, b"x").unwrap();
        let s = FileStateStore::new(file.join("state"));
        let report = s
            .check(&crate::check::CheckContext::default())
            .await
            .unwrap();
        assert_eq!(report.failed_count(), 1, "unusable root should fail");
    }
}