1#![allow(dead_code)]
22
23use std::collections::HashMap;
24use std::fs::File;
25use std::io::Write;
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28use std::time::{SystemTime, UNIX_EPOCH};
29
30use indexmap::IndexMap;
31use parking_lot::{Mutex, RwLock};
32use rkyv::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
33
34use crate::error::{StrykeError, StrykeResult};
35use crate::value::StrykeValue;
36
37pub const KV_MAGIC: u32 = 0x53544b56; pub const KV_FORMAT_VERSION: u32 = 1;
41
42#[derive(Archive, RkyvDeserialize, RkyvSerialize, Debug, Clone)]
54#[archive(check_bytes)]
55#[archive(bound(serialize = "__S: rkyv::ser::Serializer + rkyv::ser::ScratchSpace",))]
56#[archive_attr(check_bytes(
57 bound = "__C: rkyv::validation::ArchiveContext, <__C as rkyv::Fallible>::Error: std::error::Error"
58))]
59pub enum WireValue {
60 Undef,
61 Bool(bool),
62 Int(i64),
63 Float(f64),
64 Str(String),
65 Bytes(Vec<u8>),
66 Array(
67 #[omit_bounds]
68 #[archive_attr(omit_bounds)]
69 Vec<WireValue>,
70 ),
71 Hash(
72 #[omit_bounds]
73 #[archive_attr(omit_bounds)]
74 Vec<(String, WireValue)>,
75 ),
76}
77
78impl WireValue {
79 pub fn from_stryke(v: &StrykeValue) -> Self {
82 if v.is_undef() {
83 return WireValue::Undef;
84 }
85 if let Some(n) = v.as_integer() {
86 return WireValue::Int(n);
87 }
88 if let Some(f) = v.as_float() {
89 return WireValue::Float(f);
90 }
91 if let Some(b) = v.as_bytes_arc() {
92 return WireValue::Bytes((*b).clone());
93 }
94 if let Some(s) = v.as_str() {
95 return WireValue::Str(s.to_string());
96 }
97 if let Some(ar) = v.as_array_ref() {
99 let g = ar.read();
100 return WireValue::Array(g.iter().map(WireValue::from_stryke).collect());
101 }
102 if let Some(hr) = v.as_hash_ref() {
104 let g = hr.read();
105 let mut entries: Vec<(String, WireValue)> = g
106 .iter()
107 .map(|(k, val)| (k.clone(), WireValue::from_stryke(val)))
108 .collect();
109 entries.sort_by(|a, b| a.0.cmp(&b.0));
110 return WireValue::Hash(entries);
111 }
112 if let Some(arr) = v.as_array_vec() {
113 return WireValue::Array(arr.iter().map(WireValue::from_stryke).collect());
114 }
115 if let Some(h) = v.as_hash_map() {
116 let mut entries: Vec<(String, WireValue)> = h
117 .iter()
118 .map(|(k, val)| (k.clone(), WireValue::from_stryke(val)))
119 .collect();
120 entries.sort_by(|a, b| a.0.cmp(&b.0));
121 return WireValue::Hash(entries);
122 }
123 WireValue::Str(v.to_string())
125 }
126
127 pub fn into_stryke(self) -> StrykeValue {
129 match self {
130 WireValue::Undef => StrykeValue::UNDEF,
131 WireValue::Bool(b) => StrykeValue::integer(if b { 1 } else { 0 }),
132 WireValue::Int(n) => StrykeValue::integer(n),
133 WireValue::Float(f) => StrykeValue::float(f),
134 WireValue::Str(s) => StrykeValue::string(s),
135 WireValue::Bytes(b) => StrykeValue::bytes(Arc::new(b)),
136 WireValue::Array(items) => {
137 let v: Vec<StrykeValue> = items.into_iter().map(|x| x.into_stryke()).collect();
140 StrykeValue::array_ref(Arc::new(RwLock::new(v)))
141 }
142 WireValue::Hash(pairs) => {
143 let mut m: IndexMap<String, StrykeValue> = IndexMap::with_capacity(pairs.len());
146 for (k, v) in pairs {
147 m.insert(k, v.into_stryke());
148 }
149 StrykeValue::hash_ref(Arc::new(RwLock::new(m)))
150 }
151 }
152 }
153}
154
155#[derive(Archive, RkyvDeserialize, RkyvSerialize, Debug, Clone)]
158#[archive(check_bytes)]
159pub struct KvHeader {
160 pub magic: u32,
161 pub format_version: u32,
162 pub stryke_version: String,
163 pub created_at_secs: u64,
164 pub last_commit_secs: u64,
165 pub commit_count: u64,
166}
167
168impl Default for KvHeader {
169 fn default() -> Self {
170 Self {
171 magic: KV_MAGIC,
172 format_version: KV_FORMAT_VERSION,
173 stryke_version: env!("CARGO_PKG_VERSION").to_string(),
174 created_at_secs: now_secs(),
175 last_commit_secs: 0,
176 commit_count: 0,
177 }
178 }
179}
180
181#[derive(Archive, RkyvDeserialize, RkyvSerialize, Debug, Clone, Default)]
182#[archive(check_bytes)]
183pub struct KvRoot {
184 pub header: KvHeader,
185 pub entries: HashMap<String, WireValue>,
186}
187
188#[derive(Debug)]
195pub struct KvStore {
196 pub path: PathBuf,
197 pub root: KvRoot,
198 pub dirty: bool,
199}
200
201impl KvStore {
202 pub fn open(path: impl Into<PathBuf>) -> StrykeResult<Self> {
207 let path = path.into();
208 if !path.exists() {
209 return Ok(Self {
210 path,
211 root: KvRoot::default(),
212 dirty: false,
213 });
214 }
215 let bytes = std::fs::read(&path).map_err(|e| {
216 StrykeError::runtime(format!("kv_open: read {}: {}", path.display(), e), 0)
217 })?;
218 let archived = rkyv::check_archived_root::<KvRoot>(&bytes[..]).map_err(|e| {
219 StrykeError::runtime(
220 format!(
221 "kv_open: corrupt or wrong-format file {}: {}",
222 path.display(),
223 e
224 ),
225 0,
226 )
227 })?;
228 if archived.header.magic != KV_MAGIC {
229 return Err(StrykeError::runtime(
230 format!("kv_open: bad magic in {}", path.display()),
231 0,
232 ));
233 }
234 if archived.header.format_version != KV_FORMAT_VERSION {
235 return Err(StrykeError::runtime(
236 format!(
237 "kv_open: format version {} (expected {})",
238 archived.header.format_version, KV_FORMAT_VERSION
239 ),
240 0,
241 ));
242 }
243 let root: KvRoot = archived
244 .deserialize(&mut rkyv::Infallible)
245 .map_err(|_| StrykeError::runtime("kv_open: deserialize failed", 0))?;
246 Ok(Self {
247 path,
248 root,
249 dirty: false,
250 })
251 }
252
253 pub fn put(&mut self, key: String, value: WireValue) {
254 self.root.entries.insert(key, value);
255 self.dirty = true;
256 }
257
258 pub fn get(&self, key: &str) -> Option<&WireValue> {
259 self.root.entries.get(key)
260 }
261
262 pub fn del(&mut self, key: &str) -> bool {
263 let existed = self.root.entries.remove(key).is_some();
264 if existed {
265 self.dirty = true;
266 }
267 existed
268 }
269
270 pub fn exists(&self, key: &str) -> bool {
271 self.root.entries.contains_key(key)
272 }
273
274 pub fn len(&self) -> usize {
275 self.root.entries.len()
276 }
277
278 pub fn is_empty(&self) -> bool {
279 self.root.entries.is_empty()
280 }
281
282 pub fn keys(&self, prefix: Option<&str>) -> Vec<String> {
284 let mut ks: Vec<String> = match prefix {
285 Some(p) => self
286 .root
287 .entries
288 .keys()
289 .filter(|k| k.starts_with(p))
290 .cloned()
291 .collect(),
292 None => self.root.entries.keys().cloned().collect(),
293 };
294 ks.sort_unstable();
295 ks
296 }
297
298 pub fn commit(&mut self) -> StrykeResult<()> {
302 if !self.dirty {
303 return Ok(());
304 }
305 self.root.header.last_commit_secs = now_secs();
306 self.root.header.commit_count = self.root.header.commit_count.saturating_add(1);
307 let bytes = rkyv::to_bytes::<_, 4096>(&self.root)
308 .map_err(|e| StrykeError::runtime(format!("kv_commit: rkyv: {}", e), 0))?;
309
310 let parent = self
311 .path
312 .parent()
313 .ok_or_else(|| StrykeError::runtime("kv_commit: path has no parent", 0))?;
314 let _ = std::fs::create_dir_all(parent);
315
316 let pid = std::process::id();
317 let nanos = SystemTime::now()
318 .duration_since(UNIX_EPOCH)
319 .map(|d| d.as_nanos())
320 .unwrap_or(0);
321 let fname = self
322 .path
323 .file_name()
324 .and_then(|s| s.to_str())
325 .unwrap_or("store.rkyv");
326 let tmp_path = parent.join(format!("{}.tmp.{}.{}", fname, pid, nanos));
327
328 {
329 let mut f = File::create(&tmp_path)
330 .map_err(|e| StrykeError::runtime(format!("kv_commit: tmp create: {}", e), 0))?;
331 f.write_all(&bytes)
332 .map_err(|e| StrykeError::runtime(format!("kv_commit: tmp write: {}", e), 0))?;
333 f.sync_all()
334 .map_err(|e| StrykeError::runtime(format!("kv_commit: tmp fsync: {}", e), 0))?;
335 }
336
337 std::fs::rename(&tmp_path, &self.path)
338 .map_err(|e| StrykeError::runtime(format!("kv_commit: rename: {}", e), 0))?;
339 self.dirty = false;
340 Ok(())
341 }
342
343 pub fn stats(&self) -> Vec<(String, StrykeValue)> {
344 vec![
345 (
346 "path".into(),
347 StrykeValue::string(self.path.display().to_string()),
348 ),
349 (
350 "entries".into(),
351 StrykeValue::integer(self.root.entries.len() as i64),
352 ),
353 (
354 "dirty".into(),
355 StrykeValue::integer(if self.dirty { 1 } else { 0 }),
356 ),
357 (
358 "format_version".into(),
359 StrykeValue::integer(self.root.header.format_version as i64),
360 ),
361 (
362 "created_at_secs".into(),
363 StrykeValue::integer(self.root.header.created_at_secs as i64),
364 ),
365 (
366 "last_commit_secs".into(),
367 StrykeValue::integer(self.root.header.last_commit_secs as i64),
368 ),
369 (
370 "commit_count".into(),
371 StrykeValue::integer(self.root.header.commit_count as i64),
372 ),
373 (
374 "stryke_version".into(),
375 StrykeValue::string(self.root.header.stryke_version.clone()),
376 ),
377 ]
378 }
379}
380
381fn now_secs() -> u64 {
382 SystemTime::now()
383 .duration_since(UNIX_EPOCH)
384 .map(|d| d.as_secs())
385 .unwrap_or(0)
386}
387
388fn store_arg(v: &StrykeValue, fn_name: &str, line: usize) -> StrykeResult<Arc<Mutex<KvStore>>> {
391 v.as_kv_store().ok_or_else(|| {
392 StrykeError::runtime(
393 format!("{}: first argument must be a KvStore handle", fn_name),
394 line,
395 )
396 })
397}
398
399fn key_arg(v: &StrykeValue) -> String {
400 v.to_string()
401}
402
403fn as_any_array(v: &StrykeValue) -> Option<Vec<StrykeValue>> {
406 if let Some(ar) = v.as_array_ref() {
407 return Some(ar.read().clone());
408 }
409 v.as_array_vec()
410}
411
412pub(crate) fn builtin_kv_open(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
414 let path_v = args
415 .first()
416 .ok_or_else(|| StrykeError::runtime("kv_open: missing path argument", line))?;
417 let path = path_v.to_string();
418 let store = KvStore::open(Path::new(&path))
419 .map_err(|e| StrykeError::runtime(format!("kv_open: {}", e.message), line))?;
420 Ok(StrykeValue::kv_store(Arc::new(Mutex::new(store))))
421}
422
423pub(crate) fn builtin_kv_put(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
426 let s = store_arg(args.first().unwrap_or(&StrykeValue::UNDEF), "kv_put", line)?;
427 let k = key_arg(args.get(1).unwrap_or(&StrykeValue::UNDEF));
428 let v = args.get(2).cloned().unwrap_or(StrykeValue::UNDEF);
429 let wv = WireValue::from_stryke(&v);
430 let prev = {
431 let mut g = s.lock();
432 let prev = g.get(&k).cloned();
433 g.put(k, wv);
434 prev
435 };
436 Ok(prev.map(|p| p.into_stryke()).unwrap_or(StrykeValue::UNDEF))
437}
438
439pub(crate) fn builtin_kv_get(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
441 let s = store_arg(args.first().unwrap_or(&StrykeValue::UNDEF), "kv_get", line)?;
442 let k = key_arg(args.get(1).unwrap_or(&StrykeValue::UNDEF));
443 let g = s.lock();
444 Ok(g.get(&k)
445 .cloned()
446 .map(|v| v.into_stryke())
447 .unwrap_or(StrykeValue::UNDEF))
448}
449
450pub(crate) fn builtin_kv_del(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
452 let s = store_arg(args.first().unwrap_or(&StrykeValue::UNDEF), "kv_del", line)?;
453 let k = key_arg(args.get(1).unwrap_or(&StrykeValue::UNDEF));
454 let existed = s.lock().del(&k);
455 Ok(StrykeValue::integer(if existed { 1 } else { 0 }))
456}
457
458pub(crate) fn builtin_kv_exists(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
460 let s = store_arg(
461 args.first().unwrap_or(&StrykeValue::UNDEF),
462 "kv_exists",
463 line,
464 )?;
465 let k = key_arg(args.get(1).unwrap_or(&StrykeValue::UNDEF));
466 let yes = s.lock().exists(&k);
467 Ok(StrykeValue::integer(if yes { 1 } else { 0 }))
468}
469
470pub(crate) fn builtin_kv_keys(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
472 let s = store_arg(args.first().unwrap_or(&StrykeValue::UNDEF), "kv_keys", line)?;
473 let prefix = args.get(1).map(|v| v.to_string());
474 let keys = s.lock().keys(prefix.as_deref());
475 let arr: Vec<StrykeValue> = keys.into_iter().map(StrykeValue::string).collect();
476 Ok(StrykeValue::array(arr))
477}
478
479pub(crate) fn builtin_kv_scan(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
483 let s = store_arg(args.first().unwrap_or(&StrykeValue::UNDEF), "kv_scan", line)?;
484 let prefix = args.get(1).map(|v| v.to_string()).unwrap_or_default();
485 let g = s.lock();
486 let mut pairs: Vec<(String, StrykeValue)> = g
487 .root
488 .entries
489 .iter()
490 .filter(|(k, _)| k.starts_with(&prefix))
491 .map(|(k, v)| (k.clone(), v.clone().into_stryke()))
492 .collect();
493 pairs.sort_by(|a, b| a.0.cmp(&b.0));
494 let arr: Vec<StrykeValue> = pairs
495 .into_iter()
496 .map(|(k, v)| {
497 StrykeValue::array_ref(Arc::new(RwLock::new(vec![StrykeValue::string(k), v])))
500 })
501 .collect();
502 Ok(StrykeValue::array(arr))
503}
504
505pub(crate) fn builtin_kv_len(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
507 let s = store_arg(args.first().unwrap_or(&StrykeValue::UNDEF), "kv_len", line)?;
508 let n = s.lock().len() as i64;
509 Ok(StrykeValue::integer(n))
510}
511
512pub(crate) fn builtin_kv_commit(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
514 let s = store_arg(
515 args.first().unwrap_or(&StrykeValue::UNDEF),
516 "kv_commit",
517 line,
518 )?;
519 s.lock()
520 .commit()
521 .map_err(|e| StrykeError::runtime(format!("kv_commit: {}", e.message), line))?;
522 Ok(StrykeValue::integer(1))
523}
524
525pub(crate) fn builtin_kv_batch(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
529 let s = store_arg(
530 args.first().unwrap_or(&StrykeValue::UNDEF),
531 "kv_batch",
532 line,
533 )?;
534 let ops_v = args
535 .get(1)
536 .ok_or_else(|| StrykeError::runtime("kv_batch: missing ops array", line))?;
537 let ops = as_any_array(ops_v)
538 .ok_or_else(|| StrykeError::runtime("kv_batch: ops must be an array of triples", line))?;
539
540 let snapshot = s.lock().root.entries.clone();
542 let mut applied: usize = 0;
543 let result: StrykeResult<usize> = (|| {
544 for (i, op_v) in ops.iter().enumerate() {
545 let op_arr = as_any_array(op_v).ok_or_else(|| {
546 StrykeError::runtime(format!("kv_batch: op {} is not an array", i), line)
547 })?;
548 let kind = op_arr.first().map(|x| x.to_string()).unwrap_or_default();
549 match kind.as_str() {
550 "put" => {
551 let k = op_arr.get(1).map(|v| v.to_string()).ok_or_else(|| {
552 StrykeError::runtime(format!("kv_batch: op {}: put missing key", i), line)
553 })?;
554 let v = op_arr.get(2).cloned().unwrap_or(StrykeValue::UNDEF);
555 s.lock().put(k, WireValue::from_stryke(&v));
556 }
557 "del" => {
558 let k = op_arr.get(1).map(|v| v.to_string()).ok_or_else(|| {
559 StrykeError::runtime(format!("kv_batch: op {}: del missing key", i), line)
560 })?;
561 s.lock().del(&k);
562 }
563 other => {
564 return Err(StrykeError::runtime(
565 format!("kv_batch: op {}: unknown kind '{}'", i, other),
566 line,
567 ));
568 }
569 }
570 applied += 1;
571 }
572 Ok(applied)
573 })();
574
575 match result {
576 Ok(n) => Ok(StrykeValue::integer(n as i64)),
577 Err(e) => {
578 let mut g = s.lock();
580 g.root.entries = snapshot;
581 g.dirty = !g.root.entries.is_empty();
582 Err(e)
583 }
584 }
585}
586
587pub(crate) fn builtin_kv_close(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
590 let s = store_arg(
591 args.first().unwrap_or(&StrykeValue::UNDEF),
592 "kv_close",
593 line,
594 )?;
595 let mut g = s.lock();
596 if g.dirty {
597 g.commit()
598 .map_err(|e| StrykeError::runtime(format!("kv_close: {}", e.message), line))?;
599 }
600 Ok(StrykeValue::integer(1))
601}
602
603pub(crate) fn builtin_kv_stats(args: &[StrykeValue], line: usize) -> StrykeResult<StrykeValue> {
605 let s = store_arg(
606 args.first().unwrap_or(&StrykeValue::UNDEF),
607 "kv_stats",
608 line,
609 )?;
610 let pairs = s.lock().stats();
611 let mut m: IndexMap<String, StrykeValue> = IndexMap::with_capacity(pairs.len());
612 for (k, v) in pairs {
613 m.insert(k, v);
614 }
615 Ok(StrykeValue::hash_ref(Arc::new(RwLock::new(m))))
616}
617
618#[cfg(test)]
619mod tests {
620 use super::*;
621 use std::env;
622
623 fn tmp_path(name: &str) -> PathBuf {
624 let mut p = env::temp_dir();
625 let nanos = SystemTime::now()
626 .duration_since(UNIX_EPOCH)
627 .map(|d| d.as_nanos())
628 .unwrap_or(0);
629 p.push(format!("stryke_kvtest_{}_{}.rkyv", name, nanos));
630 p
631 }
632
633 #[test]
634 fn put_get_roundtrip() {
635 let p = tmp_path("rt");
636 let mut s = KvStore::open(&p).unwrap();
637 s.put("alpha".into(), WireValue::Int(42));
638 s.put("beta".into(), WireValue::Str("hello".into()));
639 assert!(matches!(s.get("alpha"), Some(WireValue::Int(42))));
640 assert!(matches!(s.get("beta"), Some(WireValue::Str(_))));
641 let _ = std::fs::remove_file(&p);
642 }
643
644 #[test]
645 fn commit_then_reopen_sees_data() {
646 let p = tmp_path("commit");
647 {
648 let mut s = KvStore::open(&p).unwrap();
649 s.put("k1".into(), WireValue::Int(1));
650 s.put("k2".into(), WireValue::Int(2));
651 s.commit().unwrap();
652 }
653 {
654 let s = KvStore::open(&p).unwrap();
655 assert_eq!(s.len(), 2);
656 assert!(matches!(s.get("k1"), Some(WireValue::Int(1))));
657 assert!(matches!(s.get("k2"), Some(WireValue::Int(2))));
658 }
659 let _ = std::fs::remove_file(&p);
660 }
661
662 #[test]
663 fn keys_prefix_filter_sorted() {
664 let p = tmp_path("keys");
665 let mut s = KvStore::open(&p).unwrap();
666 s.put("user:1".into(), WireValue::Int(1));
667 s.put("user:2".into(), WireValue::Int(2));
668 s.put("log:1".into(), WireValue::Int(99));
669 let ks = s.keys(Some("user:"));
670 assert_eq!(ks, vec!["user:1".to_string(), "user:2".to_string()]);
671 let _ = std::fs::remove_file(&p);
672 }
673
674 #[test]
675 fn del_returns_existed() {
676 let p = tmp_path("del");
677 let mut s = KvStore::open(&p).unwrap();
678 s.put("x".into(), WireValue::Int(1));
679 assert!(s.del("x"));
680 assert!(!s.del("x"));
681 let _ = std::fs::remove_file(&p);
682 }
683
684 #[test]
685 fn nested_array_roundtrip() {
686 let p = tmp_path("nested");
687 let mut s = KvStore::open(&p).unwrap();
688 let nested = WireValue::Array(vec![
689 WireValue::Int(1),
690 WireValue::Array(vec![WireValue::Str("a".into()), WireValue::Int(2)]),
691 WireValue::Hash(vec![("k".into(), WireValue::Int(3))]),
692 ]);
693 s.put("nest".into(), nested);
694 s.commit().unwrap();
695 let s2 = KvStore::open(&p).unwrap();
696 match s2.get("nest") {
697 Some(WireValue::Array(items)) => {
698 assert_eq!(items.len(), 3);
699 }
700 _ => panic!("expected array"),
701 }
702 let _ = std::fs::remove_file(&p);
703 }
704}