1use anyhow::{bail, Result};
2use arcstr::{literal, ArcStr};
3use compact_str::format_compact;
4use graphix_compiler::{
5 errf,
6 expr::ExprId,
7 typ::{FnType, Type},
8 ExecCtx, Node, Rt, Scope, TypecheckPhase, UserEvent,
9};
10use graphix_package_core::{CachedArgsAsync, CachedVals, EvalCachedAsync};
11use netidx::{path::Path, publisher::Typ};
12use netidx_core::pack::Pack;
13use netidx_derive::Pack;
14use netidx_value::{ValArray, Value};
15use poolshark::{global::GPooled, local::LPooled};
16use std::sync::Arc;
17
18use crate::encoding::{
19 decode_key, decode_value, encode_key, encode_value, parse_batch_ops, ENCODE_MANY_POOL,
20};
21
22#[derive(Debug, Clone)]
27pub struct DbValue {
28 pub(crate) inner: Arc<sled::Db>,
29}
30
31graphix_package_core::impl_abstract_arc!(DbValue, pub(crate) static DB_WRAPPER = [
32 0xd1, 0xe2, 0xf3, 0x04, 0x15, 0x26, 0x47, 0x38,
33 0x49, 0x5a, 0x6b, 0x7c, 0x8d, 0x9e, 0xaf, 0xb0,
34]);
35
36pub(crate) fn get_db(cached: &CachedVals, idx: usize) -> Option<sled::Db> {
37 match cached.0.get(idx)?.as_ref()? {
38 Value::Abstract(a) => {
39 let dv = a.downcast_ref::<DbValue>()?;
40 Some((*dv.inner).clone())
41 }
42 _ => None,
43 }
44}
45
46#[derive(Debug)]
49pub(crate) struct TreeInner {
50 pub(crate) tree: sled::Tree,
51 pub(crate) key_typ: Option<Typ>,
52}
53
54#[derive(Debug, Clone)]
57pub struct TreeValue {
58 pub(crate) inner: Arc<TreeInner>,
59}
60
61graphix_package_core::impl_abstract_arc!(TreeValue, pub(crate) static TREE_WRAPPER = [
62 0xd2, 0xe3, 0xf4, 0x05, 0x16, 0x27, 0x48, 0x39,
63 0x4a, 0x5b, 0x6c, 0x7d, 0x8e, 0x9f, 0xa0, 0xb1,
64]);
65
66pub(crate) fn get_tree_inner(cached: &CachedVals, idx: usize) -> Option<Arc<TreeInner>> {
67 match cached.0.get(idx)?.as_ref()? {
68 Value::Abstract(a) => {
69 let tv = a.downcast_ref::<TreeValue>()?;
70 Some(tv.inner.clone())
71 }
72 _ => None,
73 }
74}
75
76pub(crate) fn wrap_tree(tree: sled::Tree, key_typ: Option<Typ>) -> Value {
77 TREE_WRAPPER.wrap(TreeValue { inner: Arc::new(TreeInner { tree, key_typ }) })
78}
79
80pub(crate) static META_TREE: ArcStr = literal!("$$__graphix_meta__$$");
83pub(crate) static DEFAULT_TREE_META: ArcStr = literal!("$$__graphix_default__$$");
84
85pub(crate) trait MetaStore {
91 fn get(&self, key: &[u8]) -> Result<Option<sled::IVec>>;
92 fn insert_if_absent(&self, key: &[u8], value: &[u8]) -> Result<Option<sled::IVec>>;
93}
94
95impl MetaStore for sled::Tree {
96 fn get(&self, key: &[u8]) -> Result<Option<sled::IVec>> {
97 Ok(sled::Tree::get(self, key)?)
98 }
99
100 fn insert_if_absent(&self, key: &[u8], value: &[u8]) -> Result<Option<sled::IVec>> {
101 match self.compare_and_swap(key, None as Option<&[u8]>, Some(value))? {
102 Ok(()) => Ok(None),
103 Err(cas_err) => Ok(cas_err.current),
104 }
105 }
106}
107
108impl MetaStore for sled::transaction::TransactionalTree {
109 fn get(&self, key: &[u8]) -> Result<Option<sled::IVec>> {
110 Ok(sled::transaction::TransactionalTree::get(self, key)?)
111 }
112
113 fn insert_if_absent(&self, key: &[u8], value: &[u8]) -> Result<Option<sled::IVec>> {
114 match sled::transaction::TransactionalTree::get(self, key)? {
115 Some(existing) => Ok(Some(existing)),
116 None => {
117 self.insert(key, value)?;
118 Ok(None)
119 }
120 }
121 }
122}
123
124pub(crate) fn read_meta(
125 meta: &impl MetaStore,
126 tree_name: &str,
127) -> Result<Option<(ArcStr, ArcStr)>> {
128 match meta.get(tree_name.as_bytes())? {
129 None => Ok(None),
130 Some(stored) => {
131 let stored = std::str::from_utf8(&stored)?;
132 let mut parts = stored.splitn(2, '\0');
133 let k = parts.next().unwrap_or("?");
134 let v = parts.next().unwrap_or("?");
135 Ok(Some((ArcStr::from(k), ArcStr::from(v))))
136 }
137 }
138}
139
140pub(crate) fn check_or_store_meta(
141 meta: &impl MetaStore,
142 tree_name: &str,
143 key_typ_str: &str,
144 val_typ_str: &str,
145) -> Result<()> {
146 let meta_val = format_compact!("{key_typ_str}\0{val_typ_str}");
147 match meta.insert_if_absent(tree_name.as_bytes(), meta_val.as_bytes())? {
148 None => Ok(()),
149 Some(existing) => {
150 let stored = std::str::from_utf8(&existing)?;
151 let mut parts = stored.splitn(2, '\0');
152 let sk = parts.next().unwrap_or("?");
153 let sv = parts.next().unwrap_or("?");
154 if sk != key_typ_str || sv != val_typ_str {
155 bail!(
156 "tree '{tree_name}' has type Tree<{sk}, {sv}> \
157 but was opened as Tree<{key_typ_str}, {val_typ_str}>"
158 )
159 } else {
160 Ok(())
161 }
162 }
163 }
164}
165
166fn prim_typ(t: &Type) -> Option<Typ> {
169 match t {
170 Type::Primitive(flags) if flags.iter().count() == 1 => flags.iter().next(),
171 _ => None,
172 }
173}
174
175fn find_tree_params(t: &Type) -> Option<&[Type]> {
177 match t {
178 Type::Ref { name, params, .. } if Path::basename(&**name) == Some("Result") => {
179 params.iter().find_map(|p| match p {
180 Type::Ref { name, params, .. }
181 if matches!(Path::basename(&**name), Some("Tree" | "TxnTree"))
182 && params.len() == 2 =>
183 {
184 Some(&**params)
185 }
186 _ => None,
187 })
188 }
189 _ => None,
190 }
191}
192
193pub(crate) fn extract_key_typ_from_rtype(resolved_typ: Option<&FnType>) -> Option<Typ> {
194 let ft = resolved_typ?;
195 find_tree_params(&ft.rtype).and_then(|params| prim_typ(¶ms[0]))
196}
197
198pub(crate) fn extract_type_strings_from_rtype(
199 resolved_typ: Option<&FnType>,
200) -> (ArcStr, ArcStr) {
201 let Some(ft) = resolved_typ else {
202 return (arcstr::literal!("?"), arcstr::literal!("?"));
203 };
204 match find_tree_params(&ft.rtype) {
205 Some(params) if params.len() >= 2 => (
206 ArcStr::from(format!("{}", params[0]).as_str()),
207 ArcStr::from(format!("{}", params[1]).as_str()),
208 ),
209 _ => (arcstr::literal!("?"), arcstr::literal!("?")),
210 }
211}
212
/// Reports whether both type strings denote concrete types.
///
/// A type string is treated as non-concrete when it is exactly `"?"` or when
/// it starts with an apostrophe.
pub(crate) fn types_are_concrete(key_typ_str: &str, val_typ_str: &str) -> bool {
    [key_typ_str, val_typ_str]
        .iter()
        .all(|s| *s != "?" && !s.starts_with('\''))
}
219
220#[derive(Debug, Default)]
225pub(crate) struct DbGetTypeEv;
226
227impl EvalCachedAsync for DbGetTypeEv {
228 const NAME: &str = "db_get_type";
229 const NEEDS_CALLSITE: bool = false;
230 type Args = (sled::Db, ArcStr);
231
232 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
233 let db = get_db(cached, 0)?;
234 let name = match cached.0.get(1)?.as_ref()? {
235 Value::Null => DEFAULT_TREE_META.clone(),
236 Value::String(s) => s.clone(),
237 _ => return None,
238 };
239 Some((db, name))
240 }
241
242 fn eval((db, name): Self::Args) -> impl Future<Output = Value> + Send {
243 async move {
244 match tokio::task::spawn_blocking(move || -> Result<Value> {
245 let meta = db.open_tree(&*META_TREE)?;
246 match read_meta(&meta, &name)? {
247 None => Ok(Value::Null),
248 Some((k, v)) => Ok(Value::Array(ValArray::from([
249 Value::String(k),
250 Value::String(v),
251 ]))),
252 }
253 })
254 .await
255 {
256 Err(e) => errf!("DbErr", "task panicked: {e:?}"),
257 Ok(Err(e)) => errf!("DbErr", "{e:?}"),
258 Ok(Ok(v)) => v,
259 }
260 }
261 }
262}
263
264pub(crate) type DbGetType = CachedArgsAsync<DbGetTypeEv>;
265
266#[derive(Debug, Default)]
269pub(crate) struct DbOpenEv;
270
271impl EvalCachedAsync for DbOpenEv {
272 const NAME: &str = "db_open";
273 const NEEDS_CALLSITE: bool = false;
274 type Args = ArcStr;
275
276 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
277 cached.get::<ArcStr>(0)
278 }
279
280 fn eval(path: Self::Args) -> impl Future<Output = Value> + Send {
281 async move {
282 match tokio::task::spawn_blocking(move || sled::open(&*path)).await {
283 Err(e) => errf!("DbErr", "task panicked: {e}"),
284 Ok(Err(e)) => errf!("DbErr", "{e}"),
285 Ok(Ok(db)) => DB_WRAPPER.wrap(DbValue { inner: Arc::new(db) }),
286 }
287 }
288 }
289}
290
291pub(crate) type DbOpen = CachedArgsAsync<DbOpenEv>;
292
293#[derive(Debug, Default)]
296pub(crate) struct DbFlushEv;
297
298impl EvalCachedAsync for DbFlushEv {
299 const NAME: &str = "db_flush";
300 const NEEDS_CALLSITE: bool = false;
301 type Args = sled::Db;
302
303 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
304 get_db(cached, 0)
305 }
306
307 fn eval(db: Self::Args) -> impl Future<Output = Value> + Send {
308 async move {
309 match tokio::task::spawn_blocking(move || db.flush()).await {
310 Err(e) => errf!("DbErr", "task panicked: {e}"),
311 Ok(Err(e)) => errf!("DbErr", "{e}"),
312 Ok(Ok(_)) => Value::Null,
313 }
314 }
315 }
316}
317
318pub(crate) type DbFlush = CachedArgsAsync<DbFlushEv>;
319
320#[derive(Debug, Default)]
323pub(crate) struct DbGenerateIdEv;
324
325impl EvalCachedAsync for DbGenerateIdEv {
326 const NAME: &str = "db_generate_id";
327 const NEEDS_CALLSITE: bool = false;
328 type Args = sled::Db;
329
330 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
331 get_db(cached, 0)
332 }
333
334 fn eval(db: Self::Args) -> impl Future<Output = Value> + Send {
335 async move {
336 match tokio::task::spawn_blocking(move || db.generate_id()).await {
337 Err(e) => errf!("DbErr", "task panicked: {e}"),
338 Ok(Err(e)) => errf!("DbErr", "{e}"),
339 Ok(Ok(id)) => Value::U64(id),
340 }
341 }
342 }
343}
344
345pub(crate) type DbGenerateId = CachedArgsAsync<DbGenerateIdEv>;
346
347#[derive(Debug, Default)]
350pub(crate) struct DbTreeNamesEv;
351
352impl EvalCachedAsync for DbTreeNamesEv {
353 const NAME: &str = "db_tree_names";
354 const NEEDS_CALLSITE: bool = false;
355 type Args = sled::Db;
356
357 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
358 get_db(cached, 0)
359 }
360
361 fn eval(db: Self::Args) -> impl Future<Output = Value> + Send {
362 async move {
363 match tokio::task::spawn_blocking(move || db.tree_names()).await {
364 Err(e) => errf!("DbErr", "task panicked: {e}"),
365 Ok(names) => {
366 let mut vals: LPooled<Vec<_>> = names
367 .into_iter()
368 .filter_map(|ivec| {
369 std::str::from_utf8(&ivec)
370 .ok()
371 .map(|s| Value::String(ArcStr::from(s)))
372 })
373 .collect();
374 Value::Array(ValArray::from_iter_exact(vals.drain(..)))
375 }
376 }
377 }
378 }
379}
380
381pub(crate) type DbTreeNames = CachedArgsAsync<DbTreeNamesEv>;
382
383#[derive(Debug, Default)]
386pub(crate) struct DbDropTreeEv;
387
388impl EvalCachedAsync for DbDropTreeEv {
389 const NAME: &str = "db_drop_tree";
390 const NEEDS_CALLSITE: bool = false;
391 type Args = (sled::Db, ArcStr);
392
393 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
394 let db = get_db(cached, 0)?;
395 let name = cached.get::<ArcStr>(1)?;
396 Some((db, name))
397 }
398
399 fn eval((db, name): Self::Args) -> impl Future<Output = Value> + Send {
400 async move {
401 match tokio::task::spawn_blocking(move || db.drop_tree(name.as_bytes())).await
402 {
403 Err(e) => errf!("DbErr", "task panicked: {e}"),
404 Ok(Err(e)) => errf!("DbErr", "{e}"),
405 Ok(Ok(existed)) => Value::Bool(existed),
406 }
407 }
408 }
409}
410
411pub(crate) type DbDropTree = CachedArgsAsync<DbDropTreeEv>;
412
413#[derive(Debug)]
416pub(crate) struct DbTreeArgs {
417 db: sled::Db,
418 name: Option<ArcStr>,
419 key_typ: Option<Typ>,
420 key_typ_str: ArcStr,
421 val_typ_str: ArcStr,
422}
423
424#[derive(Debug, Default)]
425pub(crate) struct DbTreeEv {
426 key_typ: Option<Typ>,
427 key_typ_str: ArcStr,
428 val_typ_str: ArcStr,
429}
430
431impl EvalCachedAsync for DbTreeEv {
432 const NAME: &str = "db_tree";
433 const NEEDS_CALLSITE: bool = true;
434 type Args = DbTreeArgs;
435
436 fn init<R: Rt, E: UserEvent>(
437 _ctx: &mut ExecCtx<R, E>,
438 _typ: &FnType,
439 resolved: Option<&FnType>,
440 _scope: &Scope,
441 _from: &[Node<R, E>],
442 _top_id: ExprId,
443 ) -> Self {
444 let key_typ = extract_key_typ_from_rtype(resolved);
445 let (key_typ_str, val_typ_str) = extract_type_strings_from_rtype(resolved);
446 DbTreeEv { key_typ, key_typ_str, val_typ_str }
447 }
448
449 fn typecheck<R: Rt, E: UserEvent>(
450 &mut self,
451 _ctx: &mut ExecCtx<R, E>,
452 _from: &mut [Node<R, E>],
453 phase: TypecheckPhase<'_>,
454 ) -> Result<()> {
455 match phase {
456 TypecheckPhase::Lambda => Ok(()),
457 TypecheckPhase::CallSite(resolved) => {
458 self.key_typ = extract_key_typ_from_rtype(Some(resolved));
459 let (k, v) = extract_type_strings_from_rtype(Some(resolved));
460 self.key_typ_str = k;
461 self.val_typ_str = v;
462 if self.key_typ.is_none() {
463 bail!("db::tree requires concrete key and value types")
464 }
465 Ok(())
466 }
467 }
468 }
469
470 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
471 let db = get_db(cached, 0)?;
472 let name = match cached.0.get(1)?.as_ref()? {
473 Value::Null => None,
474 Value::String(s) => Some(s.clone()),
475 _ => return None,
476 };
477 Some(DbTreeArgs {
478 db,
479 name,
480 key_typ: self.key_typ,
481 key_typ_str: self.key_typ_str.clone(),
482 val_typ_str: self.val_typ_str.clone(),
483 })
484 }
485
486 fn eval(args: Self::Args) -> impl Future<Output = Value> + Send {
487 async move {
488 let DbTreeArgs { db, name, key_typ, key_typ_str, val_typ_str } = args;
489 match tokio::task::spawn_blocking(move || -> Result<Value> {
490 if !types_are_concrete(&key_typ_str, &val_typ_str) {
491 bail!("tree requires concrete type annotations")
492 }
493 let meta = db.open_tree(&META_TREE)?;
494 match name {
495 Some(name) => {
496 if &*name == DEFAULT_TREE_META
497 || name.as_bytes() == META_TREE.as_bytes()
498 {
499 bail!("tree name '{name}' is reserved");
500 }
501 check_or_store_meta(&meta, &name, &key_typ_str, &val_typ_str)?;
502 Ok(db
503 .open_tree(name.as_bytes())
504 .map(|tree| wrap_tree(tree, key_typ))?)
505 }
506 None => {
507 check_or_store_meta(
508 &meta,
509 &DEFAULT_TREE_META,
510 &key_typ_str,
511 &val_typ_str,
512 )?;
513 Ok(wrap_tree((*db).clone(), key_typ))
514 }
515 }
516 })
517 .await
518 {
519 Err(e) => errf!("DbErr", "task panicked: {e}"),
520 Ok(Err(e)) => errf!("DbErr", "{e:?}"),
521 Ok(Ok(v)) => v,
522 }
523 }
524 }
525}
526
527pub(crate) type DbTree = CachedArgsAsync<DbTreeEv>;
528
529#[derive(Debug, Default)]
534pub(crate) struct DbGetEv;
535
536impl EvalCachedAsync for DbGetEv {
537 const NAME: &str = "db_get";
538 const NEEDS_CALLSITE: bool = false;
539 type Args = (Arc<TreeInner>, GPooled<Vec<u8>>);
540
541 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
542 let tree = get_tree_inner(cached, 0)?;
543 let key_val = cached.0.get(1)?.as_ref()?;
544 let key = encode_key(tree.key_typ, key_val)?;
545 Some((tree, key))
546 }
547
548 fn eval((tree, key): Self::Args) -> impl Future<Output = Value> + Send {
549 async move {
550 match tokio::task::spawn_blocking(move || tree.tree.get(&*key)).await {
551 Err(e) => errf!("DbErr", "task panicked: {e}"),
552 Ok(Err(e)) => errf!("DbErr", "{e}"),
553 Ok(Ok(None)) => Value::Null,
554 Ok(Ok(Some(ivec))) => match decode_value(&ivec) {
555 Some(v) => v,
556 None => errf!("DbErr", "failed to decode value"),
557 },
558 }
559 }
560 }
561}
562
563pub(crate) type DbGet = CachedArgsAsync<DbGetEv>;
564
565#[derive(Debug, Default)]
568pub(crate) struct DbInsertEv;
569
570impl EvalCachedAsync for DbInsertEv {
571 const NAME: &str = "db_insert";
572 const NEEDS_CALLSITE: bool = false;
573 type Args = (Arc<TreeInner>, GPooled<Vec<u8>>, GPooled<Vec<u8>>);
574
575 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
576 let tree = get_tree_inner(cached, 0)?;
577 let key_val = cached.0.get(1)?.as_ref()?;
578 let key = encode_key(tree.key_typ, key_val)?;
579 let val = encode_value(cached.0.get(2)?.as_ref()?)?;
580 Some((tree, key, val))
581 }
582
583 fn eval((tree, key, val): Self::Args) -> impl Future<Output = Value> + Send {
584 async move {
585 match tokio::task::spawn_blocking(move || {
586 tree.tree.insert(&*key, val.as_slice())
587 })
588 .await
589 {
590 Err(e) => errf!("DbErr", "task panicked: {e}"),
591 Ok(Err(e)) => errf!("DbErr", "{e}"),
592 Ok(Ok(None)) => Value::Null,
593 Ok(Ok(Some(old))) => match decode_value(&old) {
594 Some(v) => v,
595 None => errf!("DbErr", "failed to decode previous value"),
596 },
597 }
598 }
599 }
600}
601
602pub(crate) type DbInsert = CachedArgsAsync<DbInsertEv>;
603
604#[derive(Debug, Default)]
607pub(crate) struct DbRemoveEv;
608
609impl EvalCachedAsync for DbRemoveEv {
610 const NAME: &str = "db_remove";
611 const NEEDS_CALLSITE: bool = false;
612 type Args = (Arc<TreeInner>, GPooled<Vec<u8>>);
613
614 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
615 let tree = get_tree_inner(cached, 0)?;
616 let key_val = cached.0.get(1)?.as_ref()?;
617 let key = encode_key(tree.key_typ, key_val)?;
618 Some((tree, key))
619 }
620
621 fn eval((tree, key): Self::Args) -> impl Future<Output = Value> + Send {
622 async move {
623 match tokio::task::spawn_blocking(move || tree.tree.remove(&*key)).await {
624 Err(e) => errf!("DbErr", "task panicked: {e}"),
625 Ok(Err(e)) => errf!("DbErr", "{e}"),
626 Ok(Ok(None)) => Value::Null,
627 Ok(Ok(Some(old))) => match decode_value(&old) {
628 Some(v) => v,
629 None => errf!("DbErr", "failed to decode previous value"),
630 },
631 }
632 }
633 }
634}
635
636pub(crate) type DbRemove = CachedArgsAsync<DbRemoveEv>;
637
638#[derive(Debug, Default)]
641pub(crate) struct DbContainsKeyEv;
642
643impl EvalCachedAsync for DbContainsKeyEv {
644 const NAME: &str = "db_contains_key";
645 const NEEDS_CALLSITE: bool = false;
646 type Args = (Arc<TreeInner>, GPooled<Vec<u8>>);
647
648 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
649 let tree = get_tree_inner(cached, 0)?;
650 let key_val = cached.0.get(1)?.as_ref()?;
651 let key = encode_key(tree.key_typ, key_val)?;
652 Some((tree, key))
653 }
654
655 fn eval((tree, key): Self::Args) -> impl Future<Output = Value> + Send {
656 async move {
657 match tokio::task::spawn_blocking(move || tree.tree.contains_key(&*key)).await
658 {
659 Err(e) => errf!("DbErr", "task panicked: {e}"),
660 Ok(Err(e)) => errf!("DbErr", "{e}"),
661 Ok(Ok(exists)) => Value::Bool(exists),
662 }
663 }
664 }
665}
666
667pub(crate) type DbContainsKey = CachedArgsAsync<DbContainsKeyEv>;
668
669#[derive(Debug, Default)]
672pub(crate) struct DbGetManyEv;
673
674impl EvalCachedAsync for DbGetManyEv {
675 const NAME: &str = "db_get_many";
676 const NEEDS_CALLSITE: bool = false;
677 type Args = (Arc<TreeInner>, GPooled<Vec<GPooled<Vec<u8>>>>);
678
679 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
680 let tree = get_tree_inner(cached, 0)?;
681 let arr = match cached.0.get(1)?.as_ref()? {
682 Value::Array(a) => a,
683 _ => return None,
684 };
685 let mut keys = ENCODE_MANY_POOL.take();
686 for k in arr.iter() {
687 keys.push(encode_key(tree.key_typ, k)?);
688 }
689 Some((tree, keys))
690 }
691
692 fn eval((tree, keys): Self::Args) -> impl Future<Output = Value> + Send {
693 async move {
694 match tokio::task::spawn_blocking(move || {
695 let mut results: LPooled<Vec<Value>> = LPooled::take();
696 for key in keys.iter() {
697 match tree.tree.get(&**key) {
698 Err(e) => return Err(errf!("DbErr", "{e}")),
699 Ok(None) => results.push(Value::Null),
700 Ok(Some(ivec)) => match decode_value(&ivec) {
701 Some(v) => results.push(v),
702 None => return Err(errf!("DbErr", "failed to decode value")),
703 },
704 }
705 }
706 Ok(Value::Array(ValArray::from_iter_exact(results.drain(..))))
707 })
708 .await
709 {
710 Err(e) => errf!("DbErr", "task panicked: {e}"),
711 Ok(Err(e)) => e,
712 Ok(Ok(v)) => v,
713 }
714 }
715 }
716}
717
718pub(crate) type DbGetMany = CachedArgsAsync<DbGetManyEv>;
719
720fn decode_kv_result(
723 tree: &TreeInner,
724 result: sled::Result<Option<(sled::IVec, sled::IVec)>>,
725) -> Value {
726 match result {
727 Err(e) => errf!("DbErr", "{e}"),
728 Ok(None) => Value::Null,
729 Ok(Some((k, v))) => match (decode_key(tree.key_typ, &k), decode_value(&v)) {
730 (Some(key), Some(val)) => Value::Array(ValArray::from([key, val])),
731 _ => errf!("DbErr", "failed to decode entry"),
732 },
733 }
734}
735
736#[derive(Debug, Default)]
741pub(crate) struct DbFirstEv;
742
743impl EvalCachedAsync for DbFirstEv {
744 const NAME: &str = "db_first";
745 const NEEDS_CALLSITE: bool = false;
746 type Args = Arc<TreeInner>;
747
748 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
749 get_tree_inner(cached, 0)
750 }
751
752 fn eval(tree: Self::Args) -> impl Future<Output = Value> + Send {
753 async move {
754 match tokio::task::spawn_blocking(move || {
755 let result = tree.tree.first();
756 decode_kv_result(&tree, result)
757 })
758 .await
759 {
760 Err(e) => errf!("DbErr", "task panicked: {e}"),
761 Ok(v) => v,
762 }
763 }
764 }
765}
766
767pub(crate) type DbFirst = CachedArgsAsync<DbFirstEv>;
768
769#[derive(Debug, Default)]
772pub(crate) struct DbLastEv;
773
774impl EvalCachedAsync for DbLastEv {
775 const NAME: &str = "db_last";
776 const NEEDS_CALLSITE: bool = false;
777 type Args = Arc<TreeInner>;
778
779 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
780 get_tree_inner(cached, 0)
781 }
782
783 fn eval(tree: Self::Args) -> impl Future<Output = Value> + Send {
784 async move {
785 match tokio::task::spawn_blocking(move || {
786 let result = tree.tree.last();
787 decode_kv_result(&tree, result)
788 })
789 .await
790 {
791 Err(e) => errf!("DbErr", "task panicked: {e}"),
792 Ok(v) => v,
793 }
794 }
795 }
796}
797
798pub(crate) type DbLast = CachedArgsAsync<DbLastEv>;
799
800#[derive(Debug, Default)]
803pub(crate) struct DbPopMinEv;
804
805impl EvalCachedAsync for DbPopMinEv {
806 const NAME: &str = "db_pop_min";
807 const NEEDS_CALLSITE: bool = false;
808 type Args = Arc<TreeInner>;
809
810 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
811 get_tree_inner(cached, 0)
812 }
813
814 fn eval(tree: Self::Args) -> impl Future<Output = Value> + Send {
815 async move {
816 match tokio::task::spawn_blocking(move || {
817 let result = tree.tree.pop_min();
818 decode_kv_result(&tree, result)
819 })
820 .await
821 {
822 Err(e) => errf!("DbErr", "task panicked: {e}"),
823 Ok(v) => v,
824 }
825 }
826 }
827}
828
829pub(crate) type DbPopMin = CachedArgsAsync<DbPopMinEv>;
830
831#[derive(Debug, Default)]
834pub(crate) struct DbPopMaxEv;
835
836impl EvalCachedAsync for DbPopMaxEv {
837 const NAME: &str = "db_pop_max";
838 const NEEDS_CALLSITE: bool = false;
839 type Args = Arc<TreeInner>;
840
841 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
842 get_tree_inner(cached, 0)
843 }
844
845 fn eval(tree: Self::Args) -> impl Future<Output = Value> + Send {
846 async move {
847 match tokio::task::spawn_blocking(move || {
848 let result = tree.tree.pop_max();
849 decode_kv_result(&tree, result)
850 })
851 .await
852 {
853 Err(e) => errf!("DbErr", "task panicked: {e}"),
854 Ok(v) => v,
855 }
856 }
857 }
858}
859
860pub(crate) type DbPopMax = CachedArgsAsync<DbPopMaxEv>;
861
862#[derive(Debug, Default)]
865pub(crate) struct DbGetLtEv;
866
867impl EvalCachedAsync for DbGetLtEv {
868 const NAME: &str = "db_get_lt";
869 const NEEDS_CALLSITE: bool = false;
870 type Args = (Arc<TreeInner>, GPooled<Vec<u8>>);
871
872 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
873 let tree = get_tree_inner(cached, 0)?;
874 let key_val = cached.0.get(1)?.as_ref()?;
875 let key = encode_key(tree.key_typ, key_val)?;
876 Some((tree, key))
877 }
878
879 fn eval((tree, key): Self::Args) -> impl Future<Output = Value> + Send {
880 async move {
881 match tokio::task::spawn_blocking(move || {
882 let result = tree.tree.get_lt(&*key);
883 decode_kv_result(&tree, result)
884 })
885 .await
886 {
887 Err(e) => errf!("DbErr", "task panicked: {e}"),
888 Ok(v) => v,
889 }
890 }
891 }
892}
893
894pub(crate) type DbGetLt = CachedArgsAsync<DbGetLtEv>;
895
896#[derive(Debug, Default)]
899pub(crate) struct DbGetGtEv;
900
901impl EvalCachedAsync for DbGetGtEv {
902 const NAME: &str = "db_get_gt";
903 const NEEDS_CALLSITE: bool = false;
904 type Args = (Arc<TreeInner>, GPooled<Vec<u8>>);
905
906 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
907 let tree = get_tree_inner(cached, 0)?;
908 let key_val = cached.0.get(1)?.as_ref()?;
909 let key = encode_key(tree.key_typ, key_val)?;
910 Some((tree, key))
911 }
912
913 fn eval((tree, key): Self::Args) -> impl Future<Output = Value> + Send {
914 async move {
915 match tokio::task::spawn_blocking(move || {
916 let result = tree.tree.get_gt(&*key);
917 decode_kv_result(&tree, result)
918 })
919 .await
920 {
921 Err(e) => errf!("DbErr", "task panicked: {e}"),
922 Ok(v) => v,
923 }
924 }
925 }
926}
927
928pub(crate) type DbGetGt = CachedArgsAsync<DbGetGtEv>;
929
930#[derive(Debug, Default)]
935pub(crate) struct DbCompareAndSwapEv;
936
937impl EvalCachedAsync for DbCompareAndSwapEv {
938 const NAME: &str = "db_compare_and_swap";
939 const NEEDS_CALLSITE: bool = false;
940 type Args = (
941 Arc<TreeInner>,
942 GPooled<Vec<u8>>,
943 Option<GPooled<Vec<u8>>>,
944 Option<GPooled<Vec<u8>>>,
945 );
946
947 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
948 let tree = get_tree_inner(cached, 0)?;
949 let key_val = cached.0.get(1)?.as_ref()?;
950 let key = encode_key(tree.key_typ, key_val)?;
951 let old_val = match cached.0.get(2)?.as_ref()? {
952 Value::Null => None,
953 v => Some(encode_value(v)?),
954 };
955 let new_val = match cached.0.get(3)?.as_ref()? {
956 Value::Null => None,
957 v => Some(encode_value(v)?),
958 };
959 Some((tree, key, old_val, new_val))
960 }
961
962 fn eval(
963 (tree, key, old_val, new_val): Self::Args,
964 ) -> impl Future<Output = Value> + Send {
965 async move {
966 match tokio::task::spawn_blocking(move || {
967 let old_ref: Option<&[u8]> = old_val.as_ref().map(|v| v.as_slice());
968 let new_ref: Option<&[u8]> = new_val.as_ref().map(|v| v.as_slice());
969 tree.tree.compare_and_swap(key.as_slice(), old_ref, new_ref)
970 })
971 .await
972 {
973 Err(e) => errf!("DbErr", "task panicked: {e}"),
974 Ok(Err(e)) => errf!("DbErr", "{e}"),
975 Ok(Ok(Ok(()))) => Value::Null,
976 Ok(Ok(Err(cas_err))) => {
977 let current = match cas_err.current {
978 None => Value::Null,
979 Some(ivec) => match decode_value(&ivec) {
980 Some(v) => v,
981 None => {
982 return errf!("DbErr", "failed to decode current value")
983 }
984 },
985 };
986 Value::Array(ValArray::from([
987 Value::String(arcstr::literal!("Mismatch")),
988 current,
989 ]))
990 }
991 }
992 }
993 }
994}
995
996pub(crate) type DbCompareAndSwap = CachedArgsAsync<DbCompareAndSwapEv>;
997
998#[derive(Debug, Default)]
1001pub(crate) struct DbBatchEv;
1002
1003impl EvalCachedAsync for DbBatchEv {
1004 const NAME: &str = "db_batch";
1005 const NEEDS_CALLSITE: bool = false;
1006 type Args = (Arc<TreeInner>, sled::Batch);
1007
1008 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1009 let tree = get_tree_inner(cached, 0)?;
1010 let arr = match cached.0.get(1)?.as_ref()? {
1011 Value::Array(a) => a,
1012 _ => return None,
1013 };
1014 let batch = parse_batch_ops(tree.key_typ, arr)?;
1015 Some((tree, batch))
1016 }
1017
1018 fn eval((tree, batch): Self::Args) -> impl Future<Output = Value> + Send {
1019 async move {
1020 match tokio::task::spawn_blocking(move || tree.tree.apply_batch(batch)).await
1021 {
1022 Err(e) => errf!("DbErr", "task panicked: {e}"),
1023 Ok(Err(e)) => errf!("DbErr", "{e}"),
1024 Ok(Ok(())) => Value::Null,
1025 }
1026 }
1027 }
1028}
1029
1030pub(crate) type DbBatch = CachedArgsAsync<DbBatchEv>;
1031
1032#[derive(Debug, Default)]
1037pub(crate) struct DbLenEv;
1038
1039impl EvalCachedAsync for DbLenEv {
1040 const NAME: &str = "db_len";
1041 const NEEDS_CALLSITE: bool = false;
1042 type Args = Arc<TreeInner>;
1043
1044 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1045 get_tree_inner(cached, 0)
1046 }
1047
1048 fn eval(tree: Self::Args) -> impl Future<Output = Value> + Send {
1049 async move {
1050 match tokio::task::spawn_blocking(move || tree.tree.len()).await {
1051 Err(e) => errf!("DbErr", "task panicked: {e}"),
1052 Ok(len) => Value::U64(len as u64),
1053 }
1054 }
1055 }
1056}
1057
1058pub(crate) type DbLen = CachedArgsAsync<DbLenEv>;
1059
1060#[derive(Debug, Default)]
1063pub(crate) struct DbIsEmptyEv;
1064
1065impl EvalCachedAsync for DbIsEmptyEv {
1066 const NAME: &str = "db_is_empty";
1067 const NEEDS_CALLSITE: bool = false;
1068 type Args = Arc<TreeInner>;
1069
1070 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1071 get_tree_inner(cached, 0)
1072 }
1073
1074 fn eval(tree: Self::Args) -> impl Future<Output = Value> + Send {
1075 async move {
1076 match tokio::task::spawn_blocking(move || tree.tree.is_empty()).await {
1077 Err(e) => errf!("DbErr", "task panicked: {e}"),
1078 Ok(empty) => Value::Bool(empty),
1079 }
1080 }
1081 }
1082}
1083
1084pub(crate) type DbIsEmpty = CachedArgsAsync<DbIsEmptyEv>;
1085
1086#[derive(Debug, Default)]
1091pub(crate) struct DbSizeOnDiskEv;
1092
1093impl EvalCachedAsync for DbSizeOnDiskEv {
1094 const NAME: &str = "db_size_on_disk";
1095 const NEEDS_CALLSITE: bool = false;
1096 type Args = sled::Db;
1097
1098 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1099 get_db(cached, 0)
1100 }
1101
1102 fn eval(db: Self::Args) -> impl Future<Output = Value> + Send {
1103 async move {
1104 match tokio::task::spawn_blocking(move || db.size_on_disk()).await {
1105 Err(e) => errf!("DbErr", "task panicked: {e}"),
1106 Ok(Err(e)) => errf!("DbErr", "{e}"),
1107 Ok(Ok(size)) => Value::U64(size),
1108 }
1109 }
1110 }
1111}
1112
1113pub(crate) type DbSizeOnDisk = CachedArgsAsync<DbSizeOnDiskEv>;
1114
1115#[derive(Debug, Default)]
1118pub(crate) struct DbWasRecoveredEv;
1119
1120impl EvalCachedAsync for DbWasRecoveredEv {
1121 const NAME: &str = "db_was_recovered";
1122 const NEEDS_CALLSITE: bool = false;
1123 type Args = sled::Db;
1124
1125 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1126 get_db(cached, 0)
1127 }
1128
1129 fn eval(db: Self::Args) -> impl Future<Output = Value> + Send {
1130 async move {
1131 match tokio::task::spawn_blocking(move || db.was_recovered()).await {
1132 Err(e) => errf!("DbErr", "task panicked: {e}"),
1133 Ok(recovered) => Value::Bool(recovered),
1134 }
1135 }
1136 }
1137}
1138
1139pub(crate) type DbWasRecovered = CachedArgsAsync<DbWasRecoveredEv>;
1140
1141#[derive(Debug, Default)]
1144pub(crate) struct DbChecksumEv;
1145
1146impl EvalCachedAsync for DbChecksumEv {
1147 const NAME: &str = "db_checksum";
1148 const NEEDS_CALLSITE: bool = false;
1149 type Args = sled::Db;
1150
1151 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1152 get_db(cached, 0)
1153 }
1154
1155 fn eval(db: Self::Args) -> impl Future<Output = Value> + Send {
1156 async move {
1157 match tokio::task::spawn_blocking(move || db.checksum()).await {
1158 Err(e) => errf!("DbErr", "task panicked: {e}"),
1159 Ok(Err(e)) => errf!("DbErr", "{e}"),
1160 Ok(Ok(crc)) => Value::U32(crc),
1161 }
1162 }
1163 }
1164}
1165
1166pub(crate) type DbChecksum = CachedArgsAsync<DbChecksumEv>;
1167
1168#[derive(Pack)]
1171struct ExportTree {
1172 typ: Vec<u8>,
1173 name: Vec<u8>,
1174 entries: Vec<Vec<Vec<u8>>>,
1175}
1176
1177#[derive(Pack)]
1178struct ExportData {
1179 trees: Vec<ExportTree>,
1180}
1181
1182#[derive(Debug, Default)]
1185pub(crate) struct DbExportEv;
1186
1187impl EvalCachedAsync for DbExportEv {
1188 const NAME: &str = "db_export";
1189 const NEEDS_CALLSITE: bool = false;
1190 type Args = (sled::Db, ArcStr);
1191
1192 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1193 let db = get_db(cached, 0)?;
1194 let path = cached.get::<ArcStr>(1)?;
1195 Some((db, path))
1196 }
1197
1198 fn eval((db, path): Self::Args) -> impl Future<Output = Value> + Send {
1199 async move {
1200 match tokio::task::spawn_blocking(move || {
1201 use std::io::Write;
1202 let data = ExportData {
1203 trees: db
1204 .export()
1205 .into_iter()
1206 .map(|(typ, name, iter)| ExportTree {
1207 typ,
1208 name,
1209 entries: iter.collect(),
1210 })
1211 .collect(),
1212 };
1213 let mut buf = Vec::with_capacity(data.encoded_len());
1214 data.encode(&mut buf).map_err(|e| errf!("DbErr", "{e}"))?;
1215 let file =
1216 std::fs::File::create(&*path).map_err(|e| errf!("DbErr", "{e}"))?;
1217 let mut w = std::io::BufWriter::new(file);
1218 w.write_all(&buf).map_err(|e| errf!("DbErr", "{e}"))?;
1219 w.flush().map_err(|e| errf!("DbErr", "{e}"))?;
1220 Ok(Value::Null)
1221 })
1222 .await
1223 {
1224 Err(e) => errf!("DbErr", "task panicked: {e}"),
1225 Ok(Err(e)) => e,
1226 Ok(Ok(v)) => v,
1227 }
1228 }
1229 }
1230}
1231
1232pub(crate) type DbExport = CachedArgsAsync<DbExportEv>;
1233
1234#[derive(Debug, Default)]
1237pub(crate) struct DbImportEv;
1238
1239impl EvalCachedAsync for DbImportEv {
1240 const NAME: &str = "db_import";
1241 const NEEDS_CALLSITE: bool = false;
1242 type Args = (sled::Db, ArcStr);
1243
1244 fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1245 let db = get_db(cached, 0)?;
1246 let path = cached.get::<ArcStr>(1)?;
1247 Some((db, path))
1248 }
1249
1250 fn eval((db, path): Self::Args) -> impl Future<Output = Value> + Send {
1251 async move {
1252 match tokio::task::spawn_blocking(move || {
1253 let buf = std::fs::read(&*path).map_err(|e| errf!("DbErr", "{e}"))?;
1254 let data = ExportData::decode(&mut buf.as_slice())
1255 .map_err(|e| errf!("DbErr", "{e}"))?;
1256 let collections: Vec<_> = data
1257 .trees
1258 .into_iter()
1259 .map(|t| (t.typ, t.name, t.entries.into_iter()))
1260 .collect();
1261 db.import(collections);
1262 Ok(Value::Null)
1263 })
1264 .await
1265 {
1266 Err(e) => errf!("DbErr", "task panicked: {e}"),
1267 Ok(Err(e)) => e,
1268 Ok(Ok(v)) => v,
1269 }
1270 }
1271 }
1272}
1273
1274pub(crate) type DbImport = CachedArgsAsync<DbImportEv>;