1#![deny(missing_docs)]
2use async_trait::async_trait;
9use layer0::effect::Scope;
10use layer0::error::StateError;
11use layer0::state::{SearchResult, StateStore, StoreOptions};
12use std::path::{Path, PathBuf};
13use std::time::{SystemTime, UNIX_EPOCH};
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
17pub enum Format {
18 #[default]
20 Json,
21 Markdown,
28}
29
30impl Format {
31 fn ext(self) -> &'static str {
33 match self {
34 Format::Json => "json",
35 Format::Markdown => "md",
36 }
37 }
38}
39
40pub struct FsStore {
53 root: PathBuf,
54 format: Format,
55}
56
57impl FsStore {
58 pub fn new(root: &Path) -> Self {
62 Self {
63 root: root.to_path_buf(),
64 format: Format::default(),
65 }
66 }
67
68 pub fn with_format(root: &Path, format: Format) -> Self {
72 Self {
73 root: root.to_path_buf(),
74 format,
75 }
76 }
77}
78
79fn scope_dir_name(scope: &Scope) -> String {
81 let json = serde_json::to_string(scope).unwrap_or_else(|_| "unknown".into());
84 let mut hash: u64 = 5381;
86 for byte in json.as_bytes() {
87 hash = hash.wrapping_mul(33).wrapping_add(*byte as u64);
88 }
89 format!("scope-{hash:016x}")
90}
91
92fn key_to_filename(key: &str) -> String {
96 let mut encoded = String::new();
97 for ch in key.chars() {
98 match ch {
99 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' | '.' => encoded.push(ch),
100 _ => {
101 for byte in ch.to_string().as_bytes() {
102 encoded.push_str(&format!("%{byte:02X}"));
103 }
104 }
105 }
106 }
107 encoded
108}
109
110fn filename_to_key(filename: &str, ext: &str) -> Option<String> {
114 let suffix = format!(".{ext}");
115 let name = filename.strip_suffix(&suffix)?;
116 let mut result = Vec::new();
117 let bytes = name.as_bytes();
118 let mut i = 0;
119 while i < bytes.len() {
120 if bytes[i] == b'%' && i + 2 < bytes.len() {
121 let hex = std::str::from_utf8(&bytes[i + 1..i + 3]).ok()?;
122 let byte = u8::from_str_radix(hex, 16).ok()?;
123 result.push(byte);
124 i += 3;
125 } else {
126 result.push(bytes[i]);
127 i += 1;
128 }
129 }
130 String::from_utf8(result).ok()
131}
132
133fn is_expired(meta_path: &Path) -> bool {
135 let Ok(data) = std::fs::read(meta_path) else {
136 return false;
137 };
138 let Ok(val) = serde_json::from_slice::<serde_json::Value>(&data) else {
139 return false;
140 };
141 let Some(expires_at) = val.get("expires_at").and_then(|v| v.as_u64()) else {
142 return false;
143 };
144 let now = SystemTime::now()
145 .duration_since(UNIX_EPOCH)
146 .map(|d| d.as_millis() as u64)
147 .unwrap_or(0);
148 now >= expires_at
149}
150
151async fn read_json(path: &Path) -> Result<Option<serde_json::Value>, StateError> {
153 match tokio::fs::read_to_string(path).await {
154 Ok(contents) => {
155 let value: serde_json::Value = serde_json::from_str(&contents)
156 .map_err(|e| StateError::Serialization(e.to_string()))?;
157 Ok(Some(value))
158 }
159 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
160 Err(e) => Err(StateError::WriteFailed(e.to_string())),
161 }
162}
163
164async fn read_md(path: &Path) -> Result<Option<serde_json::Value>, StateError> {
171 match tokio::fs::read_to_string(path).await {
172 Ok(contents) => {
173 if let Ok(value) = serde_json::from_str::<serde_json::Value>(&contents) {
175 return Ok(Some(value));
176 }
177 if let Some(inner) = strip_json_fence(&contents)
179 && let Ok(value) = serde_json::from_str::<serde_json::Value>(inner)
180 {
181 return Ok(Some(value));
182 }
183 Ok(Some(serde_json::Value::String(contents)))
185 }
186 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
187 Err(e) => Err(StateError::WriteFailed(e.to_string())),
188 }
189}
190
191fn strip_json_fence(s: &str) -> Option<&str> {
193 let inner = s.strip_prefix("```json\n")?;
194 inner.strip_suffix("\n```")
195}
196
197fn value_to_markdown(value: &serde_json::Value) -> String {
202 match value {
203 serde_json::Value::String(s) => s.clone(),
204 other => {
205 let pretty = serde_json::to_string_pretty(other).unwrap_or_default();
206 format!("```json\n{pretty}\n```")
207 }
208 }
209}
210
211fn count_occurrences(haystack: &str, needle: &str) -> usize {
213 if needle.is_empty() {
214 return 0;
215 }
216 let mut count = 0;
217 let mut start = 0;
218 while let Some(pos) = haystack[start..].find(needle) {
219 count += 1;
220 start += pos + needle.len();
221 }
222 count
223}
224
225#[async_trait]
226impl StateStore for FsStore {
227 async fn read(
228 &self,
229 scope: &Scope,
230 key: &str,
231 ) -> Result<Option<serde_json::Value>, StateError> {
232 let scope_path = self.root.join(scope_dir_name(scope));
233 let stem = key_to_filename(key);
234 let ext = self.format.ext();
235 let data_path = scope_path.join(format!("{stem}.{ext}"));
236 let meta_path = scope_path.join(format!("{stem}_meta.json"));
237
238 if meta_path.exists() && is_expired(&meta_path) {
240 let _ = std::fs::remove_file(&data_path);
241 let _ = std::fs::remove_file(&meta_path);
242 return Ok(None);
243 }
244
245 match self.format {
246 Format::Json => read_json(&data_path).await,
247 Format::Markdown => read_md(&data_path).await,
248 }
249 }
250
251 async fn write(
252 &self,
253 scope: &Scope,
254 key: &str,
255 value: serde_json::Value,
256 ) -> Result<(), StateError> {
257 let dir = self.root.join(scope_dir_name(scope));
258 tokio::fs::create_dir_all(&dir)
259 .await
260 .map_err(|e| StateError::WriteFailed(e.to_string()))?;
261
262 let stem = key_to_filename(key);
263 let ext = self.format.ext();
264 let path = dir.join(format!("{stem}.{ext}"));
265
266 let contents = match self.format {
267 Format::Json => serde_json::to_string_pretty(&value)
268 .map_err(|e| StateError::Serialization(e.to_string()))?,
269 Format::Markdown => value_to_markdown(&value),
270 };
271
272 tokio::fs::write(&path, contents)
273 .await
274 .map_err(|e| StateError::WriteFailed(e.to_string()))?;
275 Ok(())
276 }
277
278 async fn delete(&self, scope: &Scope, key: &str) -> Result<(), StateError> {
279 let dir = self.root.join(scope_dir_name(scope));
280 let stem = key_to_filename(key);
281 let ext = self.format.ext();
282 let path = dir.join(format!("{stem}.{ext}"));
283 match tokio::fs::remove_file(&path).await {
284 Ok(()) => Ok(()),
285 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
286 Err(e) => Err(StateError::WriteFailed(e.to_string())),
287 }
288 }
289
290 async fn list(&self, scope: &Scope, prefix: &str) -> Result<Vec<String>, StateError> {
291 let dir = self.root.join(scope_dir_name(scope));
292 let ext = self.format.ext();
293 let data_suffix = format!(".{ext}");
294
295 let mut entries = match tokio::fs::read_dir(&dir).await {
296 Ok(entries) => entries,
297 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(vec![]),
298 Err(e) => return Err(StateError::WriteFailed(e.to_string())),
299 };
300
301 let mut keys = Vec::new();
302 while let Some(entry) = entries
303 .next_entry()
304 .await
305 .map_err(|e| StateError::WriteFailed(e.to_string()))?
306 {
307 if let Some(filename) = entry.file_name().to_str()
308 && !filename.ends_with("_meta.json")
310 && filename.ends_with(&data_suffix)
311 && let Some(key) = filename_to_key(filename, ext)
312 && key.starts_with(prefix)
313 {
314 let stem = filename.strip_suffix(&data_suffix).unwrap_or(filename);
316 let meta_path = dir.join(format!("{stem}_meta.json"));
317 if meta_path.exists() && is_expired(&meta_path) {
318 continue;
319 }
320 keys.push(key);
321 }
322 }
323 Ok(keys)
324 }
325
326 async fn search(
327 &self,
328 scope: &Scope,
329 query: &str,
330 limit: usize,
331 ) -> Result<Vec<SearchResult>, StateError> {
332 if query.is_empty() {
333 return Ok(vec![]);
334 }
335
336 let keys = self.list(scope, "").await?;
337 let query_lower = query.to_lowercase();
338
339 #[cfg(feature = "regex")]
342 let compiled_regex = regex::Regex::new(query).ok();
343
344 let mut results = Vec::new();
345
346 for key in keys {
347 let Some(value) = self.read(scope, &key).await? else {
348 continue;
349 };
350
351 let text = match &value {
352 serde_json::Value::String(s) => s.clone(),
353 other => serde_json::to_string(other).unwrap_or_default(),
354 };
355
356 let match_count = {
357 #[cfg(feature = "regex")]
358 {
359 if let Some(ref re) = compiled_regex {
360 re.find_iter(&text).count()
361 } else {
362 count_occurrences(&text.to_lowercase(), &query_lower)
363 }
364 }
365 #[cfg(not(feature = "regex"))]
366 {
367 count_occurrences(&text.to_lowercase(), &query_lower)
368 }
369 };
370
371 if match_count > 0 {
372 let score = if text.is_empty() {
373 0.0
374 } else {
375 match_count as f64 / text.len() as f64
376 };
377 let mut result = SearchResult::new(key, score);
378 result.snippet = Some(text.chars().take(200).collect());
379 results.push(result);
380 }
381 }
382
383 results.sort_by(|a, b| {
384 b.score
385 .partial_cmp(&a.score)
386 .unwrap_or(std::cmp::Ordering::Equal)
387 });
388 results.truncate(limit);
389 Ok(results)
390 }
391
392 async fn write_hinted(
393 &self,
394 scope: &Scope,
395 key: &str,
396 value: serde_json::Value,
397 options: &StoreOptions,
398 ) -> Result<(), StateError> {
399 self.write(scope, key, value).await?;
401
402 if let Some(ttl) = options.ttl {
405 let dir = self.root.join(scope_dir_name(scope));
406 let stem = key_to_filename(key);
407 let meta_path = dir.join(format!("{stem}_meta.json"));
408
409 let now_ms = SystemTime::now()
410 .duration_since(UNIX_EPOCH)
411 .map(|d| d.as_millis() as u64)
412 .unwrap_or(0);
413 let expires_at = now_ms.saturating_add(ttl.as_millis());
414
415 let meta = serde_json::json!({ "expires_at": expires_at });
416 let contents = serde_json::to_string(&meta)
417 .map_err(|e| StateError::Serialization(e.to_string()))?;
418 tokio::fs::write(&meta_path, contents)
419 .await
420 .map_err(|e| StateError::WriteFailed(e.to_string()))?;
421 }
422
423 Ok(())
424 }
425}
426
427#[cfg(test)]
428mod tests {
429 use super::*;
430 use serde_json::json;
431
432 #[test]
433 fn key_encoding_roundtrip() {
434 let keys = [
435 "simple",
436 "user:name",
437 "path/to/key",
438 "has spaces",
439 "emoji🎉",
440 ];
441 for key in &keys {
442 let stem = key_to_filename(key);
443 let filename = format!("{stem}.json");
444 let decoded = filename_to_key(&filename, "json").unwrap();
445 assert_eq!(*key, decoded, "roundtrip failed for {key}");
446 }
447 }
448
449 #[test]
450 fn scope_dir_name_is_deterministic() {
451 let scope = Scope::Global;
452 let dir1 = scope_dir_name(&scope);
453 let dir2 = scope_dir_name(&scope);
454 assert_eq!(dir1, dir2);
455 }
456
457 #[test]
458 fn different_scopes_get_different_dirs() {
459 let global = scope_dir_name(&Scope::Global);
460 let session = scope_dir_name(&Scope::Session(layer0::SessionId::new("s1")));
461 assert_ne!(global, session);
462 }
463
464 #[test]
465 fn key_to_filename_returns_stem_without_extension() {
466 let stem = key_to_filename("test");
467 assert!(
468 !stem.ends_with(".json"),
469 "key_to_filename should return a stem without the .json extension"
470 );
471 }
472
473 #[test]
474 fn filename_to_key_rejects_non_json() {
475 let result = filename_to_key("test.txt", "json");
476 assert!(result.is_none());
477 }
478
479 #[tokio::test]
480 async fn write_and_read_roundtrip() {
481 let dir = tempfile::tempdir().unwrap();
482 let store = FsStore::new(dir.path());
483 let scope = Scope::Global;
484
485 store.write(&scope, "key1", json!("hello")).await.unwrap();
486 let val = store.read(&scope, "key1").await.unwrap();
487 assert_eq!(val, Some(json!("hello")));
488 }
489
490 #[tokio::test]
491 async fn read_nonexistent_returns_none() {
492 let dir = tempfile::tempdir().unwrap();
493 let store = FsStore::new(dir.path());
494 let scope = Scope::Global;
495
496 let val = store.read(&scope, "missing").await.unwrap();
497 assert_eq!(val, None);
498 }
499
500 #[tokio::test]
501 async fn delete_removes_file() {
502 let dir = tempfile::tempdir().unwrap();
503 let store = FsStore::new(dir.path());
504 let scope = Scope::Global;
505
506 store.write(&scope, "key1", json!("hello")).await.unwrap();
507 store.delete(&scope, "key1").await.unwrap();
508 let val = store.read(&scope, "key1").await.unwrap();
509 assert_eq!(val, None);
510 }
511
512 #[tokio::test]
513 async fn delete_nonexistent_is_ok() {
514 let dir = tempfile::tempdir().unwrap();
515 let store = FsStore::new(dir.path());
516 let scope = Scope::Global;
517
518 let result = store.delete(&scope, "missing").await;
519 assert!(result.is_ok());
520 }
521
522 #[tokio::test]
523 async fn list_keys_with_prefix() {
524 let dir = tempfile::tempdir().unwrap();
525 let store = FsStore::new(dir.path());
526 let scope = Scope::Global;
527
528 store
529 .write(&scope, "user:name", json!("Alice"))
530 .await
531 .unwrap();
532 store.write(&scope, "user:age", json!(30)).await.unwrap();
533 store
534 .write(&scope, "system:version", json!("1.0"))
535 .await
536 .unwrap();
537
538 let mut keys = store.list(&scope, "user:").await.unwrap();
539 keys.sort();
540 assert_eq!(keys, vec!["user:age", "user:name"]);
541 }
542
543 #[tokio::test]
544 async fn list_nonexistent_dir_returns_empty() {
545 let dir = tempfile::tempdir().unwrap();
546 let store = FsStore::new(dir.path());
547 let scope = Scope::Global;
548
549 let keys = store.list(&scope, "").await.unwrap();
550 assert!(keys.is_empty());
551 }
552
553 #[tokio::test]
554 async fn scopes_are_isolated() {
555 let dir = tempfile::tempdir().unwrap();
556 let store = FsStore::new(dir.path());
557 let global = Scope::Global;
558 let session = Scope::Session(layer0::SessionId::new("s1"));
559
560 store
561 .write(&global, "key", json!("global_val"))
562 .await
563 .unwrap();
564 store
565 .write(&session, "key", json!("session_val"))
566 .await
567 .unwrap();
568
569 let global_val = store.read(&global, "key").await.unwrap();
570 let session_val = store.read(&session, "key").await.unwrap();
571
572 assert_eq!(global_val, Some(json!("global_val")));
573 assert_eq!(session_val, Some(json!("session_val")));
574 }
575
576 #[tokio::test]
577 async fn search_returns_empty() {
578 let dir = tempfile::tempdir().unwrap();
579 let store = FsStore::new(dir.path());
580 let scope = Scope::Global;
581
582 let results = store.search(&scope, "query", 10).await.unwrap();
583 assert!(results.is_empty());
584 }
585
586 #[test]
587 fn fs_store_implements_state_store() {
588 fn _assert_state_store<T: StateStore>() {}
589 _assert_state_store::<FsStore>();
590 }
591
592 #[tokio::test]
593 async fn test_fsstore_ttl_expiration() {
594 use layer0::DurationMs;
595 use std::time::Duration;
596
597 let dir = tempfile::tempdir().unwrap();
598 let store = FsStore::new(dir.path());
599 let scope = Scope::Global;
600
601 let opts = StoreOptions {
602 ttl: Some(DurationMs::from_millis(1)),
603 ..Default::default()
604 };
605 store
606 .write_hinted(&scope, "expiring", serde_json::json!("value"), &opts)
607 .await
608 .unwrap();
609
610 tokio::time::sleep(Duration::from_millis(10)).await;
612
613 let result = store.read(&scope, "expiring").await.unwrap();
615 assert!(result.is_none(), "expired entry should return None");
616
617 let keys = store.list(&scope, "").await.unwrap();
619 assert!(
620 !keys.contains(&"expiring".to_string()),
621 "expired entry must not appear in list"
622 );
623 }
624
625 #[tokio::test]
626 async fn test_fsstore_no_ttl_reads_normally() {
627 let dir = tempfile::tempdir().unwrap();
628 let store = FsStore::new(dir.path());
629 let scope = Scope::Global;
630
631 let opts = StoreOptions::default();
633 store
634 .write_hinted(&scope, "durable", serde_json::json!("keep"), &opts)
635 .await
636 .unwrap();
637
638 let result = store.read(&scope, "durable").await.unwrap();
640 assert_eq!(result, Some(serde_json::json!("keep")));
641
642 let keys = store.list(&scope, "").await.unwrap();
644 assert!(keys.contains(&"durable".to_string()));
645 }
646
647 #[tokio::test]
648 async fn test_fsstore_durable_and_expiring_coexist() {
649 use layer0::DurationMs;
650 use std::time::Duration;
651
652 let dir = tempfile::tempdir().unwrap();
653 let store = FsStore::new(dir.path());
654 let scope = Scope::Global;
655
656 store
658 .write(&scope, "durable_a", serde_json::json!("stays"))
659 .await
660 .unwrap();
661
662 let opts = StoreOptions {
664 ttl: Some(DurationMs::from_millis(1)),
665 ..Default::default()
666 };
667 store
668 .write_hinted(&scope, "expiring_b", serde_json::json!("gone"), &opts)
669 .await
670 .unwrap();
671
672 tokio::time::sleep(Duration::from_millis(10)).await;
674
675 let expired = store.read(&scope, "expiring_b").await.unwrap();
677 assert!(expired.is_none(), "expiring_b should have expired");
678
679 let keys = store.list(&scope, "").await.unwrap();
681 assert!(
682 keys.contains(&"durable_a".to_string()),
683 "durable_a should still be listed"
684 );
685 assert!(
686 !keys.contains(&"expiring_b".to_string()),
687 "expiring_b must not appear after expiry"
688 );
689 }
690
691 #[tokio::test]
694 async fn markdown_write_and_read_string() {
695 let dir = tempfile::tempdir().unwrap();
696 let store = FsStore::with_format(dir.path(), Format::Markdown);
697 let scope = Scope::Global;
698
699 store
700 .write(&scope, "note", json!("Hello, world!"))
701 .await
702 .unwrap();
703
704 let stem = key_to_filename("note");
706 let raw = std::fs::read_to_string(
707 dir.path()
708 .join(scope_dir_name(&scope))
709 .join(format!("{stem}.md")),
710 )
711 .unwrap();
712 assert_eq!(raw, "Hello, world!");
713
714 let val = store.read(&scope, "note").await.unwrap();
716 assert_eq!(val, Some(json!("Hello, world!")));
717 }
718
719 #[tokio::test]
720 async fn markdown_write_and_read_object() {
721 let dir = tempfile::tempdir().unwrap();
722 let store = FsStore::with_format(dir.path(), Format::Markdown);
723 let scope = Scope::Global;
724
725 let obj = json!({"name": "Alice", "age": 30});
726 store.write(&scope, "profile", obj.clone()).await.unwrap();
727
728 let stem = key_to_filename("profile");
730 let raw = std::fs::read_to_string(
731 dir.path()
732 .join(scope_dir_name(&scope))
733 .join(format!("{stem}.md")),
734 )
735 .unwrap();
736 assert!(raw.starts_with("```json\n"), "expected fenced code block");
737 assert!(raw.ends_with("\n```"), "expected closing fence");
738
739 let val = store.read(&scope, "profile").await.unwrap();
741 assert_eq!(val, Some(obj));
742 }
743
744 #[tokio::test]
745 async fn markdown_list_keys() {
746 let dir = tempfile::tempdir().unwrap();
747 let store = FsStore::with_format(dir.path(), Format::Markdown);
748 let scope = Scope::Global;
749
750 store.write(&scope, "alpha", json!("a")).await.unwrap();
751 store.write(&scope, "beta", json!("b")).await.unwrap();
752
753 let mut keys = store.list(&scope, "").await.unwrap();
754 keys.sort();
755 assert_eq!(keys, vec!["alpha", "beta"]);
756 }
757
758 #[tokio::test]
761 async fn search_finds_substring() {
762 let dir = tempfile::tempdir().unwrap();
763 let store = FsStore::new(dir.path());
764 let scope = Scope::Global;
765
766 store
767 .write(&scope, "a", json!("the quick brown fox"))
768 .await
769 .unwrap();
770 store
771 .write(&scope, "b", json!("jumped over the lazy dog"))
772 .await
773 .unwrap();
774 store
775 .write(&scope, "c", json!("completely unrelated content"))
776 .await
777 .unwrap();
778
779 let results = store.search(&scope, "fox", 10).await.unwrap();
780 assert_eq!(results.len(), 1);
781 assert_eq!(results[0].key, "a");
782 }
783
784 #[tokio::test]
785 async fn search_case_insensitive() {
786 let dir = tempfile::tempdir().unwrap();
787 let store = FsStore::new(dir.path());
788 let scope = Scope::Global;
789
790 store
791 .write(&scope, "k1", json!("Hello World"))
792 .await
793 .unwrap();
794
795 let results = store.search(&scope, "hello", 10).await.unwrap();
796 assert_eq!(results.len(), 1, "case-insensitive match should find Hello");
797
798 let results2 = store.search(&scope, "WORLD", 10).await.unwrap();
799 assert_eq!(
800 results2.len(),
801 1,
802 "case-insensitive match should find World"
803 );
804 }
805
806 #[tokio::test]
807 async fn search_respects_limit() {
808 let dir = tempfile::tempdir().unwrap();
809 let store = FsStore::new(dir.path());
810 let scope = Scope::Global;
811
812 for i in 0..10 {
813 store
814 .write(
815 &scope,
816 &format!("key{i}"),
817 json!(format!("needle content {i}")),
818 )
819 .await
820 .unwrap();
821 }
822
823 let results = store.search(&scope, "needle", 3).await.unwrap();
824 assert_eq!(results.len(), 3, "limit of 3 must be respected");
825 }
826
827 #[tokio::test]
828 async fn search_returns_empty_for_no_match() {
829 let dir = tempfile::tempdir().unwrap();
830 let store = FsStore::new(dir.path());
831 let scope = Scope::Global;
832
833 store
834 .write(&scope, "k1", json!("some content here"))
835 .await
836 .unwrap();
837
838 let results = store.search(&scope, "xyzzy_nonexistent", 10).await.unwrap();
839 assert!(results.is_empty());
840 }
841}