1#![allow(dead_code)]
22
23use std::collections::HashMap;
24use std::fs::File;
25use std::io::Write;
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28use std::time::{SystemTime, UNIX_EPOCH};
29
30use indexmap::IndexMap;
31use parking_lot::{Mutex, RwLock};
32use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
33
34use crate::error::{StrykeError, StrykeResult};
35use crate::value::StrykeValue;
36
/// Magic number stored in every header; its big-endian bytes spell ASCII `STKV`.
pub const KV_MAGIC: u32 = 0x5354_4b56;

/// On-disk format revision. `KvStore::open` rejects files carrying any other value.
pub const KV_FORMAT_VERSION: u32 = 1;
41
42#[derive(Archive, RkyvDeserialize, RkyvSerialize, Debug, Clone)]
54#[archive(check_bytes)]
55#[archive(bound(serialize = "__S: rkyv::ser::Serializer + rkyv::ser::ScratchSpace",))]
56#[archive_attr(check_bytes(
57 bound = "__C: rkyv::validation::ArchiveContext, <__C as rkyv::Fallible>::Error: std::error::Error"
58))]
59pub enum WireValue {
61 Undef,
63 Bool(bool),
65 Int(i64),
67 Float(f64),
69 Str(String),
71 Bytes(Vec<u8>),
73 Array(
74 #[omit_bounds]
75 #[archive_attr(omit_bounds)]
76 Vec<WireValue>,
77 ),
78 Hash(
79 #[omit_bounds]
80 #[archive_attr(omit_bounds)]
81 Vec<(String, WireValue)>,
82 ),
83}
84
85impl WireValue {
86 pub fn from_stryke(v: &StrykeValue) -> Self {
89 if v.is_undef() {
90 return WireValue::Undef;
91 }
92 if let Some(n) = v.as_integer() {
93 return WireValue::Int(n);
94 }
95 if let Some(f) = v.as_float() {
96 return WireValue::Float(f);
97 }
98 if let Some(b) = v.as_bytes_arc() {
99 return WireValue::Bytes((*b).clone());
100 }
101 if let Some(s) = v.as_str() {
102 return WireValue::Str(s.to_string());
103 }
104 if let Some(ar) = v.as_array_ref() {
106 let g = ar.read();
107 return WireValue::Array(g.iter().map(WireValue::from_stryke).collect());
108 }
109 if let Some(hr) = v.as_hash_ref() {
111 let g = hr.read();
112 let mut entries: Vec<(String, WireValue)> = g
113 .iter()
114 .map(|(k, val)| (k.clone(), WireValue::from_stryke(val)))
115 .collect();
116 entries.sort_by(|a, b| a.0.cmp(&b.0));
117 return WireValue::Hash(entries);
118 }
119 if let Some(arr) = v.as_array_vec() {
120 return WireValue::Array(arr.iter().map(WireValue::from_stryke).collect());
121 }
122 if let Some(h) = v.as_hash_map() {
123 let mut entries: Vec<(String, WireValue)> = h
124 .iter()
125 .map(|(k, val)| (k.clone(), WireValue::from_stryke(val)))
126 .collect();
127 entries.sort_by(|a, b| a.0.cmp(&b.0));
128 return WireValue::Hash(entries);
129 }
130 WireValue::Str(v.to_string())
132 }
133
134 pub fn into_stryke(self) -> StrykeValue {
136 match self {
137 WireValue::Undef => StrykeValue::UNDEF,
138 WireValue::Bool(b) => StrykeValue::integer(if b { 1 } else { 0 }),
139 WireValue::Int(n) => StrykeValue::integer(n),
140 WireValue::Float(f) => StrykeValue::float(f),
141 WireValue::Str(s) => StrykeValue::string(s),
142 WireValue::Bytes(b) => StrykeValue::bytes(Arc::new(b)),
143 WireValue::Array(items) => {
144 let v: Vec<StrykeValue> = items.into_iter().map(|x| x.into_stryke()).collect();
147 StrykeValue::array_ref(Arc::new(RwLock::new(v)))
148 }
149 WireValue::Hash(pairs) => {
150 let mut m: IndexMap<String, StrykeValue> = IndexMap::with_capacity(pairs.len());
153 for (k, v) in pairs {
154 m.insert(k, v.into_stryke());
155 }
156 StrykeValue::hash_ref(Arc::new(RwLock::new(m)))
157 }
158 }
159 }
160}
161
162#[derive(Archive, RkyvDeserialize, RkyvSerialize, Debug, Clone)]
165#[archive(check_bytes)]
166pub struct KvHeader {
167 pub magic: u32,
169 pub format_version: u32,
171 pub stryke_version: String,
173 pub created_at_secs: u64,
175 pub last_commit_secs: u64,
177 pub commit_count: u64,
179}
180
181impl Default for KvHeader {
182 fn default() -> Self {
183 Self {
184 magic: KV_MAGIC,
185 format_version: KV_FORMAT_VERSION,
186 stryke_version: env!("CARGO_PKG_VERSION").to_string(),
187 created_at_secs: now_secs(),
188 last_commit_secs: 0,
189 commit_count: 0,
190 }
191 }
192}
193#[derive(Archive, RkyvDeserialize, RkyvSerialize, Debug, Clone, Default)]
195#[archive(check_bytes)]
196pub struct KvRoot {
197 pub header: KvHeader,
199 pub entries: HashMap<String, WireValue>,
201}
202
203#[derive(Debug)]
210pub struct KvStore {
211 pub path: PathBuf,
213 pub root: KvRoot,
215 pub dirty: bool,
217}
218
219impl KvStore {
220 pub fn open(path: impl Into<PathBuf>) -> StrykeResult<Self> {
225 let path = path.into();
226 if !path.exists() {
227 return Ok(Self {
228 path,
229 root: KvRoot::default(),
230 dirty: false,
231 });
232 }
233 let bytes = std::fs::read(&path).map_err(|e| {
234 StrykeError::runtime(format!("kv_open: read {}: {}", path.display(), e), 0)
235 })?;
236 let archived = rkyv::check_archived_root::<KvRoot>(&bytes[..]).map_err(|e| {
237 StrykeError::runtime(
238 format!(
239 "kv_open: corrupt or wrong-format file {}: {}",
240 path.display(),
241 e
242 ),
243 0,
244 )
245 })?;
246 if archived.header.magic != KV_MAGIC {
247 return Err(StrykeError::runtime(
248 format!("kv_open: bad magic in {}", path.display()),
249 0,
250 ));
251 }
252 if archived.header.format_version != KV_FORMAT_VERSION {
253 return Err(StrykeError::runtime(
254 format!(
255 "kv_open: format version {} (expected {})",
256 archived.header.format_version, KV_FORMAT_VERSION
257 ),
258 0,
259 ));
260 }
261 let root: KvRoot = archived
262 .deserialize(&mut rkyv::Infallible)
263 .map_err(|_| StrykeError::runtime("kv_open: deserialize failed", 0))?;
264 Ok(Self {
265 path,
266 root,
267 dirty: false,
268 })
269 }
270 pub fn put(&mut self, key: String, value: WireValue) {
272 self.root.entries.insert(key, value);
273 self.dirty = true;
274 }
275 pub fn get(&self, key: &str) -> Option<&WireValue> {
277 self.root.entries.get(key)
278 }
279 pub fn del(&mut self, key: &str) -> bool {
281 let existed = self.root.entries.remove(key).is_some();
282 if existed {
283 self.dirty = true;
284 }
285 existed
286 }
287 pub fn exists(&self, key: &str) -> bool {
289 self.root.entries.contains_key(key)
290 }
291 pub fn len(&self) -> usize {
293 self.root.entries.len()
294 }
295 pub fn is_empty(&self) -> bool {
297 self.root.entries.is_empty()
298 }
299
300 pub fn keys(&self, prefix: Option<&str>) -> Vec<String> {
302 let mut ks: Vec<String> = match prefix {
303 Some(p) => self
304 .root
305 .entries
306 .keys()
307 .filter(|k| k.starts_with(p))
308 .cloned()
309 .collect(),
310 None => self.root.entries.keys().cloned().collect(),
311 };
312 ks.sort_unstable();
313 ks
314 }
315
316 pub fn commit(&mut self) -> StrykeResult<()> {
320 if !self.dirty {
321 return Ok(());
322 }
323 self.root.header.last_commit_secs = now_secs();
324 self.root.header.commit_count = self.root.header.commit_count.saturating_add(1);
325 let bytes = rkyv::to_bytes::<_, 4096>(&self.root)
326 .map_err(|e| StrykeError::runtime(format!("kv_commit: rkyv: {}", e), 0))?;
327
328 let parent = self
329 .path
330 .parent()
331 .ok_or_else(|| StrykeError::runtime("kv_commit: path has no parent", 0))?;
332 let _ = std::fs::create_dir_all(parent);
333
334 let pid = std::process::id();
335 let nanos = SystemTime::now()
336 .duration_since(UNIX_EPOCH)
337 .map(|d| d.as_nanos())
338 .unwrap_or(0);
339 let fname = self
340 .path
341 .file_name()
342 .and_then(|s| s.to_str())
343 .unwrap_or("store.rkyv");
344 let tmp_path = parent.join(format!("{}.tmp.{}.{}", fname, pid, nanos));
345
346 {
347 let mut f = File::create(&tmp_path)
348 .map_err(|e| StrykeError::runtime(format!("kv_commit: tmp create: {}", e), 0))?;
349 f.write_all(&bytes)
350 .map_err(|e| StrykeError::runtime(format!("kv_commit: tmp write: {}", e), 0))?;
351 f.sync_all()
352 .map_err(|e| StrykeError::runtime(format!("kv_commit: tmp fsync: {}", e), 0))?;
353 }
354
355 std::fs::rename(&tmp_path, &self.path)
356 .map_err(|e| StrykeError::runtime(format!("kv_commit: rename: {}", e), 0))?;
357 self.dirty = false;
358 Ok(())
359 }
360 pub fn stats(&self) -> Vec<(String, StrykeValue)> {
362 vec![
363 (
364 "path".into(),
365 StrykeValue::string(self.path.display().to_string()),
366 ),
367 (
368 "entries".into(),
369 StrykeValue::integer(self.root.entries.len() as i64),
370 ),
371 (
372 "dirty".into(),
373 StrykeValue::integer(if self.dirty { 1 } else { 0 }),
374 ),
375 (
376 "format_version".into(),
377 StrykeValue::integer(self.root.header.format_version as i64),
378 ),
379 (
380 "created_at_secs".into(),
381 StrykeValue::integer(self.root.header.created_at_secs as i64),
382 ),
383 (
384 "last_commit_secs".into(),
385 StrykeValue::integer(self.root.header.last_commit_secs as i64),
386 ),
387 (
388 "commit_count".into(),
389 StrykeValue::integer(self.root.header.commit_count as i64),
390 ),
391 (
392 "stryke_version".into(),
393 StrykeValue::string(self.root.header.stryke_version.clone()),
394 ),
395 ]
396 }
397}
398
/// Whole seconds since the Unix epoch, or `0` if the system clock reports a
/// time before the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
405
406fn store_arg(v: &StrykeValue, fn_name: &str, line: usize) -> StrykeResult<Arc<Mutex<KvStore>>> {
409 v.as_kv_store().ok_or_else(|| {
410 StrykeError::runtime(
411 format!("{}: first argument must be a KvStore handle", fn_name),
412 line,
413 )
414 })
415}
416
417fn key_arg(v: &StrykeValue) -> String {
418 v.to_string()
419}
420
421fn as_any_array(v: &StrykeValue) -> Option<Vec<StrykeValue>> {
424 if let Some(ar) = v.as_array_ref() {
425 return Some(ar.read().clone());
426 }
427 v.as_array_vec()
428}
429
430pub(crate) fn builtin_kv_open(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
432 let path_v = args
433 .first()
434 .ok_or_else(|| StrykeError::runtime("kv_open: missing path argument", line))?;
435 let path = path_v.to_string();
436 let store = KvStore::open(Path::new(&path))
437 .map_err(|e| StrykeError::runtime(format!("kv_open: {}", e.message), line))?;
438 Ok(StrykeValue::kv_store(Arc::new(Mutex::new(store))))
439}
440
441pub(crate) fn builtin_kv_put(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
444 let s = store_arg(args.first().unwrap_or(&StrykeValue::UNDEF), "kv_put", line)?;
445 let k = key_arg(args.get(1).unwrap_or(&StrykeValue::UNDEF));
446 let v = args.get(2).cloned().unwrap_or(StrykeValue::UNDEF);
447 let wv = WireValue::from_stryke(&v);
448 let prev = {
449 let mut g = s.lock();
450 let prev = g.get(&k).cloned();
451 g.put(k, wv);
452 prev
453 };
454 Ok(prev.map(|p| p.into_stryke()).unwrap_or(StrykeValue::UNDEF))
455}
456
457pub(crate) fn builtin_kv_get(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
459 let s = store_arg(args.first().unwrap_or(&StrykeValue::UNDEF), "kv_get", line)?;
460 let k = key_arg(args.get(1).unwrap_or(&StrykeValue::UNDEF));
461 let g = s.lock();
462 Ok(g.get(&k)
463 .cloned()
464 .map(|v| v.into_stryke())
465 .unwrap_or(StrykeValue::UNDEF))
466}
467
468pub(crate) fn builtin_kv_del(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
470 let s = store_arg(args.first().unwrap_or(&StrykeValue::UNDEF), "kv_del", line)?;
471 let k = key_arg(args.get(1).unwrap_or(&StrykeValue::UNDEF));
472 let existed = s.lock().del(&k);
473 Ok(StrykeValue::integer(if existed { 1 } else { 0 }))
474}
475
476pub(crate) fn builtin_kv_exists(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
478 let s = store_arg(
479 args.first().unwrap_or(&StrykeValue::UNDEF),
480 "kv_exists",
481 line,
482 )?;
483 let k = key_arg(args.get(1).unwrap_or(&StrykeValue::UNDEF));
484 let yes = s.lock().exists(&k);
485 Ok(StrykeValue::integer(if yes { 1 } else { 0 }))
486}
487
488pub(crate) fn builtin_kv_keys(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
490 let s = store_arg(args.first().unwrap_or(&StrykeValue::UNDEF), "kv_keys", line)?;
491 let prefix = args.get(1).map(|v| v.to_string());
492 let keys = s.lock().keys(prefix.as_deref());
493 let arr: Vec<StrykeValue> = keys.into_iter().map(StrykeValue::string).collect();
494 Ok(StrykeValue::array(arr))
495}
496
497pub(crate) fn builtin_kv_scan(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
501 let s = store_arg(args.first().unwrap_or(&StrykeValue::UNDEF), "kv_scan", line)?;
502 let prefix = args.get(1).map(|v| v.to_string()).unwrap_or_default();
503 let g = s.lock();
504 let mut pairs: Vec<(String, StrykeValue)> = g
505 .root
506 .entries
507 .iter()
508 .filter(|(k, _)| k.starts_with(&prefix))
509 .map(|(k, v)| (k.clone(), v.clone().into_stryke()))
510 .collect();
511 pairs.sort_by(|a, b| a.0.cmp(&b.0));
512 let arr: Vec<StrykeValue> = pairs
513 .into_iter()
514 .map(|(k, v)| {
515 StrykeValue::array_ref(Arc::new(RwLock::new(vec![StrykeValue::string(k), v])))
518 })
519 .collect();
520 Ok(StrykeValue::array(arr))
521}
522
523pub(crate) fn builtin_kv_len(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
525 let s = store_arg(args.first().unwrap_or(&StrykeValue::UNDEF), "kv_len", line)?;
526 let n = s.lock().len() as i64;
527 Ok(StrykeValue::integer(n))
528}
529
530pub(crate) fn builtin_kv_commit(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
532 let s = store_arg(
533 args.first().unwrap_or(&StrykeValue::UNDEF),
534 "kv_commit",
535 line,
536 )?;
537 s.lock()
538 .commit()
539 .map_err(|e| StrykeError::runtime(format!("kv_commit: {}", e.message), line))?;
540 Ok(StrykeValue::integer(1))
541}
542
543pub(crate) fn builtin_kv_batch(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
547 let s = store_arg(
548 args.first().unwrap_or(&StrykeValue::UNDEF),
549 "kv_batch",
550 line,
551 )?;
552 let ops_v = args
553 .get(1)
554 .ok_or_else(|| StrykeError::runtime("kv_batch: missing ops array", line))?;
555 let ops = as_any_array(ops_v)
556 .ok_or_else(|| StrykeError::runtime("kv_batch: ops must be an array of triples", line))?;
557
558 let snapshot = s.lock().root.entries.clone();
560 let mut applied: usize = 0;
561 let result: StrykeResult<usize> = (|| {
562 for (i, op_v) in ops.iter().enumerate() {
563 let op_arr = as_any_array(op_v).ok_or_else(|| {
564 StrykeError::runtime(format!("kv_batch: op {} is not an array", i), line)
565 })?;
566 let kind = op_arr.first().map(|x| x.to_string()).unwrap_or_default();
567 match kind.as_str() {
568 "put" => {
569 let k = op_arr.get(1).map(|v| v.to_string()).ok_or_else(|| {
570 StrykeError::runtime(format!("kv_batch: op {}: put missing key", i), line)
571 })?;
572 let v = op_arr.get(2).cloned().unwrap_or(StrykeValue::UNDEF);
573 s.lock().put(k, WireValue::from_stryke(&v));
574 }
575 "del" => {
576 let k = op_arr.get(1).map(|v| v.to_string()).ok_or_else(|| {
577 StrykeError::runtime(format!("kv_batch: op {}: del missing key", i), line)
578 })?;
579 s.lock().del(&k);
580 }
581 other => {
582 return Err(StrykeError::runtime(
583 format!("kv_batch: op {}: unknown kind '{}'", i, other),
584 line,
585 ));
586 }
587 }
588 applied += 1;
589 }
590 Ok(applied)
591 })();
592
593 match result {
594 Ok(n) => Ok(StrykeValue::integer(n as i64)),
595 Err(e) => {
596 let mut g = s.lock();
598 g.root.entries = snapshot;
599 g.dirty = !g.root.entries.is_empty();
600 Err(e)
601 }
602 }
603}
604
605pub(crate) fn builtin_kv_close(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
608 let s = store_arg(
609 args.first().unwrap_or(&StrykeValue::UNDEF),
610 "kv_close",
611 line,
612 )?;
613 let mut g = s.lock();
614 if g.dirty {
615 g.commit()
616 .map_err(|e| StrykeError::runtime(format!("kv_close: {}", e.message), line))?;
617 }
618 Ok(StrykeValue::integer(1))
619}
620
621pub(crate) fn builtin_kv_stats(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
623 let s = store_arg(
624 args.first().unwrap_or(&StrykeValue::UNDEF),
625 "kv_stats",
626 line,
627 )?;
628 let pairs = s.lock().stats();
629 let mut m: IndexMap<String, StrykeValue> = IndexMap::with_capacity(pairs.len());
630 for (k, v) in pairs {
631 m.insert(k, v);
632 }
633 Ok(StrykeValue::hash_ref(Arc::new(RwLock::new(m))))
634}
635
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Builds a timestamp-suffixed path in the system temp directory. Nothing
    /// is created on disk.
    fn tmp_path(name: &str) -> PathBuf {
        let stamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_nanos(),
            Err(_) => 0,
        };
        let mut target = env::temp_dir();
        target.push(format!("stryke_kvtest_{}_{}.rkyv", name, stamp));
        target
    }

    /// Values written with `put` are readable with `get` before any commit.
    #[test]
    fn put_get_roundtrip() {
        let file = tmp_path("rt");
        let mut store = KvStore::open(&file).unwrap();
        store.put("alpha".into(), WireValue::Int(42));
        store.put("beta".into(), WireValue::Str("hello".into()));
        assert!(matches!(store.get("alpha"), Some(WireValue::Int(42))));
        assert!(matches!(store.get("beta"), Some(WireValue::Str(_))));
        let _ = std::fs::remove_file(&file);
    }

    /// Committed entries are visible to a store reopened from the same file.
    #[test]
    fn commit_then_reopen_sees_data() {
        let file = tmp_path("commit");

        let mut writer = KvStore::open(&file).unwrap();
        writer.put("k1".into(), WireValue::Int(1));
        writer.put("k2".into(), WireValue::Int(2));
        writer.commit().unwrap();
        drop(writer);

        let reader = KvStore::open(&file).unwrap();
        assert_eq!(reader.len(), 2);
        assert!(matches!(reader.get("k1"), Some(WireValue::Int(1))));
        assert!(matches!(reader.get("k2"), Some(WireValue::Int(2))));
        drop(reader);

        let _ = std::fs::remove_file(&file);
    }

    /// `keys` honours the prefix and returns the matches in sorted order.
    #[test]
    fn keys_prefix_filter_sorted() {
        let file = tmp_path("keys");
        let mut store = KvStore::open(&file).unwrap();
        for (key, n) in [("user:1", 1), ("user:2", 2), ("log:1", 99)] {
            store.put(key.into(), WireValue::Int(n));
        }
        let found = store.keys(Some("user:"));
        assert_eq!(found, vec!["user:1".to_string(), "user:2".to_string()]);
        let _ = std::fs::remove_file(&file);
    }

    /// `del` reports `true` only for the call that actually removed the key.
    #[test]
    fn del_returns_existed() {
        let file = tmp_path("del");
        let mut store = KvStore::open(&file).unwrap();
        store.put("x".into(), WireValue::Int(1));
        let first = store.del("x");
        let second = store.del("x");
        assert!(first);
        assert!(!second);
        let _ = std::fs::remove_file(&file);
    }

    /// A nested array/hash value survives commit and reopen.
    #[test]
    fn nested_array_roundtrip() {
        let file = tmp_path("nested");
        let mut store = KvStore::open(&file).unwrap();
        let inner_array = WireValue::Array(vec![WireValue::Str("a".into()), WireValue::Int(2)]);
        let inner_hash = WireValue::Hash(vec![("k".into(), WireValue::Int(3))]);
        let nested = WireValue::Array(vec![WireValue::Int(1), inner_array, inner_hash]);
        store.put("nest".into(), nested);
        store.commit().unwrap();

        let reopened = KvStore::open(&file).unwrap();
        if let Some(WireValue::Array(items)) = reopened.get("nest") {
            assert_eq!(items.len(), 3);
        } else {
            panic!("expected array");
        }
        let _ = std::fs::remove_file(&file);
    }
}