Skip to main content

graphix_package_db/
tree.rs

1use anyhow::{bail, Result};
2use arcstr::{literal, ArcStr};
3use compact_str::format_compact;
4use graphix_compiler::{
5    errf,
6    expr::ExprId,
7    typ::{FnType, Type},
8    ExecCtx, Node, Rt, Scope, TypecheckPhase, UserEvent,
9};
10use graphix_package_core::{CachedArgsAsync, CachedVals, EvalCachedAsync};
11use netidx::{path::Path, publisher::Typ};
12use netidx_core::pack::Pack;
13use netidx_derive::Pack;
14use netidx_value::{ValArray, Value};
15use poolshark::{global::GPooled, local::LPooled};
16use std::sync::Arc;
17
18use crate::encoding::{
19    decode_key, decode_value, encode_key, encode_value, parse_batch_ops, ENCODE_MANY_POOL,
20};
21
22// ── Abstract types ────────────────────────────────────────────────
23
24// -- DbValue --
25
26#[derive(Debug, Clone)]
27pub struct DbValue {
28    pub(crate) inner: Arc<sled::Db>,
29}
30
31graphix_package_core::impl_abstract_arc!(DbValue, pub(crate) static DB_WRAPPER = [
32    0xd1, 0xe2, 0xf3, 0x04, 0x15, 0x26, 0x47, 0x38,
33    0x49, 0x5a, 0x6b, 0x7c, 0x8d, 0x9e, 0xaf, 0xb0,
34]);
35
36pub(crate) fn get_db(cached: &CachedVals, idx: usize) -> Option<sled::Db> {
37    match cached.0.get(idx)?.as_ref()? {
38        Value::Abstract(a) => {
39            let dv = a.downcast_ref::<DbValue>()?;
40            Some((*dv.inner).clone())
41        }
42        _ => None,
43    }
44}
45
46// -- TreeInner --
47
48#[derive(Debug)]
49pub(crate) struct TreeInner {
50    pub(crate) tree: sled::Tree,
51    pub(crate) key_typ: Option<Typ>,
52}
53
54// -- TreeValue --
55
56#[derive(Debug, Clone)]
57pub struct TreeValue {
58    pub(crate) inner: Arc<TreeInner>,
59}
60
61graphix_package_core::impl_abstract_arc!(TreeValue, pub(crate) static TREE_WRAPPER = [
62    0xd2, 0xe3, 0xf4, 0x05, 0x16, 0x27, 0x48, 0x39,
63    0x4a, 0x5b, 0x6c, 0x7d, 0x8e, 0x9f, 0xa0, 0xb1,
64]);
65
66pub(crate) fn get_tree_inner(cached: &CachedVals, idx: usize) -> Option<Arc<TreeInner>> {
67    match cached.0.get(idx)?.as_ref()? {
68        Value::Abstract(a) => {
69            let tv = a.downcast_ref::<TreeValue>()?;
70            Some(tv.inner.clone())
71        }
72        _ => None,
73    }
74}
75
76pub(crate) fn wrap_tree(tree: sled::Tree, key_typ: Option<Typ>) -> Value {
77    TREE_WRAPPER.wrap(TreeValue { inner: Arc::new(TreeInner { tree, key_typ }) })
78}
79
80// ── Tree metadata ─────────────────────────────────────────────────
81
82pub(crate) static META_TREE: ArcStr = literal!("$$__graphix_meta__$$");
83pub(crate) static DEFAULT_TREE_META: ArcStr = literal!("$$__graphix_default__$$");
84
85// ── MetaStore trait ──────────────────────────────────────────────
86//
87// Unifies sled::Tree (CAS-based) and TransactionalTree (get+insert)
88// so that check_or_store_meta works in both contexts.
89
90pub(crate) trait MetaStore {
91    fn get(&self, key: &[u8]) -> Result<Option<sled::IVec>>;
92    fn insert_if_absent(&self, key: &[u8], value: &[u8]) -> Result<Option<sled::IVec>>;
93}
94
95impl MetaStore for sled::Tree {
96    fn get(&self, key: &[u8]) -> Result<Option<sled::IVec>> {
97        Ok(sled::Tree::get(self, key)?)
98    }
99
100    fn insert_if_absent(&self, key: &[u8], value: &[u8]) -> Result<Option<sled::IVec>> {
101        match self.compare_and_swap(key, None as Option<&[u8]>, Some(value))? {
102            Ok(()) => Ok(None),
103            Err(cas_err) => Ok(cas_err.current),
104        }
105    }
106}
107
108impl MetaStore for sled::transaction::TransactionalTree {
109    fn get(&self, key: &[u8]) -> Result<Option<sled::IVec>> {
110        Ok(sled::transaction::TransactionalTree::get(self, key)?)
111    }
112
113    fn insert_if_absent(&self, key: &[u8], value: &[u8]) -> Result<Option<sled::IVec>> {
114        match sled::transaction::TransactionalTree::get(self, key)? {
115            Some(existing) => Ok(Some(existing)),
116            None => {
117                self.insert(key, value)?;
118                Ok(None)
119            }
120        }
121    }
122}
123
124pub(crate) fn read_meta(
125    meta: &impl MetaStore,
126    tree_name: &str,
127) -> Result<Option<(ArcStr, ArcStr)>> {
128    match meta.get(tree_name.as_bytes())? {
129        None => Ok(None),
130        Some(stored) => {
131            let stored = std::str::from_utf8(&stored)?;
132            let mut parts = stored.splitn(2, '\0');
133            let k = parts.next().unwrap_or("?");
134            let v = parts.next().unwrap_or("?");
135            Ok(Some((ArcStr::from(k), ArcStr::from(v))))
136        }
137    }
138}
139
140pub(crate) fn check_or_store_meta(
141    meta: &impl MetaStore,
142    tree_name: &str,
143    key_typ_str: &str,
144    val_typ_str: &str,
145) -> Result<()> {
146    let meta_val = format_compact!("{key_typ_str}\0{val_typ_str}");
147    match meta.insert_if_absent(tree_name.as_bytes(), meta_val.as_bytes())? {
148        None => Ok(()),
149        Some(existing) => {
150            let stored = std::str::from_utf8(&existing)?;
151            let mut parts = stored.splitn(2, '\0');
152            let sk = parts.next().unwrap_or("?");
153            let sv = parts.next().unwrap_or("?");
154            if sk != key_typ_str || sv != val_typ_str {
155                bail!(
156                    "tree '{tree_name}' has type Tree<{sk}, {sv}> \
157                    but was opened as Tree<{key_typ_str}, {val_typ_str}>"
158                )
159            } else {
160                Ok(())
161            }
162        }
163    }
164}
165
166// ── Type extraction helpers ──────────────────────────────────────
167
168fn prim_typ(t: &Type) -> Option<Typ> {
169    match t {
170        Type::Primitive(flags) if flags.iter().count() == 1 => flags.iter().next(),
171        _ => None,
172    }
173}
174
175// The resolved return type is Ref("/Result", [Ref("/Tree"|"/TxnTree", [k, v]), ...]).
176fn find_tree_params(t: &Type) -> Option<&[Type]> {
177    match t {
178        Type::Ref { name, params, .. } if Path::basename(&**name) == Some("Result") => {
179            params.iter().find_map(|p| match p {
180                Type::Ref { name, params, .. }
181                    if matches!(Path::basename(&**name), Some("Tree" | "TxnTree"))
182                        && params.len() == 2 =>
183                {
184                    Some(&**params)
185                }
186                _ => None,
187            })
188        }
189        _ => None,
190    }
191}
192
193pub(crate) fn extract_key_typ_from_rtype(resolved_typ: Option<&FnType>) -> Option<Typ> {
194    let ft = resolved_typ?;
195    find_tree_params(&ft.rtype).and_then(|params| prim_typ(&params[0]))
196}
197
198pub(crate) fn extract_type_strings_from_rtype(
199    resolved_typ: Option<&FnType>,
200) -> (ArcStr, ArcStr) {
201    let Some(ft) = resolved_typ else {
202        return (arcstr::literal!("?"), arcstr::literal!("?"));
203    };
204    match find_tree_params(&ft.rtype) {
205        Some(params) if params.len() >= 2 => (
206            ArcStr::from(format!("{}", params[0]).as_str()),
207            ArcStr::from(format!("{}", params[1]).as_str()),
208        ),
209        _ => (arcstr::literal!("?"), arcstr::literal!("?")),
210    }
211}
212
/// Reports whether both rendered type strings name concrete types.
///
/// A type string is not concrete when it is empty (the state of a
/// default-constructed evaluator that never saw a resolved type), when it is
/// the unknown marker `"?"`, or when it starts with `'` (a type variable).
pub(crate) fn types_are_concrete(key_typ_str: &str, val_typ_str: &str) -> bool {
    fn concrete(s: &str) -> bool {
        // An empty string can never be a rendered type; treating it as
        // concrete would let a meaningless "\0" record reach the metadata.
        !s.is_empty() && s != "?" && !s.starts_with('\'')
    }
    concrete(key_typ_str) && concrete(val_typ_str)
}
219
220// ── Builtins ──────────────────────────────────────────────────────
221
222// -- DbGetType --
223
224#[derive(Debug, Default)]
225pub(crate) struct DbGetTypeEv;
226
227impl EvalCachedAsync for DbGetTypeEv {
228    const NAME: &str = "db_get_type";
229    const NEEDS_CALLSITE: bool = false;
230    type Args = (sled::Db, ArcStr);
231
232    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
233        let db = get_db(cached, 0)?;
234        let name = match cached.0.get(1)?.as_ref()? {
235            Value::Null => DEFAULT_TREE_META.clone(),
236            Value::String(s) => s.clone(),
237            _ => return None,
238        };
239        Some((db, name))
240    }
241
242    fn eval((db, name): Self::Args) -> impl Future<Output = Value> + Send {
243        async move {
244            match tokio::task::spawn_blocking(move || -> Result<Value> {
245                let meta = db.open_tree(&*META_TREE)?;
246                match read_meta(&meta, &name)? {
247                    None => Ok(Value::Null),
248                    Some((k, v)) => Ok(Value::Array(ValArray::from([
249                        Value::String(k),
250                        Value::String(v),
251                    ]))),
252                }
253            })
254            .await
255            {
256                Err(e) => errf!("DbErr", "task panicked: {e:?}"),
257                Ok(Err(e)) => errf!("DbErr", "{e:?}"),
258                Ok(Ok(v)) => v,
259            }
260        }
261    }
262}
263
264pub(crate) type DbGetType = CachedArgsAsync<DbGetTypeEv>;
265
266// -- DbOpen --
267
268#[derive(Debug, Default)]
269pub(crate) struct DbOpenEv;
270
271impl EvalCachedAsync for DbOpenEv {
272    const NAME: &str = "db_open";
273    const NEEDS_CALLSITE: bool = false;
274    type Args = ArcStr;
275
276    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
277        cached.get::<ArcStr>(0)
278    }
279
280    fn eval(path: Self::Args) -> impl Future<Output = Value> + Send {
281        async move {
282            match tokio::task::spawn_blocking(move || sled::open(&*path)).await {
283                Err(e) => errf!("DbErr", "task panicked: {e}"),
284                Ok(Err(e)) => errf!("DbErr", "{e}"),
285                Ok(Ok(db)) => DB_WRAPPER.wrap(DbValue { inner: Arc::new(db) }),
286            }
287        }
288    }
289}
290
291pub(crate) type DbOpen = CachedArgsAsync<DbOpenEv>;
292
293// -- DbFlush --
294
295#[derive(Debug, Default)]
296pub(crate) struct DbFlushEv;
297
298impl EvalCachedAsync for DbFlushEv {
299    const NAME: &str = "db_flush";
300    const NEEDS_CALLSITE: bool = false;
301    type Args = sled::Db;
302
303    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
304        get_db(cached, 0)
305    }
306
307    fn eval(db: Self::Args) -> impl Future<Output = Value> + Send {
308        async move {
309            match tokio::task::spawn_blocking(move || db.flush()).await {
310                Err(e) => errf!("DbErr", "task panicked: {e}"),
311                Ok(Err(e)) => errf!("DbErr", "{e}"),
312                Ok(Ok(_)) => Value::Null,
313            }
314        }
315    }
316}
317
318pub(crate) type DbFlush = CachedArgsAsync<DbFlushEv>;
319
320// -- DbGenerateId --
321
322#[derive(Debug, Default)]
323pub(crate) struct DbGenerateIdEv;
324
325impl EvalCachedAsync for DbGenerateIdEv {
326    const NAME: &str = "db_generate_id";
327    const NEEDS_CALLSITE: bool = false;
328    type Args = sled::Db;
329
330    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
331        get_db(cached, 0)
332    }
333
334    fn eval(db: Self::Args) -> impl Future<Output = Value> + Send {
335        async move {
336            match tokio::task::spawn_blocking(move || db.generate_id()).await {
337                Err(e) => errf!("DbErr", "task panicked: {e}"),
338                Ok(Err(e)) => errf!("DbErr", "{e}"),
339                Ok(Ok(id)) => Value::U64(id),
340            }
341        }
342    }
343}
344
345pub(crate) type DbGenerateId = CachedArgsAsync<DbGenerateIdEv>;
346
347// -- DbTreeNames --
348
349#[derive(Debug, Default)]
350pub(crate) struct DbTreeNamesEv;
351
352impl EvalCachedAsync for DbTreeNamesEv {
353    const NAME: &str = "db_tree_names";
354    const NEEDS_CALLSITE: bool = false;
355    type Args = sled::Db;
356
357    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
358        get_db(cached, 0)
359    }
360
361    fn eval(db: Self::Args) -> impl Future<Output = Value> + Send {
362        async move {
363            match tokio::task::spawn_blocking(move || db.tree_names()).await {
364                Err(e) => errf!("DbErr", "task panicked: {e}"),
365                Ok(names) => {
366                    let mut vals: LPooled<Vec<_>> = names
367                        .into_iter()
368                        .filter_map(|ivec| {
369                            std::str::from_utf8(&ivec)
370                                .ok()
371                                .map(|s| Value::String(ArcStr::from(s)))
372                        })
373                        .collect();
374                    Value::Array(ValArray::from_iter_exact(vals.drain(..)))
375                }
376            }
377        }
378    }
379}
380
381pub(crate) type DbTreeNames = CachedArgsAsync<DbTreeNamesEv>;
382
383// -- DbDropTree --
384
385#[derive(Debug, Default)]
386pub(crate) struct DbDropTreeEv;
387
388impl EvalCachedAsync for DbDropTreeEv {
389    const NAME: &str = "db_drop_tree";
390    const NEEDS_CALLSITE: bool = false;
391    type Args = (sled::Db, ArcStr);
392
393    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
394        let db = get_db(cached, 0)?;
395        let name = cached.get::<ArcStr>(1)?;
396        Some((db, name))
397    }
398
399    fn eval((db, name): Self::Args) -> impl Future<Output = Value> + Send {
400        async move {
401            match tokio::task::spawn_blocking(move || db.drop_tree(name.as_bytes())).await
402            {
403                Err(e) => errf!("DbErr", "task panicked: {e}"),
404                Ok(Err(e)) => errf!("DbErr", "{e}"),
405                Ok(Ok(existed)) => Value::Bool(existed),
406            }
407        }
408    }
409}
410
411pub(crate) type DbDropTree = CachedArgsAsync<DbDropTreeEv>;
412
413// -- DbTree --
414
415#[derive(Debug)]
416pub(crate) struct DbTreeArgs {
417    db: sled::Db,
418    name: Option<ArcStr>,
419    key_typ: Option<Typ>,
420    key_typ_str: ArcStr,
421    val_typ_str: ArcStr,
422}
423
424#[derive(Debug, Default)]
425pub(crate) struct DbTreeEv {
426    key_typ: Option<Typ>,
427    key_typ_str: ArcStr,
428    val_typ_str: ArcStr,
429}
430
431impl EvalCachedAsync for DbTreeEv {
432    const NAME: &str = "db_tree";
433    const NEEDS_CALLSITE: bool = true;
434    type Args = DbTreeArgs;
435
436    fn init<R: Rt, E: UserEvent>(
437        _ctx: &mut ExecCtx<R, E>,
438        _typ: &FnType,
439        resolved: Option<&FnType>,
440        _scope: &Scope,
441        _from: &[Node<R, E>],
442        _top_id: ExprId,
443    ) -> Self {
444        let key_typ = extract_key_typ_from_rtype(resolved);
445        let (key_typ_str, val_typ_str) = extract_type_strings_from_rtype(resolved);
446        DbTreeEv { key_typ, key_typ_str, val_typ_str }
447    }
448
449    fn typecheck<R: Rt, E: UserEvent>(
450        &mut self,
451        _ctx: &mut ExecCtx<R, E>,
452        _from: &mut [Node<R, E>],
453        phase: TypecheckPhase<'_>,
454    ) -> Result<()> {
455        match phase {
456            TypecheckPhase::Lambda => Ok(()),
457            TypecheckPhase::CallSite(resolved) => {
458                self.key_typ = extract_key_typ_from_rtype(Some(resolved));
459                let (k, v) = extract_type_strings_from_rtype(Some(resolved));
460                self.key_typ_str = k;
461                self.val_typ_str = v;
462                if self.key_typ.is_none() {
463                    bail!("db::tree requires concrete key and value types")
464                }
465                Ok(())
466            }
467        }
468    }
469
470    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
471        let db = get_db(cached, 0)?;
472        let name = match cached.0.get(1)?.as_ref()? {
473            Value::Null => None,
474            Value::String(s) => Some(s.clone()),
475            _ => return None,
476        };
477        Some(DbTreeArgs {
478            db,
479            name,
480            key_typ: self.key_typ,
481            key_typ_str: self.key_typ_str.clone(),
482            val_typ_str: self.val_typ_str.clone(),
483        })
484    }
485
486    fn eval(args: Self::Args) -> impl Future<Output = Value> + Send {
487        async move {
488            let DbTreeArgs { db, name, key_typ, key_typ_str, val_typ_str } = args;
489            match tokio::task::spawn_blocking(move || -> Result<Value> {
490                if !types_are_concrete(&key_typ_str, &val_typ_str) {
491                    bail!("tree requires concrete type annotations")
492                }
493                let meta = db.open_tree(&META_TREE)?;
494                match name {
495                    Some(name) => {
496                        if &*name == DEFAULT_TREE_META
497                            || name.as_bytes() == META_TREE.as_bytes()
498                        {
499                            bail!("tree name '{name}' is reserved");
500                        }
501                        check_or_store_meta(&meta, &name, &key_typ_str, &val_typ_str)?;
502                        Ok(db
503                            .open_tree(name.as_bytes())
504                            .map(|tree| wrap_tree(tree, key_typ))?)
505                    }
506                    None => {
507                        check_or_store_meta(
508                            &meta,
509                            &DEFAULT_TREE_META,
510                            &key_typ_str,
511                            &val_typ_str,
512                        )?;
513                        Ok(wrap_tree((*db).clone(), key_typ))
514                    }
515                }
516            })
517            .await
518            {
519                Err(e) => errf!("DbErr", "task panicked: {e}"),
520                Ok(Err(e)) => errf!("DbErr", "{e:?}"),
521                Ok(Ok(v)) => v,
522            }
523        }
524    }
525}
526
527pub(crate) type DbTree = CachedArgsAsync<DbTreeEv>;
528
529// ── Key-encoding builtins ─────────────────────────────────────────
530
531// -- DbGet --
532
533#[derive(Debug, Default)]
534pub(crate) struct DbGetEv;
535
536impl EvalCachedAsync for DbGetEv {
537    const NAME: &str = "db_get";
538    const NEEDS_CALLSITE: bool = false;
539    type Args = (Arc<TreeInner>, GPooled<Vec<u8>>);
540
541    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
542        let tree = get_tree_inner(cached, 0)?;
543        let key_val = cached.0.get(1)?.as_ref()?;
544        let key = encode_key(tree.key_typ, key_val)?;
545        Some((tree, key))
546    }
547
548    fn eval((tree, key): Self::Args) -> impl Future<Output = Value> + Send {
549        async move {
550            match tokio::task::spawn_blocking(move || tree.tree.get(&*key)).await {
551                Err(e) => errf!("DbErr", "task panicked: {e}"),
552                Ok(Err(e)) => errf!("DbErr", "{e}"),
553                Ok(Ok(None)) => Value::Null,
554                Ok(Ok(Some(ivec))) => match decode_value(&ivec) {
555                    Some(v) => v,
556                    None => errf!("DbErr", "failed to decode value"),
557                },
558            }
559        }
560    }
561}
562
563pub(crate) type DbGet = CachedArgsAsync<DbGetEv>;
564
565// -- DbInsert --
566
567#[derive(Debug, Default)]
568pub(crate) struct DbInsertEv;
569
570impl EvalCachedAsync for DbInsertEv {
571    const NAME: &str = "db_insert";
572    const NEEDS_CALLSITE: bool = false;
573    type Args = (Arc<TreeInner>, GPooled<Vec<u8>>, GPooled<Vec<u8>>);
574
575    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
576        let tree = get_tree_inner(cached, 0)?;
577        let key_val = cached.0.get(1)?.as_ref()?;
578        let key = encode_key(tree.key_typ, key_val)?;
579        let val = encode_value(cached.0.get(2)?.as_ref()?)?;
580        Some((tree, key, val))
581    }
582
583    fn eval((tree, key, val): Self::Args) -> impl Future<Output = Value> + Send {
584        async move {
585            match tokio::task::spawn_blocking(move || {
586                tree.tree.insert(&*key, val.as_slice())
587            })
588            .await
589            {
590                Err(e) => errf!("DbErr", "task panicked: {e}"),
591                Ok(Err(e)) => errf!("DbErr", "{e}"),
592                Ok(Ok(None)) => Value::Null,
593                Ok(Ok(Some(old))) => match decode_value(&old) {
594                    Some(v) => v,
595                    None => errf!("DbErr", "failed to decode previous value"),
596                },
597            }
598        }
599    }
600}
601
602pub(crate) type DbInsert = CachedArgsAsync<DbInsertEv>;
603
604// -- DbRemove --
605
606#[derive(Debug, Default)]
607pub(crate) struct DbRemoveEv;
608
609impl EvalCachedAsync for DbRemoveEv {
610    const NAME: &str = "db_remove";
611    const NEEDS_CALLSITE: bool = false;
612    type Args = (Arc<TreeInner>, GPooled<Vec<u8>>);
613
614    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
615        let tree = get_tree_inner(cached, 0)?;
616        let key_val = cached.0.get(1)?.as_ref()?;
617        let key = encode_key(tree.key_typ, key_val)?;
618        Some((tree, key))
619    }
620
621    fn eval((tree, key): Self::Args) -> impl Future<Output = Value> + Send {
622        async move {
623            match tokio::task::spawn_blocking(move || tree.tree.remove(&*key)).await {
624                Err(e) => errf!("DbErr", "task panicked: {e}"),
625                Ok(Err(e)) => errf!("DbErr", "{e}"),
626                Ok(Ok(None)) => Value::Null,
627                Ok(Ok(Some(old))) => match decode_value(&old) {
628                    Some(v) => v,
629                    None => errf!("DbErr", "failed to decode previous value"),
630                },
631            }
632        }
633    }
634}
635
636pub(crate) type DbRemove = CachedArgsAsync<DbRemoveEv>;
637
638// -- DbContainsKey --
639
640#[derive(Debug, Default)]
641pub(crate) struct DbContainsKeyEv;
642
643impl EvalCachedAsync for DbContainsKeyEv {
644    const NAME: &str = "db_contains_key";
645    const NEEDS_CALLSITE: bool = false;
646    type Args = (Arc<TreeInner>, GPooled<Vec<u8>>);
647
648    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
649        let tree = get_tree_inner(cached, 0)?;
650        let key_val = cached.0.get(1)?.as_ref()?;
651        let key = encode_key(tree.key_typ, key_val)?;
652        Some((tree, key))
653    }
654
655    fn eval((tree, key): Self::Args) -> impl Future<Output = Value> + Send {
656        async move {
657            match tokio::task::spawn_blocking(move || tree.tree.contains_key(&*key)).await
658            {
659                Err(e) => errf!("DbErr", "task panicked: {e}"),
660                Ok(Err(e)) => errf!("DbErr", "{e}"),
661                Ok(Ok(exists)) => Value::Bool(exists),
662            }
663        }
664    }
665}
666
667pub(crate) type DbContainsKey = CachedArgsAsync<DbContainsKeyEv>;
668
669// -- DbGetMany --
670
671#[derive(Debug, Default)]
672pub(crate) struct DbGetManyEv;
673
674impl EvalCachedAsync for DbGetManyEv {
675    const NAME: &str = "db_get_many";
676    const NEEDS_CALLSITE: bool = false;
677    type Args = (Arc<TreeInner>, GPooled<Vec<GPooled<Vec<u8>>>>);
678
679    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
680        let tree = get_tree_inner(cached, 0)?;
681        let arr = match cached.0.get(1)?.as_ref()? {
682            Value::Array(a) => a,
683            _ => return None,
684        };
685        let mut keys = ENCODE_MANY_POOL.take();
686        for k in arr.iter() {
687            keys.push(encode_key(tree.key_typ, k)?);
688        }
689        Some((tree, keys))
690    }
691
692    fn eval((tree, keys): Self::Args) -> impl Future<Output = Value> + Send {
693        async move {
694            match tokio::task::spawn_blocking(move || {
695                let mut results: LPooled<Vec<Value>> = LPooled::take();
696                for key in keys.iter() {
697                    match tree.tree.get(&**key) {
698                        Err(e) => return Err(errf!("DbErr", "{e}")),
699                        Ok(None) => results.push(Value::Null),
700                        Ok(Some(ivec)) => match decode_value(&ivec) {
701                            Some(v) => results.push(v),
702                            None => return Err(errf!("DbErr", "failed to decode value")),
703                        },
704                    }
705                }
706                Ok(Value::Array(ValArray::from_iter_exact(results.drain(..))))
707            })
708            .await
709            {
710                Err(e) => errf!("DbErr", "task panicked: {e}"),
711                Ok(Err(e)) => e,
712                Ok(Ok(v)) => v,
713            }
714        }
715    }
716}
717
718pub(crate) type DbGetMany = CachedArgsAsync<DbGetManyEv>;
719
720// ── Key-value decode helper ─────────────────────────────────────
721
722fn decode_kv_result(
723    tree: &TreeInner,
724    result: sled::Result<Option<(sled::IVec, sled::IVec)>>,
725) -> Value {
726    match result {
727        Err(e) => errf!("DbErr", "{e}"),
728        Ok(None) => Value::Null,
729        Ok(Some((k, v))) => match (decode_key(tree.key_typ, &k), decode_value(&v)) {
730            (Some(key), Some(val)) => Value::Array(ValArray::from([key, val])),
731            _ => errf!("DbErr", "failed to decode entry"),
732        },
733    }
734}
735
736// ── Ordered access builtins ─────────────────────────────────────
737
738// -- DbFirst --
739
740#[derive(Debug, Default)]
741pub(crate) struct DbFirstEv;
742
743impl EvalCachedAsync for DbFirstEv {
744    const NAME: &str = "db_first";
745    const NEEDS_CALLSITE: bool = false;
746    type Args = Arc<TreeInner>;
747
748    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
749        get_tree_inner(cached, 0)
750    }
751
752    fn eval(tree: Self::Args) -> impl Future<Output = Value> + Send {
753        async move {
754            match tokio::task::spawn_blocking(move || {
755                let result = tree.tree.first();
756                decode_kv_result(&tree, result)
757            })
758            .await
759            {
760                Err(e) => errf!("DbErr", "task panicked: {e}"),
761                Ok(v) => v,
762            }
763        }
764    }
765}
766
767pub(crate) type DbFirst = CachedArgsAsync<DbFirstEv>;
768
769// -- DbLast --
770
771#[derive(Debug, Default)]
772pub(crate) struct DbLastEv;
773
774impl EvalCachedAsync for DbLastEv {
775    const NAME: &str = "db_last";
776    const NEEDS_CALLSITE: bool = false;
777    type Args = Arc<TreeInner>;
778
779    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
780        get_tree_inner(cached, 0)
781    }
782
783    fn eval(tree: Self::Args) -> impl Future<Output = Value> + Send {
784        async move {
785            match tokio::task::spawn_blocking(move || {
786                let result = tree.tree.last();
787                decode_kv_result(&tree, result)
788            })
789            .await
790            {
791                Err(e) => errf!("DbErr", "task panicked: {e}"),
792                Ok(v) => v,
793            }
794        }
795    }
796}
797
798pub(crate) type DbLast = CachedArgsAsync<DbLastEv>;
799
800// -- DbPopMin --
801
802#[derive(Debug, Default)]
803pub(crate) struct DbPopMinEv;
804
805impl EvalCachedAsync for DbPopMinEv {
806    const NAME: &str = "db_pop_min";
807    const NEEDS_CALLSITE: bool = false;
808    type Args = Arc<TreeInner>;
809
810    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
811        get_tree_inner(cached, 0)
812    }
813
814    fn eval(tree: Self::Args) -> impl Future<Output = Value> + Send {
815        async move {
816            match tokio::task::spawn_blocking(move || {
817                let result = tree.tree.pop_min();
818                decode_kv_result(&tree, result)
819            })
820            .await
821            {
822                Err(e) => errf!("DbErr", "task panicked: {e}"),
823                Ok(v) => v,
824            }
825        }
826    }
827}
828
829pub(crate) type DbPopMin = CachedArgsAsync<DbPopMinEv>;
830
831// -- DbPopMax --
832
833#[derive(Debug, Default)]
834pub(crate) struct DbPopMaxEv;
835
836impl EvalCachedAsync for DbPopMaxEv {
837    const NAME: &str = "db_pop_max";
838    const NEEDS_CALLSITE: bool = false;
839    type Args = Arc<TreeInner>;
840
841    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
842        get_tree_inner(cached, 0)
843    }
844
845    fn eval(tree: Self::Args) -> impl Future<Output = Value> + Send {
846        async move {
847            match tokio::task::spawn_blocking(move || {
848                let result = tree.tree.pop_max();
849                decode_kv_result(&tree, result)
850            })
851            .await
852            {
853                Err(e) => errf!("DbErr", "task panicked: {e}"),
854                Ok(v) => v,
855            }
856        }
857    }
858}
859
860pub(crate) type DbPopMax = CachedArgsAsync<DbPopMaxEv>;
861
862// -- DbGetLt --
863
864#[derive(Debug, Default)]
865pub(crate) struct DbGetLtEv;
866
867impl EvalCachedAsync for DbGetLtEv {
868    const NAME: &str = "db_get_lt";
869    const NEEDS_CALLSITE: bool = false;
870    type Args = (Arc<TreeInner>, GPooled<Vec<u8>>);
871
872    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
873        let tree = get_tree_inner(cached, 0)?;
874        let key_val = cached.0.get(1)?.as_ref()?;
875        let key = encode_key(tree.key_typ, key_val)?;
876        Some((tree, key))
877    }
878
879    fn eval((tree, key): Self::Args) -> impl Future<Output = Value> + Send {
880        async move {
881            match tokio::task::spawn_blocking(move || {
882                let result = tree.tree.get_lt(&*key);
883                decode_kv_result(&tree, result)
884            })
885            .await
886            {
887                Err(e) => errf!("DbErr", "task panicked: {e}"),
888                Ok(v) => v,
889            }
890        }
891    }
892}
893
894pub(crate) type DbGetLt = CachedArgsAsync<DbGetLtEv>;
895
896// -- DbGetGt --
897
898#[derive(Debug, Default)]
899pub(crate) struct DbGetGtEv;
900
901impl EvalCachedAsync for DbGetGtEv {
902    const NAME: &str = "db_get_gt";
903    const NEEDS_CALLSITE: bool = false;
904    type Args = (Arc<TreeInner>, GPooled<Vec<u8>>);
905
906    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
907        let tree = get_tree_inner(cached, 0)?;
908        let key_val = cached.0.get(1)?.as_ref()?;
909        let key = encode_key(tree.key_typ, key_val)?;
910        Some((tree, key))
911    }
912
913    fn eval((tree, key): Self::Args) -> impl Future<Output = Value> + Send {
914        async move {
915            match tokio::task::spawn_blocking(move || {
916                let result = tree.tree.get_gt(&*key);
917                decode_kv_result(&tree, result)
918            })
919            .await
920            {
921                Err(e) => errf!("DbErr", "task panicked: {e}"),
922                Ok(v) => v,
923            }
924        }
925    }
926}
927
928pub(crate) type DbGetGt = CachedArgsAsync<DbGetGtEv>;
929
930// ── Atomic operations ───────────────────────────────────────────
931
932// -- DbCompareAndSwap --
933
934#[derive(Debug, Default)]
935pub(crate) struct DbCompareAndSwapEv;
936
937impl EvalCachedAsync for DbCompareAndSwapEv {
938    const NAME: &str = "db_compare_and_swap";
939    const NEEDS_CALLSITE: bool = false;
940    type Args = (
941        Arc<TreeInner>,
942        GPooled<Vec<u8>>,
943        Option<GPooled<Vec<u8>>>,
944        Option<GPooled<Vec<u8>>>,
945    );
946
947    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
948        let tree = get_tree_inner(cached, 0)?;
949        let key_val = cached.0.get(1)?.as_ref()?;
950        let key = encode_key(tree.key_typ, key_val)?;
951        let old_val = match cached.0.get(2)?.as_ref()? {
952            Value::Null => None,
953            v => Some(encode_value(v)?),
954        };
955        let new_val = match cached.0.get(3)?.as_ref()? {
956            Value::Null => None,
957            v => Some(encode_value(v)?),
958        };
959        Some((tree, key, old_val, new_val))
960    }
961
962    fn eval(
963        (tree, key, old_val, new_val): Self::Args,
964    ) -> impl Future<Output = Value> + Send {
965        async move {
966            match tokio::task::spawn_blocking(move || {
967                let old_ref: Option<&[u8]> = old_val.as_ref().map(|v| v.as_slice());
968                let new_ref: Option<&[u8]> = new_val.as_ref().map(|v| v.as_slice());
969                tree.tree.compare_and_swap(key.as_slice(), old_ref, new_ref)
970            })
971            .await
972            {
973                Err(e) => errf!("DbErr", "task panicked: {e}"),
974                Ok(Err(e)) => errf!("DbErr", "{e}"),
975                Ok(Ok(Ok(()))) => Value::Null,
976                Ok(Ok(Err(cas_err))) => {
977                    let current = match cas_err.current {
978                        None => Value::Null,
979                        Some(ivec) => match decode_value(&ivec) {
980                            Some(v) => v,
981                            None => {
982                                return errf!("DbErr", "failed to decode current value")
983                            }
984                        },
985                    };
986                    Value::Array(ValArray::from([
987                        Value::String(arcstr::literal!("Mismatch")),
988                        current,
989                    ]))
990                }
991            }
992        }
993    }
994}
995
996pub(crate) type DbCompareAndSwap = CachedArgsAsync<DbCompareAndSwapEv>;
997
998// -- DbBatch --
999
1000#[derive(Debug, Default)]
1001pub(crate) struct DbBatchEv;
1002
1003impl EvalCachedAsync for DbBatchEv {
1004    const NAME: &str = "db_batch";
1005    const NEEDS_CALLSITE: bool = false;
1006    type Args = (Arc<TreeInner>, sled::Batch);
1007
1008    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1009        let tree = get_tree_inner(cached, 0)?;
1010        let arr = match cached.0.get(1)?.as_ref()? {
1011            Value::Array(a) => a,
1012            _ => return None,
1013        };
1014        let batch = parse_batch_ops(tree.key_typ, arr)?;
1015        Some((tree, batch))
1016    }
1017
1018    fn eval((tree, batch): Self::Args) -> impl Future<Output = Value> + Send {
1019        async move {
1020            match tokio::task::spawn_blocking(move || tree.tree.apply_batch(batch)).await
1021            {
1022                Err(e) => errf!("DbErr", "task panicked: {e}"),
1023                Ok(Err(e)) => errf!("DbErr", "{e}"),
1024                Ok(Ok(())) => Value::Null,
1025            }
1026        }
1027    }
1028}
1029
1030pub(crate) type DbBatch = CachedArgsAsync<DbBatchEv>;
1031
1032// ── Collection introspection ────────────────────────────────────
1033
1034// -- DbLen --
1035
1036#[derive(Debug, Default)]
1037pub(crate) struct DbLenEv;
1038
1039impl EvalCachedAsync for DbLenEv {
1040    const NAME: &str = "db_len";
1041    const NEEDS_CALLSITE: bool = false;
1042    type Args = Arc<TreeInner>;
1043
1044    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1045        get_tree_inner(cached, 0)
1046    }
1047
1048    fn eval(tree: Self::Args) -> impl Future<Output = Value> + Send {
1049        async move {
1050            match tokio::task::spawn_blocking(move || tree.tree.len()).await {
1051                Err(e) => errf!("DbErr", "task panicked: {e}"),
1052                Ok(len) => Value::U64(len as u64),
1053            }
1054        }
1055    }
1056}
1057
1058pub(crate) type DbLen = CachedArgsAsync<DbLenEv>;
1059
1060// -- DbIsEmpty --
1061
1062#[derive(Debug, Default)]
1063pub(crate) struct DbIsEmptyEv;
1064
1065impl EvalCachedAsync for DbIsEmptyEv {
1066    const NAME: &str = "db_is_empty";
1067    const NEEDS_CALLSITE: bool = false;
1068    type Args = Arc<TreeInner>;
1069
1070    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1071        get_tree_inner(cached, 0)
1072    }
1073
1074    fn eval(tree: Self::Args) -> impl Future<Output = Value> + Send {
1075        async move {
1076            match tokio::task::spawn_blocking(move || tree.tree.is_empty()).await {
1077                Err(e) => errf!("DbErr", "task panicked: {e}"),
1078                Ok(empty) => Value::Bool(empty),
1079            }
1080        }
1081    }
1082}
1083
1084pub(crate) type DbIsEmpty = CachedArgsAsync<DbIsEmptyEv>;
1085
1086// ── Database-level operations ───────────────────────────────────
1087
1088// -- DbSizeOnDisk --
1089
1090#[derive(Debug, Default)]
1091pub(crate) struct DbSizeOnDiskEv;
1092
1093impl EvalCachedAsync for DbSizeOnDiskEv {
1094    const NAME: &str = "db_size_on_disk";
1095    const NEEDS_CALLSITE: bool = false;
1096    type Args = sled::Db;
1097
1098    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1099        get_db(cached, 0)
1100    }
1101
1102    fn eval(db: Self::Args) -> impl Future<Output = Value> + Send {
1103        async move {
1104            match tokio::task::spawn_blocking(move || db.size_on_disk()).await {
1105                Err(e) => errf!("DbErr", "task panicked: {e}"),
1106                Ok(Err(e)) => errf!("DbErr", "{e}"),
1107                Ok(Ok(size)) => Value::U64(size),
1108            }
1109        }
1110    }
1111}
1112
1113pub(crate) type DbSizeOnDisk = CachedArgsAsync<DbSizeOnDiskEv>;
1114
1115// -- DbWasRecovered --
1116
1117#[derive(Debug, Default)]
1118pub(crate) struct DbWasRecoveredEv;
1119
1120impl EvalCachedAsync for DbWasRecoveredEv {
1121    const NAME: &str = "db_was_recovered";
1122    const NEEDS_CALLSITE: bool = false;
1123    type Args = sled::Db;
1124
1125    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1126        get_db(cached, 0)
1127    }
1128
1129    fn eval(db: Self::Args) -> impl Future<Output = Value> + Send {
1130        async move {
1131            match tokio::task::spawn_blocking(move || db.was_recovered()).await {
1132                Err(e) => errf!("DbErr", "task panicked: {e}"),
1133                Ok(recovered) => Value::Bool(recovered),
1134            }
1135        }
1136    }
1137}
1138
1139pub(crate) type DbWasRecovered = CachedArgsAsync<DbWasRecoveredEv>;
1140
1141// -- DbChecksum --
1142
1143#[derive(Debug, Default)]
1144pub(crate) struct DbChecksumEv;
1145
1146impl EvalCachedAsync for DbChecksumEv {
1147    const NAME: &str = "db_checksum";
1148    const NEEDS_CALLSITE: bool = false;
1149    type Args = sled::Db;
1150
1151    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1152        get_db(cached, 0)
1153    }
1154
1155    fn eval(db: Self::Args) -> impl Future<Output = Value> + Send {
1156        async move {
1157            match tokio::task::spawn_blocking(move || db.checksum()).await {
1158                Err(e) => errf!("DbErr", "task panicked: {e}"),
1159                Ok(Err(e)) => errf!("DbErr", "{e}"),
1160                Ok(Ok(crc)) => Value::U32(crc),
1161            }
1162        }
1163    }
1164}
1165
1166pub(crate) type DbChecksum = CachedArgsAsync<DbChecksumEv>;
1167
1168// -- DbExport / DbImport serialization format --
1169
/// One collection of a database dump, as written by `db_export` and read back
/// by `db_import`.
///
/// The three fields mirror the triple yielded by `sled::Db::export` and
/// expected by `sled::Db::import`.
// NOTE(review): the field order is presumably part of the `Pack` encoding;
// reordering fields would likely make existing dump files unreadable.
#[derive(Pack)]
struct ExportTree {
    // Collection type tag, stored exactly as sled reported it.
    typ: Vec<u8>,
    // Collection name, stored exactly as sled reported it.
    name: Vec<u8>,
    // Items collected from sled's per-collection iterator.
    entries: Vec<Vec<Vec<u8>>>,
}
1176
/// Top-level record of a dump file: every collection returned by
/// `sled::Db::export`, in the order sled produced them.
#[derive(Pack)]
struct ExportData {
    // One element per exported collection.
    trees: Vec<ExportTree>,
}
1181
1182// -- DbExport --
1183
1184#[derive(Debug, Default)]
1185pub(crate) struct DbExportEv;
1186
1187impl EvalCachedAsync for DbExportEv {
1188    const NAME: &str = "db_export";
1189    const NEEDS_CALLSITE: bool = false;
1190    type Args = (sled::Db, ArcStr);
1191
1192    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1193        let db = get_db(cached, 0)?;
1194        let path = cached.get::<ArcStr>(1)?;
1195        Some((db, path))
1196    }
1197
1198    fn eval((db, path): Self::Args) -> impl Future<Output = Value> + Send {
1199        async move {
1200            match tokio::task::spawn_blocking(move || {
1201                use std::io::Write;
1202                let data = ExportData {
1203                    trees: db
1204                        .export()
1205                        .into_iter()
1206                        .map(|(typ, name, iter)| ExportTree {
1207                            typ,
1208                            name,
1209                            entries: iter.collect(),
1210                        })
1211                        .collect(),
1212                };
1213                let mut buf = Vec::with_capacity(data.encoded_len());
1214                data.encode(&mut buf).map_err(|e| errf!("DbErr", "{e}"))?;
1215                let file =
1216                    std::fs::File::create(&*path).map_err(|e| errf!("DbErr", "{e}"))?;
1217                let mut w = std::io::BufWriter::new(file);
1218                w.write_all(&buf).map_err(|e| errf!("DbErr", "{e}"))?;
1219                w.flush().map_err(|e| errf!("DbErr", "{e}"))?;
1220                Ok(Value::Null)
1221            })
1222            .await
1223            {
1224                Err(e) => errf!("DbErr", "task panicked: {e}"),
1225                Ok(Err(e)) => e,
1226                Ok(Ok(v)) => v,
1227            }
1228        }
1229    }
1230}
1231
1232pub(crate) type DbExport = CachedArgsAsync<DbExportEv>;
1233
1234// -- DbImport --
1235
1236#[derive(Debug, Default)]
1237pub(crate) struct DbImportEv;
1238
1239impl EvalCachedAsync for DbImportEv {
1240    const NAME: &str = "db_import";
1241    const NEEDS_CALLSITE: bool = false;
1242    type Args = (sled::Db, ArcStr);
1243
1244    fn prepare_args(&mut self, cached: &CachedVals) -> Option<Self::Args> {
1245        let db = get_db(cached, 0)?;
1246        let path = cached.get::<ArcStr>(1)?;
1247        Some((db, path))
1248    }
1249
1250    fn eval((db, path): Self::Args) -> impl Future<Output = Value> + Send {
1251        async move {
1252            match tokio::task::spawn_blocking(move || {
1253                let buf = std::fs::read(&*path).map_err(|e| errf!("DbErr", "{e}"))?;
1254                let data = ExportData::decode(&mut buf.as_slice())
1255                    .map_err(|e| errf!("DbErr", "{e}"))?;
1256                let collections: Vec<_> = data
1257                    .trees
1258                    .into_iter()
1259                    .map(|t| (t.typ, t.name, t.entries.into_iter()))
1260                    .collect();
1261                db.import(collections);
1262                Ok(Value::Null)
1263            })
1264            .await
1265            {
1266                Err(e) => errf!("DbErr", "task panicked: {e}"),
1267                Ok(Err(e)) => e,
1268                Ok(Ok(v)) => v,
1269            }
1270        }
1271    }
1272}
1273
1274pub(crate) type DbImport = CachedArgsAsync<DbImportEv>;