cozo_ce/runtime/db.rs

1/*
2 * Copyright 2022, The Cozo Project Authors.
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
5 * If a copy of the MPL was not distributed with this file,
6 * You can obtain one at https://mozilla.org/MPL/2.0/.
7 */
8
9use std::collections::btree_map::Entry;
10use std::collections::{BTreeMap, BTreeSet};
11use std::default::Default;
12use std::fmt::{Debug, Formatter};
13use std::iter;
14use std::path::Path;
15#[allow(unused_imports)]
16use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18#[allow(unused_imports)]
19use std::thread;
20#[allow(unused_imports)]
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23#[allow(unused_imports)]
24use crossbeam::channel::{bounded, unbounded, Receiver, Sender};
25use crossbeam::sync::ShardedLock;
26use either::{Left, Right};
27use itertools::Itertools;
28use miette::Report;
29#[allow(unused_imports)]
30use miette::{bail, ensure, miette, Diagnostic, IntoDiagnostic, Result, WrapErr};
31use serde_json::json;
32use smartstring::{LazyCompact, SmartString};
33use thiserror::Error;
34
35use crate::data::functions::current_validity;
36use crate::data::json::JsonValue;
37use crate::data::program::{InputProgram, QueryAssertion, RelationOp, ReturnMutation};
38use crate::data::relation::ColumnDef;
39use crate::data::tuple::{Tuple, TupleT};
40use crate::data::value::{DataValue, ValidityTs, LARGEST_UTF_CHAR};
41use crate::fixed_rule::DEFAULT_FIXED_RULES;
42use crate::fts::TokenizerCache;
43use crate::parse::sys::SysOp;
44use crate::parse::{parse_expressions, parse_script, CozoScript, SourceSpan};
45use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet};
46use crate::query::ra::{
47    FilteredRA, FtsSearchRA, HnswSearchRA, InnerJoin, LshSearchRA, NegJoin, RelAlgebra, ReorderRA,
48    StoredRA, StoredWithValidityRA, TempStoreRA, UnificationRA,
49};
50#[allow(unused_imports)]
51use crate::runtime::callback::{
52    CallbackCollector, CallbackDeclaration, CallbackOp, EventCallbackRegistry,
53};
54use crate::runtime::relation::{
55    extend_tuple_from_v, AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId,
56};
57use crate::runtime::transact::SessionTx;
58use crate::storage::temp::TempStorage;
59use crate::storage::Storage;
60use crate::{decode_tuple_from_kv, FixedRule, Symbol};
61
62pub(crate) struct RunningQueryHandle {
63    pub(crate) started_at: f64,
64    pub(crate) poison: Poison,
65}
66
67pub(crate) struct RunningQueryCleanup {
68    pub(crate) id: u64,
69    pub(crate) running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
70}
71
72impl Drop for RunningQueryCleanup {
73    fn drop(&mut self) {
74        let mut map = self.running_queries.lock().unwrap();
75        if let Some(handle) = map.remove(&self.id) {
76            handle.poison.0.store(true, Ordering::Relaxed);
77        }
78    }
79}
80
81#[derive(serde_derive::Serialize, serde_derive::Deserialize)]
82pub struct DbManifest {
83    pub storage_version: u64,
84}
85
/// Whether a script is allowed to modify the database.
///
/// `Immutable` makes the script run in read-only mode (see `Db::run_script_ast`).
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum ScriptMutability {
    /// The script may perform mutations.
    Mutable,
    /// The script is run read-only.
    Immutable,
}
94
95/// The database object of Cozo.
96#[derive(Clone)]
97pub struct Db<S> {
98    pub(crate) db: S,
99    temp_db: TempStorage,
100    relation_store_id: Arc<AtomicU64>,
101    pub(crate) queries_count: Arc<AtomicU64>,
102    pub(crate) running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
103    pub(crate) fixed_rules: Arc<ShardedLock<BTreeMap<String, Arc<Box<dyn FixedRule>>>>>,
104    pub(crate) tokenizers: Arc<TokenizerCache>,
105    #[cfg(not(target_arch = "wasm32"))]
106    callback_count: Arc<AtomicU32>,
107    #[cfg(not(target_arch = "wasm32"))]
108    pub(crate) event_callbacks: Arc<ShardedLock<EventCallbackRegistry>>,
109    relation_locks: Arc<ShardedLock<BTreeMap<SmartString<LazyCompact>, Arc<ShardedLock<()>>>>>,
110}
111
112impl<S> Debug for Db<S> {
113    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
114        write!(f, "Db")
115    }
116}
117
118#[derive(Debug, Diagnostic, Error)]
119#[error("Initialization of database failed")]
120#[diagnostic(code(db::init))]
121pub(crate) struct BadDbInit(#[help] pub(crate) String);
122
123#[derive(Debug, Error, Diagnostic)]
124#[error("Cannot import data into relation {0} as it is an index")]
125#[diagnostic(code(tx::import_into_index))]
126pub(crate) struct ImportIntoIndex(pub(crate) String);
127
128#[derive(serde_derive::Serialize, serde_derive::Deserialize, Debug, Clone, Default)]
129/// Rows in a relation, together with headers for the fields.
130pub struct NamedRows {
131    /// The headers
132    pub headers: Vec<String>,
133    /// The rows
134    pub rows: Vec<Tuple>,
135    /// Contains the next named rows, if exists
136    pub next: Option<Box<NamedRows>>,
137}
138
139impl IntoIterator for NamedRows {
140    type Item = Tuple;
141    type IntoIter = std::vec::IntoIter<Self::Item>;
142
143    fn into_iter(self) -> Self::IntoIter {
144        self.rows.into_iter()
145    }
146}
147
148impl NamedRows {
149    /// create a named rows with the given headers and rows
150    pub fn new(headers: Vec<String>, rows: Vec<Tuple>) -> Self {
151        Self {
152            headers,
153            rows,
154            next: None,
155        }
156    }
157
158    /// If there are more named rows after the current one
159    pub fn has_more(&self) -> bool {
160        self.next.is_some()
161    }
162
163    /// convert a chain of named rows to individual named rows
164    pub fn flatten(self) -> Vec<Self> {
165        let mut collected = vec![];
166        let mut current = self;
167        loop {
168            let nxt = current.next.take();
169            collected.push(current);
170            if let Some(n) = nxt {
171                current = *n;
172            } else {
173                break;
174            }
175        }
176        collected
177    }
178
179    /// Convert to a JSON object
180    pub fn into_json(self) -> JsonValue {
181        let nxt = match self.next {
182            None => json!(null),
183            Some(more) => more.into_json(),
184        };
185        let rows = self
186            .rows
187            .into_iter()
188            .map(|row| row.into_iter().map(JsonValue::from).collect::<JsonValue>())
189            .collect::<JsonValue>();
190        json!({
191            "headers": self.headers,
192            "rows": rows,
193            "next": nxt,
194        })
195    }
196    /// Make named rows from JSON
197    pub fn from_json(value: &JsonValue) -> Result<Self> {
198        let headers = value
199            .get("headers")
200            .ok_or_else(|| miette!("NamedRows requires 'headers' field"))?;
201        let headers = headers
202            .as_array()
203            .ok_or_else(|| miette!("'headers' field must be an array"))?;
204        let headers = headers
205            .iter()
206            .map(|h| -> Result<String> {
207                let h = h
208                    .as_str()
209                    .ok_or_else(|| miette!("'headers' field must be an array of strings"))?;
210                Ok(h.to_string())
211            })
212            .try_collect()?;
213        let rows = value
214            .get("rows")
215            .ok_or_else(|| miette!("NamedRows requires 'rows' field"))?;
216        let rows = rows
217            .as_array()
218            .ok_or_else(|| miette!("'rows' field must be an array"))?;
219        let rows = rows
220            .iter()
221            .map(|row| -> Result<Vec<DataValue>> {
222                let row = row
223                    .as_array()
224                    .ok_or_else(|| miette!("'rows' field must be an array of arrays"))?;
225                Ok(row.iter().map(DataValue::from).collect_vec())
226            })
227            .try_collect()?;
228        Ok(Self {
229            headers,
230            rows,
231            next: None,
232        })
233    }
234
235    /// Create a query and parameters to apply an operation (insert, put, delete, rm) to a stored
236    /// relation with the named rows.
237    pub fn into_payload(self, relation: &str, op: &str) -> Payload {
238        let cols_str = self.headers.join(", ");
239        let query = format!("?[{cols_str}] <- $data :{op} {relation} {{ {cols_str} }}");
240        let data = DataValue::List(self.rows.into_iter().map(|r| DataValue::List(r)).collect());
241        (query, [("data".to_string(), data)].into())
242    }
243}
244
/// The literal `"status"`. NOTE(review): users are not visible here — presumably a key in
/// status replies.
const STATUS_STR: &str = "status";
/// The literal `"OK"`. NOTE(review): presumably the value paired with `STATUS_STR`.
const OK_STR: &str = "OK";
247
248/// The query and parameters.
249pub type Payload = (String, BTreeMap<String, DataValue>);
250
251/// Commands to be sent to a multi-transaction
252#[derive(Eq, PartialEq, Debug)]
253pub enum TransactionPayload {
254    /// Commit the current transaction
255    Commit,
256    /// Abort the current transaction
257    Abort,
258    /// Run a query inside the transaction
259    Query(Payload),
260}
261
262impl<'s, S: Storage<'s>> Db<S> {
263    /// Create a new database object with the given storage.
264    /// You must call [`initialize`](Self::initialize) immediately after creation.
265    /// Due to lifetime restrictions we are not able to call that for you automatically.
266    pub fn new(storage: S) -> Result<Self> {
267        let ret = Self {
268            db: storage,
269            temp_db: Default::default(),
270            relation_store_id: Default::default(),
271            queries_count: Default::default(),
272            running_queries: Default::default(),
273            fixed_rules: Arc::new(ShardedLock::new(DEFAULT_FIXED_RULES.clone())),
274            tokenizers: Arc::new(Default::default()),
275            #[cfg(not(target_arch = "wasm32"))]
276            callback_count: Default::default(),
277            // callback_receiver: Arc::new(receiver),
278            #[cfg(not(target_arch = "wasm32"))]
279            event_callbacks: Default::default(),
280            relation_locks: Default::default(),
281        };
282        Ok(ret)
283    }
284
    /// Must be called after creation of the database to initialize the runtime state.
    ///
    /// Currently this only calls `load_last_ids`, propagating any error it returns.
    pub fn initialize(&'s self) -> Result<()> {
        self.load_last_ids()?;
        Ok(())
    }
290
291    /// Run a multi-transaction. A command should be sent to `payloads`, and the result should be
292    /// retrieved from `results`. A transaction ends when it receives a `Commit` or `Abort`,
293    /// or when a query is not successful. After a transaction ends, sending / receiving from
294    /// the channels will fail.
295    ///
296    /// Write transactions _may_ block other reads, but we guarantee that this does not happen
297    /// for the RocksDB backend.
298    pub fn run_multi_transaction(
299        &'s self,
300        is_write: bool,
301        payloads: Receiver<TransactionPayload>,
302        results: Sender<Result<NamedRows>>,
303    ) {
304        let tx = if is_write {
305            self.transact_write()
306        } else {
307            self.transact()
308        };
309        let mut cleanups: Vec<(Vec<u8>, Vec<u8>)> = vec![];
310        let mut tx = match tx {
311            Ok(tx) => tx,
312            Err(err) => {
313                let _ = results.send(Err(err));
314                return;
315            }
316        };
317
318        let ts = current_validity();
319        let callback_targets = self.current_callback_targets();
320        let mut callback_collector = BTreeMap::new();
321        let mut write_locks = BTreeMap::new();
322
323        for payload in payloads {
324            match payload {
325                TransactionPayload::Commit => {
326                    for (lower, upper) in cleanups {
327                        if let Err(err) = tx.store_tx.del_range_from_persisted(&lower, &upper) {
328                            eprintln!("{err:?}")
329                        }
330                    }
331
332                    let _ = results.send(tx.commit_tx().map(|_| NamedRows::default()));
333                    #[cfg(not(target_arch = "wasm32"))]
334                    if !callback_collector.is_empty() {
335                        self.send_callbacks(callback_collector)
336                    }
337
338                    break;
339                }
340                TransactionPayload::Abort => {
341                    let _ = results.send(Ok(NamedRows::default()));
342                    break;
343                }
344                TransactionPayload::Query((script, params)) => {
345                    let p =
346                        match parse_script(&script, &params, &self.fixed_rules.read().unwrap(), ts)
347                        {
348                            Ok(p) => p,
349                            Err(err) => {
350                                if results.send(Err(err)).is_err() {
351                                    break;
352                                } else {
353                                    continue;
354                                }
355                            }
356                        };
357
358                    let p = match p.get_single_program() {
359                        Ok(p) => p,
360                        Err(err) => {
361                            if results.send(Err(err)).is_err() {
362                                break;
363                            } else {
364                                continue;
365                            }
366                        }
367                    };
368                    if let Some(write_lock_name) = p.needs_write_lock() {
369                        match write_locks.entry(write_lock_name) {
370                            Entry::Vacant(e) => {
371                                let lock = self
372                                    .obtain_relation_locks(iter::once(e.key()))
373                                    .pop()
374                                    .unwrap();
375                                e.insert(lock);
376                            }
377                            Entry::Occupied(_) => {}
378                        }
379                    }
380
381                    let res = self.execute_single_program(
382                        p,
383                        &mut tx,
384                        &mut cleanups,
385                        ts,
386                        &callback_targets,
387                        &mut callback_collector,
388                    );
389                    if results.send(res).is_err() {
390                        break;
391                    }
392                }
393            }
394        }
395    }
396
397    /// This returns the set of fixed rule implementations for this specific backend.
398    pub fn get_fixed_rules(&'s self) -> BTreeMap<String, Arc<Box<dyn FixedRule>>> {
399        return self.fixed_rules.read().unwrap().clone();
400    }
401
402    /// Run the CozoScript passed in. The `params` argument is a map of parameters.
403    pub fn run_script(
404        &'s self,
405        payload: &str,
406        params: BTreeMap<String, DataValue>,
407        mutability: ScriptMutability,
408    ) -> Result<NamedRows> {
409        self.run_script_ast(
410            parse_script(
411                payload,
412                &params,
413                &self.get_fixed_rules(),
414                current_validity(),
415            )?,
416            current_validity(),
417            mutability,
418        )
419    }
420
421    /// Run the CozoScript passed in. The `params` argument is a map of parameters.
422    pub fn run_script_read_only(
423        &'s self,
424        payload: &str,
425        params: BTreeMap<String, DataValue>,
426    ) -> Result<NamedRows> {
427        self.run_script(payload, params, ScriptMutability::Immutable)
428    }
429
430    /// Run the AST CozoScript passed in.
431    pub fn run_script_ast(
432        &'s self,
433        payload: CozoScript,
434        cur_vld: ValidityTs,
435        mutability: ScriptMutability,
436    ) -> Result<NamedRows> {
437        let read_only = mutability == ScriptMutability::Immutable;
438        match payload {
439            CozoScript::Single(p) => self.execute_single(cur_vld, p, read_only),
440            CozoScript::Imperative(ps) => self.execute_imperative(cur_vld, &ps, read_only),
441            CozoScript::Sys(op) => self.run_sys_op(op, read_only),
442        }
443    }
444
445    /// Export relations to JSON data.
446    ///
447    /// `relations` contains names of the stored relations to export.
448    pub fn export_relations<I, T>(&'s self, relations: I) -> Result<BTreeMap<String, NamedRows>>
449    where
450        T: AsRef<str>,
451        I: Iterator<Item = T>,
452    {
453        let tx = self.transact()?;
454        let mut ret: BTreeMap<String, NamedRows> = BTreeMap::new();
455        for rel in relations {
456            let handle = tx.get_relation(rel.as_ref(), false)?;
457            let size_hint = handle.metadata.keys.len() + handle.metadata.non_keys.len();
458
459            if handle.access_level < AccessLevel::ReadOnly {
460                bail!(InsufficientAccessLevel(
461                    handle.name.to_string(),
462                    "data export".to_string(),
463                    handle.access_level
464                ));
465            }
466
467            let mut cols = handle
468                .metadata
469                .keys
470                .iter()
471                .map(|col| col.name.clone())
472                .collect_vec();
473            cols.extend(
474                handle
475                    .metadata
476                    .non_keys
477                    .iter()
478                    .map(|col| col.name.clone())
479                    .collect_vec(),
480            );
481
482            let start = Tuple::default().encode_as_key(handle.id);
483            let end = Tuple::default().encode_as_key(handle.id.next());
484
485            let mut rows = vec![];
486            for data in tx.store_tx.range_scan(&start, &end) {
487                let (k, v) = data?;
488                let tuple = decode_tuple_from_kv(&k, &v, Some(size_hint));
489                rows.push(tuple);
490            }
491            let headers = cols.iter().map(|col| col.to_string()).collect_vec();
492            ret.insert(rel.as_ref().to_string(), NamedRows::new(headers, rows));
493        }
494        Ok(ret)
495    }
496    /// Import relations. The argument `data` accepts data in the shape of
497    /// what was returned by [Self::export_relations].
498    /// The target stored relations must already exist in the database.
499    /// Any associated indices will be updated.
500    ///
501    /// Note that triggers and callbacks are _not_ run for the relations, if any exists.
502    /// If you need to activate triggers or callbacks, use queries with parameters.
503    pub fn import_relations(&'s self, data: BTreeMap<String, NamedRows>) -> Result<()> {
504        #[derive(Debug, Diagnostic, Error)]
505        #[error("cannot import data for relation '{0}': {1}")]
506        #[diagnostic(code(import::bad_data))]
507        struct BadDataForRelation(String, JsonValue);
508
509        let rel_names = data.keys().map(SmartString::from).collect_vec();
510        let locks = self.obtain_relation_locks(rel_names.iter());
511        let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
512
513        let cur_vld = current_validity();
514
515        let mut tx = self.transact_write()?;
516
517        for (relation_op, in_data) in data {
518            let is_delete;
519            let relation: &str = match relation_op.strip_prefix('-') {
520                None => {
521                    is_delete = false;
522                    &relation_op
523                }
524                Some(s) => {
525                    is_delete = true;
526                    s
527                }
528            };
529            if relation.contains(':') {
530                bail!(ImportIntoIndex(relation.to_string()))
531            }
532            let handle = tx.get_relation(relation, false)?;
533            let has_indices = !handle.indices.is_empty();
534
535            if handle.access_level < AccessLevel::Protected {
536                bail!(InsufficientAccessLevel(
537                    handle.name.to_string(),
538                    "data import".to_string(),
539                    handle.access_level
540                ));
541            }
542
543            let header2idx: BTreeMap<_, _> = in_data
544                .headers
545                .iter()
546                .enumerate()
547                .map(|(i, k)| -> Result<(&str, usize)> { Ok((k as &str, i)) })
548                .try_collect()?;
549
550            let key_indices: Vec<_> = handle
551                .metadata
552                .keys
553                .iter()
554                .map(|col| -> Result<(usize, &ColumnDef)> {
555                    let idx = header2idx.get(&col.name as &str).ok_or_else(|| {
556                        miette!(
557                            "required header {} not found for relation {}",
558                            col.name,
559                            relation
560                        )
561                    })?;
562                    Ok((*idx, col))
563                })
564                .try_collect()?;
565
566            let val_indices: Vec<_> = if is_delete {
567                vec![]
568            } else {
569                handle
570                    .metadata
571                    .non_keys
572                    .iter()
573                    .map(|col| -> Result<(usize, &ColumnDef)> {
574                        let idx = header2idx.get(&col.name as &str).ok_or_else(|| {
575                            miette!(
576                                "required header {} not found for relation {}",
577                                col.name,
578                                relation
579                            )
580                        })?;
581                        Ok((*idx, col))
582                    })
583                    .try_collect()?
584            };
585
586            for row in in_data.rows {
587                let keys: Vec<_> = key_indices
588                    .iter()
589                    .map(|(i, col)| -> Result<DataValue> {
590                        let v = row
591                            .get(*i)
592                            .ok_or_else(|| miette!("row too short: {:?}", row))?;
593                        col.typing.coerce(v.clone(), cur_vld)
594                    })
595                    .try_collect()?;
596                let k_store = handle.encode_key_for_store(&keys, Default::default())?;
597                if has_indices {
598                    if let Some(existing) = tx.store_tx.get(&k_store, false)? {
599                        let mut old = keys.clone();
600                        extend_tuple_from_v(&mut old, &existing);
601                        if is_delete || old != row {
602                            for (idx_rel, extractor) in handle.indices.values() {
603                                let idx_tup =
604                                    extractor.iter().map(|i| old[*i].clone()).collect_vec();
605                                let encoded =
606                                    idx_rel.encode_key_for_store(&idx_tup, Default::default())?;
607                                tx.store_tx.del(&encoded)?;
608                            }
609                        }
610                    }
611                }
612                if is_delete {
613                    tx.store_tx.del(&k_store)?;
614                } else {
615                    let vals: Vec<_> = val_indices
616                        .iter()
617                        .map(|(i, col)| -> Result<DataValue> {
618                            let v = row
619                                .get(*i)
620                                .ok_or_else(|| miette!("row too short: {:?}", row))?;
621                            col.typing.coerce(v.clone(), cur_vld)
622                        })
623                        .try_collect()?;
624                    let v_store = handle.encode_val_only_for_store(&vals, Default::default())?;
625                    tx.store_tx.put(&k_store, &v_store)?;
626                    if has_indices {
627                        let mut kv = keys;
628                        kv.extend(vals);
629                        for (idx_rel, extractor) in handle.indices.values() {
630                            let idx_tup = extractor.iter().map(|i| kv[*i].clone()).collect_vec();
631                            let encoded =
632                                idx_rel.encode_key_for_store(&idx_tup, Default::default())?;
633                            tx.store_tx.put(&encoded, &[])?;
634                        }
635                    }
636                }
637            }
638        }
639        tx.commit_tx()?;
640        Ok(())
641    }
642    /// Backup the running database into an Sqlite file
643    #[allow(unused_variables)]
644    pub fn backup_db(&'s self, out_file: impl AsRef<Path>) -> Result<()> {
645        #[cfg(feature = "storage-sqlite")]
646        {
647            let sqlite_db = crate::new_cozo_sqlite(out_file)?;
648            if sqlite_db.relation_store_id.load(Ordering::SeqCst) != 0 {
649                bail!("Cannot create backup: data exists in the target database.");
650            }
651            let mut tx = self.transact()?;
652            let iter = tx.store_tx.range_scan(&[], &[0xFF]);
653            sqlite_db.db.batch_put(iter)?;
654            tx.commit_tx()?;
655            Ok(())
656        }
657        #[cfg(not(feature = "storage-sqlite"))]
658        bail!("backup requires the 'storage-sqlite' feature to be enabled")
659    }
660    /// Restore from an Sqlite backup
661    #[allow(unused_variables)]
662    pub fn restore_backup(&'s self, in_file: impl AsRef<Path>) -> Result<()> {
663        #[cfg(feature = "storage-sqlite")]
664        {
665            let sqlite_db = crate::new_cozo_sqlite(in_file)?;
666            let mut s_tx = sqlite_db.transact()?;
667            {
668                let mut tx = self.transact()?;
669                let store_id = tx.relation_store_id.load(Ordering::SeqCst);
670                if store_id != 0 {
671                    bail!(
672                        "Cannot restore backup: data exists in the current database. \
673                You can only restore into a new database (store id: {}).",
674                        store_id
675                    );
676                }
677                tx.commit_tx()?;
678            }
679            let iter = s_tx.store_tx.total_scan();
680            self.db.batch_put(iter)?;
681            s_tx.commit_tx()?;
682            Ok(())
683        }
684        #[cfg(not(feature = "storage-sqlite"))]
685        bail!("backup requires the 'storage-sqlite' feature to be enabled")
686    }
687    /// Import data from relations in a backup file.
688    /// The target stored relations must already exist in the database, and it must not
689    /// have any associated indices. If you want to import into relations with indices,
690    /// use [Db::import_relations].
691    ///
692    /// Note that triggers and callbacks are _not_ run for the relations, if any exists.
693    /// If you need to activate triggers or callbacks, use queries with parameters.
694    #[allow(unused_variables)]
695    pub fn import_from_backup(
696        &'s self,
697        in_file: impl AsRef<Path>,
698        relations: &[String],
699    ) -> Result<()> {
700        #[cfg(not(feature = "storage-sqlite"))]
701        bail!("backup requires the 'storage-sqlite' feature to be enabled");
702
703        #[cfg(feature = "storage-sqlite")]
704        {
705            let rel_names = relations.iter().map(SmartString::from).collect_vec();
706            let locks = self.obtain_relation_locks(rel_names.iter());
707            let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
708
709            let source_db = crate::new_cozo_sqlite(in_file)?;
710            let mut src_tx = source_db.transact()?;
711            let mut dst_tx = self.transact_write()?;
712
713            for relation in relations {
714                if relation.contains(':') {
715                    bail!(ImportIntoIndex(relation.to_string()))
716                }
717                let src_handle = src_tx.get_relation(relation, false)?;
718                let dst_handle = dst_tx.get_relation(relation, false)?;
719
720                if !dst_handle.indices.is_empty() {
721                    #[derive(Debug, Error, Diagnostic)]
722                    #[error("Cannot import data into relation {0} from backup as the relation has indices")]
723                    #[diagnostic(code(tx::bare_import_with_indices))]
724                    #[diagnostic(help("Use `import_relations()` instead"))]
725                    pub(crate) struct RestoreIntoRelWithIndices(pub(crate) String);
726
727                    bail!(RestoreIntoRelWithIndices(dst_handle.name.to_string()))
728                }
729
730                if dst_handle.access_level < AccessLevel::Protected {
731                    bail!(InsufficientAccessLevel(
732                        dst_handle.name.to_string(),
733                        "data import".to_string(),
734                        dst_handle.access_level
735                    ));
736                }
737
738                let src_lower = Tuple::default().encode_as_key(src_handle.id);
739                let src_upper = Tuple::default().encode_as_key(src_handle.id.next());
740
741                let data_it = src_tx.store_tx.range_scan(&src_lower, &src_upper).map(
742                    |src_pair| -> Result<(Vec<u8>, Vec<u8>)> {
743                        let (mut src_k, mut src_v) = src_pair?;
744                        dst_handle.amend_key_prefix(&mut src_k);
745                        dst_handle.amend_key_prefix(&mut src_v);
746                        Ok((src_k, src_v))
747                    },
748                );
749                for result in data_it {
750                    let (key, val) = result?;
751                    dst_tx.store_tx.put(&key, &val)?;
752                }
753            }
754
755            src_tx.commit_tx()?;
756            dst_tx.commit_tx()
757        }
758    }
759    /// Register a custom fixed rule implementation.
760    pub fn register_fixed_rule<R>(&self, name: String, rule_impl: R) -> Result<()>
761    where
762        R: FixedRule + 'static,
763    {
764        match self.fixed_rules.write().unwrap().entry(name) {
765            Entry::Vacant(ent) => {
766                ent.insert(Arc::new(Box::new(rule_impl)));
767                Ok(())
768            }
769            Entry::Occupied(ent) => {
770                bail!(
771                    "A fixed rule with the name {} is already registered",
772                    ent.key()
773                )
774            }
775        }
776    }
777
778    /// Unregister a custom fixed rule implementation.
779    pub fn unregister_fixed_rule(&self, name: &str) -> Result<bool> {
780        if DEFAULT_FIXED_RULES.contains_key(name) {
781            bail!("Cannot unregister builtin fixed rule {}", name);
782        }
783        Ok(self.fixed_rules.write().unwrap().remove(name).is_some())
784    }
785
786    /// Register callback channel to receive changes when the requested relation are successfully committed.
787    /// The returned ID can be used to unregister the callback channel.
788    #[cfg(not(target_arch = "wasm32"))]
789    pub fn register_callback(
790        &self,
791        relation: &str,
792        capacity: Option<usize>,
793    ) -> (u32, Receiver<(CallbackOp, NamedRows, NamedRows)>) {
794        let (sender, receiver) = if let Some(c) = capacity {
795            bounded(c)
796        } else {
797            unbounded()
798        };
799        let cb = CallbackDeclaration {
800            dependent: SmartString::from(relation),
801            sender,
802        };
803
804        let mut guard = self.event_callbacks.write().unwrap();
805        let new_id = self.callback_count.fetch_add(1, Ordering::SeqCst);
806        guard
807            .1
808            .entry(SmartString::from(relation))
809            .or_default()
810            .insert(new_id);
811
812        guard.0.insert(new_id, cb);
813        (new_id, receiver)
814    }
815
816    /// Unregister callbacks/channels to run when changes to relations are committed.
817    #[cfg(not(target_arch = "wasm32"))]
818    pub fn unregister_callback(&self, id: u32) -> bool {
819        let mut guard = self.event_callbacks.write().unwrap();
820        let ret = guard.0.remove(&id);
821        if let Some(cb) = &ret {
822            guard.1.get_mut(&cb.dependent).unwrap().remove(&id);
823
824            if guard.1.get(&cb.dependent).unwrap().is_empty() {
825                guard.1.remove(&cb.dependent);
826            }
827        }
828        ret.is_some()
829    }
830
831    pub(crate) fn obtain_relation_locks<'a, T: Iterator<Item = &'a SmartString<LazyCompact>>>(
832        &'s self,
833        rels: T,
834    ) -> Vec<Arc<ShardedLock<()>>> {
835        let mut collected = vec![];
836        let mut pending = vec![];
837        {
838            let locks = self.relation_locks.read().unwrap();
839            for rel in rels {
840                match locks.get(rel) {
841                    None => {
842                        pending.push(rel);
843                    }
844                    Some(lock) => collected.push(lock.clone()),
845                }
846            }
847        }
848        if !pending.is_empty() {
849            let mut locks = self.relation_locks.write().unwrap();
850            for rel in pending {
851                let lock = locks.entry(rel.clone()).or_default().clone();
852                collected.push(lock);
853            }
854        }
855        collected
856    }
857
858    fn compact_relation(&'s self) -> Result<()> {
859        let l = Tuple::default().encode_as_key(RelationId(0));
860        let u = vec![DataValue::Bot].encode_as_key(RelationId(u64::MAX));
861        self.db.range_compact(&l, &u)?;
862        Ok(())
863    }
864
865    fn load_last_ids(&'s self) -> Result<()> {
866        let mut tx = self.transact_write()?;
867        self.relation_store_id
868            .store(tx.init_storage()?.0, Ordering::Release);
869        tx.commit_tx()?;
870        Ok(())
871    }
872    pub(crate) fn transact(&'s self) -> Result<SessionTx<'_>> {
873        let ret = SessionTx {
874            store_tx: Box::new(self.db.transact(false)?),
875            temp_store_tx: self.temp_db.transact(true)?,
876            relation_store_id: self.relation_store_id.clone(),
877            temp_store_id: Default::default(),
878            tokenizers: self.tokenizers.clone(),
879        };
880        Ok(ret)
881    }
882    pub(crate) fn transact_write(&'s self) -> Result<SessionTx<'_>> {
883        let ret = SessionTx {
884            store_tx: Box::new(self.db.transact(true)?),
885            temp_store_tx: self.temp_db.transact(true)?,
886            relation_store_id: self.relation_store_id.clone(),
887            temp_store_id: Default::default(),
888            tokenizers: self.tokenizers.clone(),
889        };
890        Ok(ret)
891    }
892
893    pub(crate) fn execute_single_program(
894        &'s self,
895        p: InputProgram,
896        tx: &mut SessionTx<'_>,
897        cleanups: &mut Vec<(Vec<u8>, Vec<u8>)>,
898        cur_vld: ValidityTs,
899        callback_targets: &BTreeSet<SmartString<LazyCompact>>,
900        callback_collector: &mut CallbackCollector,
901    ) -> Result<NamedRows> {
902        #[allow(unused_variables)]
903        let sleep_opt = p.out_opts.sleep;
904        let (q_res, q_cleanups) =
905            self.run_query(tx, p, cur_vld, callback_targets, callback_collector, true)?;
906        cleanups.extend(q_cleanups);
907        #[cfg(not(target_arch = "wasm32"))]
908        if let Some(secs) = sleep_opt {
909            thread::sleep(Duration::from_micros((secs * 1000000.) as u64));
910        }
911        Ok(q_res)
912    }
913
914    fn execute_single(
915        &'s self,
916        cur_vld: ValidityTs,
917        p: InputProgram,
918        read_only: bool,
919    ) -> Result<NamedRows, Report> {
920        let mut callback_collector = BTreeMap::new();
921        let write_lock_names = p.needs_write_lock();
922        let is_write = write_lock_names.is_some();
923        if read_only && is_write {
924            bail!("write lock required for read-only query");
925        }
926        let write_lock = self.obtain_relation_locks(write_lock_names.iter());
927        let _write_lock_guards = if is_write {
928            Some(write_lock[0].read().unwrap())
929        } else {
930            None
931        };
932        let callback_targets = if is_write {
933            self.current_callback_targets()
934        } else {
935            Default::default()
936        };
937        let mut cleanups = vec![];
938        let res;
939        {
940            let mut tx = if is_write {
941                self.transact_write()?
942            } else {
943                self.transact()?
944            };
945
946            res = self.execute_single_program(
947                p,
948                &mut tx,
949                &mut cleanups,
950                cur_vld,
951                &callback_targets,
952                &mut callback_collector,
953            )?;
954
955            for (lower, upper) in cleanups {
956                tx.store_tx.del_range_from_persisted(&lower, &upper)?;
957            }
958
959            tx.commit_tx()?;
960        }
961        #[cfg(not(target_arch = "wasm32"))]
962        if !callback_collector.is_empty() {
963            self.send_callbacks(callback_collector)
964        }
965
966        Ok(res)
967    }
    /// Render compiled strata as the table returned by `::explain`.
    ///
    /// Produces, for every compiled rule, one row for the rule's output atom plus
    /// one row per relational-algebra operator reached from the rule's body.
    /// Each fixed-rule set gets a single "algo" row. Columns, in order: stratum,
    /// rule_idx, rule, atom_idx, op, ref, joins_on, filters/expr, out_relation.
    /// A column an operator does not set is null.
    fn explain_compiled(&self, strata: &[CompiledProgram]) -> Result<NamedRows> {
        let mut ret: Vec<JsonValue> = vec![];
        // Column names; also the keys of the per-row JSON objects built below.
        const STRATUM: &str = "stratum";
        const ATOM_IDX: &str = "atom_idx";
        const OP: &str = "op";
        const RULE_IDX: &str = "rule_idx";
        const RULE_NAME: &str = "rule";
        const REF_NAME: &str = "ref";
        const OUT_BINDINGS: &str = "out_relation";
        const JOINS_ON: &str = "joins_on";
        const FILTERS: &str = "filters/expr";

        // Output column order; each JSON row is flattened following this list.
        let headers = vec![
            STRATUM.to_string(),
            RULE_IDX.to_string(),
            RULE_NAME.to_string(),
            ATOM_IDX.to_string(),
            OP.to_string(),
            REF_NAME.to_string(),
            JOINS_ON.to_string(),
            FILTERS.to_string(),
            OUT_BINDINGS.to_string(),
        ];

        for (stratum, p) in strata.iter().enumerate() {
            // Clause counter: reset per stratum, but shared by every rule name in it.
            let mut clause_idx = -1;
            for (rule_name, v) in p {
                match v {
                    CompiledRuleSet::Rules(rules) => {
                        for CompiledRule { aggr, relation, .. } in rules.iter() {
                            clause_idx += 1;
                            let mut ret_for_relation = vec![];
                            // Explicit stack driving a depth-first walk of the operator tree.
                            let mut rel_stack = vec![relation];
                            let mut idx = 0;
                            // Kind of the output row: "aggr_out" if any non-meet aggregation
                            // is present, else "meet_aggr_out" if any meet aggregation is
                            // present, else plain "out".
                            let mut atom_type = "out";
                            for (a, _) in aggr.iter().flatten() {
                                if a.is_meet {
                                    if atom_type == "out" {
                                        atom_type = "meet_aggr_out";
                                    }
                                } else {
                                    atom_type = "aggr_out";
                                }
                            }

                            // Row for the rule's output atom (ATOM_IDX 0 before the reversal below).
                            ret_for_relation.push(json!({
                                STRATUM: stratum,
                                ATOM_IDX: idx,
                                OP: atom_type,
                                RULE_IDX: clause_idx,
                                RULE_NAME: rule_name.to_string(),
                                OUT_BINDINGS: relation.bindings_after_eliminate().into_iter().map(|v| v.to_string()).collect_vec()
                            }));
                            idx += 1;

                            while let Some(rel) = rel_stack.pop() {
                                // Each arm yields (op name, ref, joins_on, filters/expr) and
                                // pushes any child operators still to be visited.
                                let (atom_type, ref_name, joins_on, filters) = match rel {
                                    // Unit fixed relations are skipped: no row is emitted.
                                    r @ RelAlgebra::Fixed(..) => {
                                        if r.is_unit() {
                                            continue;
                                        }
                                        ("fixed", json!(null), json!(null), json!(null))
                                    }
                                    RelAlgebra::TempStore(TempStoreRA {
                                        storage_key,
                                        filters,
                                        ..
                                    }) => (
                                        "load_mem",
                                        json!(storage_key.to_string()),
                                        json!(null),
                                        json!(filters.iter().map(|f| f.to_string()).collect_vec()),
                                    ),
                                    RelAlgebra::Stored(StoredRA {
                                        storage, filters, ..
                                    }) => (
                                        "load_stored",
                                        json!(format!(":{}", storage.name)),
                                        json!(null),
                                        json!(filters.iter().map(|f| f.to_string()).collect_vec()),
                                    ),
                                    RelAlgebra::StoredWithValidity(StoredWithValidityRA {
                                        storage,
                                        filters,
                                        ..
                                    }) => (
                                        "load_stored_with_validity",
                                        json!(format!(":{}", storage.name)),
                                        json!(null),
                                        json!(filters.iter().map(|f| f.to_string()).collect_vec()),
                                    ),
                                    RelAlgebra::Join(inner) => {
                                        // A join whose left side is a unit relation is elided:
                                        // only its right side is explored, and no join row is emitted.
                                        if inner.left.is_unit() {
                                            rel_stack.push(&inner.right);
                                            continue;
                                        }
                                        let t = inner.join_type();
                                        let InnerJoin {
                                            left,
                                            right,
                                            joiner,
                                            ..
                                        } = inner.as_ref();
                                        // `right` is pushed last, so it is popped (visited) first.
                                        rel_stack.push(left);
                                        rel_stack.push(right);
                                        (t, json!(null), json!(joiner.as_map()), json!(null))
                                    }
                                    RelAlgebra::NegJoin(inner) => {
                                        let t = inner.join_type();
                                        let NegJoin {
                                            left,
                                            right,
                                            joiner,
                                            ..
                                        } = inner.as_ref();
                                        rel_stack.push(left);
                                        rel_stack.push(right);
                                        (t, json!(null), json!(joiner.as_map()), json!(null))
                                    }
                                    RelAlgebra::Reorder(ReorderRA { relation, .. }) => {
                                        rel_stack.push(relation);
                                        ("reorder", json!(null), json!(null), json!(null))
                                    }
                                    RelAlgebra::Filter(FilteredRA {
                                        parent,
                                        filters: pred,
                                        ..
                                    }) => {
                                        rel_stack.push(parent);
                                        (
                                            "filter",
                                            json!(null),
                                            json!(null),
                                            json!(pred.iter().map(|f| f.to_string()).collect_vec()),
                                        )
                                    }
                                    // Unification reports the bound variable as `ref` and the
                                    // unified expression in the filters/expr column.
                                    RelAlgebra::Unification(UnificationRA {
                                        parent,
                                        binding,
                                        expr,
                                        is_multi,
                                        ..
                                    }) => {
                                        rel_stack.push(parent);
                                        (
                                            if *is_multi { "multi-unify" } else { "unify" },
                                            json!(binding.name),
                                            json!(null),
                                            json!(expr.to_string()),
                                        )
                                    }
                                    // NOTE(review): for the three index-search operators below, the
                                    // second tuple slot (which lands in the `joins_on` column) holds
                                    // the query relation name rather than join keys. Confirm this is
                                    // intended.
                                    RelAlgebra::HnswSearch(HnswSearchRA {
                                        hnsw_search, ..
                                    }) => (
                                        "hnsw_index",
                                        json!(format!(":{}", hnsw_search.query.name)),
                                        json!(hnsw_search.query.name),
                                        json!(hnsw_search
                                            .filter
                                            .iter()
                                            .map(|f| f.to_string())
                                            .collect_vec()),
                                    ),
                                    RelAlgebra::FtsSearch(FtsSearchRA { fts_search, .. }) => (
                                        "fts_index",
                                        json!(format!(":{}", fts_search.query.name)),
                                        json!(fts_search.query.name),
                                        json!(fts_search
                                            .filter
                                            .iter()
                                            .map(|f| f.to_string())
                                            .collect_vec()),
                                    ),
                                    RelAlgebra::LshSearch(LshSearchRA { lsh_search, .. }) => (
                                        "lsh_index",
                                        json!(format!(":{}", lsh_search.query.name)),
                                        json!(lsh_search.query.name),
                                        json!(lsh_search
                                            .filter
                                            .iter()
                                            .map(|f| f.to_string())
                                            .collect_vec()),
                                    ),
                                };
                                ret_for_relation.push(json!({
                                    STRATUM: stratum,
                                    ATOM_IDX: idx,
                                    OP: atom_type,
                                    RULE_IDX: clause_idx,
                                    RULE_NAME: rule_name.to_string(),
                                    REF_NAME: ref_name,
                                    OUT_BINDINGS: rel.bindings_after_eliminate().into_iter().map(|v| v.to_string()).collect_vec(),
                                    JOINS_ON: joins_on,
                                    FILTERS: filters,
                                }));
                                idx += 1;
                            }
                            // Rows were produced output-first, top-down. Reverse them so each
                            // rule's rows run from the innermost operators up to its output row.
                            ret_for_relation.reverse();
                            ret.extend(ret_for_relation)
                        }
                    }
                    // Fixed rules (algorithms) get a single summary row.
                    CompiledRuleSet::Fixed(_) => ret.push(json!({
                        STRATUM: stratum,
                        ATOM_IDX: 0,
                        OP: "algo",
                        RULE_IDX: 0,
                        RULE_NAME: rule_name.to_string(),
                    })),
                }
            }
        }

        // Flatten each JSON object into a row in `headers` order; absent keys become null.
        let rows = ret
            .into_iter()
            .map(|m| {
                headers
                    .iter()
                    .map(|i| DataValue::from(m.get(i).unwrap_or(&JsonValue::Null)))
                    .collect_vec()
            })
            .collect_vec();

        Ok(NamedRows::new(headers, rows))
    }
1192    pub(crate) fn run_sys_op_with_tx(
1193        &'s self,
1194        tx: &mut SessionTx<'_>,
1195        op: &SysOp,
1196        read_only: bool,
1197        skip_locking: bool,
1198    ) -> Result<NamedRows> {
1199        match op {
1200            SysOp::Explain(prog) => {
1201                let (normalized_program, _) = prog.clone().into_normalized_program(tx)?;
1202                let (stratified_program, _) = normalized_program.into_stratified_program()?;
1203                let program = stratified_program.magic_sets_rewrite(tx)?;
1204                let compiled = tx.stratified_magic_compile(program)?;
1205                self.explain_compiled(&compiled)
1206            }
1207            SysOp::Compact => {
1208                if read_only {
1209                    bail!("Cannot compact in read-only mode");
1210                }
1211                self.compact_relation()?;
1212                Ok(NamedRows::new(
1213                    vec![STATUS_STR.to_string()],
1214                    vec![vec![DataValue::from(OK_STR)]],
1215                ))
1216            }
1217            SysOp::ListRelations => self.list_relations(tx),
1218            SysOp::ListFixedRules => {
1219                let rules = self.fixed_rules.read().unwrap();
1220                Ok(NamedRows::new(
1221                    vec!["rule".to_string()],
1222                    rules
1223                        .keys()
1224                        .map(|k| vec![DataValue::from(k as &str)])
1225                        .collect_vec(),
1226                ))
1227            }
1228            SysOp::RemoveRelation(rel_names) => {
1229                if read_only {
1230                    bail!("Cannot remove relations in read-only mode");
1231                }
1232                let rel_name_strs = rel_names.iter().map(|n| &n.name);
1233                let locks = if skip_locking {
1234                    vec![]
1235                } else {
1236                    self.obtain_relation_locks(rel_name_strs)
1237                };
1238                let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
1239                let mut bounds = vec![];
1240                for rs in rel_names {
1241                    let bound = tx.destroy_relation(rs)?;
1242                    if !rs.is_temp_store_name() {
1243                        bounds.extend(bound);
1244                    }
1245                }
1246                for (lower, upper) in bounds {
1247                    tx.store_tx.del_range_from_persisted(&lower, &upper)?;
1248                }
1249                Ok(NamedRows::new(
1250                    vec![STATUS_STR.to_string()],
1251                    vec![vec![DataValue::from(OK_STR)]],
1252                ))
1253            }
1254            SysOp::DescribeRelation(rel_name, description) => {
1255                tx.describe_relation(rel_name, description)?;
1256                Ok(NamedRows::new(
1257                    vec![STATUS_STR.to_string()],
1258                    vec![vec![DataValue::from(OK_STR)]],
1259                ))
1260            }
1261            SysOp::CreateIndex(rel_name, idx_name, cols) => {
1262                if read_only {
1263                    bail!("Cannot create index in read-only mode");
1264                }
1265                if skip_locking {
1266                    tx.create_index(rel_name, idx_name, cols)?;
1267                } else {
1268                    let lock = self
1269                        .obtain_relation_locks(iter::once(&rel_name.name))
1270                        .pop()
1271                        .unwrap();
1272                    let _guard = lock.write().unwrap();
1273                    tx.create_index(rel_name, idx_name, cols)?;
1274                }
1275                Ok(NamedRows::new(
1276                    vec![STATUS_STR.to_string()],
1277                    vec![vec![DataValue::from(OK_STR)]],
1278                ))
1279            }
1280            SysOp::CreateVectorIndex(config) => {
1281                if read_only {
1282                    bail!("Cannot create vector index in read-only mode");
1283                }
1284                if skip_locking {
1285                    tx.create_hnsw_index(config)?;
1286                } else {
1287                    let lock = self
1288                        .obtain_relation_locks(iter::once(&config.base_relation))
1289                        .pop()
1290                        .unwrap();
1291                    let _guard = lock.write().unwrap();
1292                    tx.create_hnsw_index(config)?;
1293                }
1294                Ok(NamedRows::new(
1295                    vec![STATUS_STR.to_string()],
1296                    vec![vec![DataValue::from(OK_STR)]],
1297                ))
1298            }
1299            SysOp::CreateFtsIndex(config) => {
1300                if read_only {
1301                    bail!("Cannot create fts index in read-only mode");
1302                }
1303                if skip_locking {
1304                    tx.create_fts_index(config)?;
1305                } else {
1306                    let lock = self
1307                        .obtain_relation_locks(iter::once(&config.base_relation))
1308                        .pop()
1309                        .unwrap();
1310                    let _guard = lock.write().unwrap();
1311                    tx.create_fts_index(config)?;
1312                }
1313                Ok(NamedRows::new(
1314                    vec![STATUS_STR.to_string()],
1315                    vec![vec![DataValue::from(OK_STR)]],
1316                ))
1317            }
1318            SysOp::CreateMinHashLshIndex(config) => {
1319                if read_only {
1320                    bail!("Cannot create minhash lsh index in read-only mode");
1321                }
1322                if skip_locking {
1323                    tx.create_minhash_lsh_index(config)?;
1324                } else {
1325                    let lock = self
1326                        .obtain_relation_locks(iter::once(&config.base_relation))
1327                        .pop()
1328                        .unwrap();
1329                    let _guard = lock.write().unwrap();
1330                    tx.create_minhash_lsh_index(config)?;
1331                }
1332
1333                Ok(NamedRows::new(
1334                    vec![STATUS_STR.to_string()],
1335                    vec![vec![DataValue::from(OK_STR)]],
1336                ))
1337            }
1338            SysOp::RemoveIndex(rel_name, idx_name) => {
1339                if read_only {
1340                    bail!("Cannot remove index in read-only mode");
1341                }
1342                let bounds = if skip_locking {
1343                    tx.remove_index(rel_name, idx_name)?
1344                } else {
1345                    let lock = self
1346                        .obtain_relation_locks(iter::once(&rel_name.name))
1347                        .pop()
1348                        .unwrap();
1349                    let _guard = lock.read().unwrap();
1350                    tx.remove_index(rel_name, idx_name)?
1351                };
1352
1353                for (lower, upper) in bounds {
1354                    tx.store_tx.del_range_from_persisted(&lower, &upper)?;
1355                }
1356                Ok(NamedRows::new(
1357                    vec![STATUS_STR.to_string()],
1358                    vec![vec![DataValue::from(OK_STR)]],
1359                ))
1360            }
1361            SysOp::ListColumns(rs) => self.list_columns(tx, rs),
1362            SysOp::ListIndices(rs) => self.list_indices(tx, rs),
1363            SysOp::RenameRelation(rename_pairs) => {
1364                if read_only {
1365                    bail!("Cannot rename relations in read-only mode");
1366                }
1367                let rel_names = rename_pairs.iter().flat_map(|(f, t)| [&f.name, &t.name]);
1368                let locks = if skip_locking {
1369                    vec![]
1370                } else {
1371                    self.obtain_relation_locks(rel_names)
1372                };
1373                let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
1374                for (old, new) in rename_pairs {
1375                    tx.rename_relation(old, new)?;
1376                }
1377                Ok(NamedRows::new(
1378                    vec![STATUS_STR.to_string()],
1379                    vec![vec![DataValue::from(OK_STR)]],
1380                ))
1381            }
1382            SysOp::ListRunning => self.list_running(),
1383            SysOp::KillRunning(id) => {
1384                let queries = self.running_queries.lock().unwrap();
1385                Ok(match queries.get(id) {
1386                    None => NamedRows::new(
1387                        vec![STATUS_STR.to_string()],
1388                        vec![vec![DataValue::from("NOT_FOUND")]],
1389                    ),
1390                    Some(handle) => {
1391                        handle.poison.0.store(true, Ordering::Relaxed);
1392                        NamedRows::new(
1393                            vec![STATUS_STR.to_string()],
1394                            vec![vec![DataValue::from("KILLING")]],
1395                        )
1396                    }
1397                })
1398            }
1399            SysOp::ShowTrigger(name) => {
1400                let rel = tx.get_relation(name, false)?;
1401                let mut rows: Vec<Vec<JsonValue>> = vec![];
1402                for (i, trigger) in rel.put_triggers.iter().enumerate() {
1403                    rows.push(vec![json!("put"), json!(i), json!(trigger)])
1404                }
1405                for (i, trigger) in rel.rm_triggers.iter().enumerate() {
1406                    rows.push(vec![json!("rm"), json!(i), json!(trigger)])
1407                }
1408                for (i, trigger) in rel.replace_triggers.iter().enumerate() {
1409                    rows.push(vec![json!("replace"), json!(i), json!(trigger)])
1410                }
1411                let rows = rows
1412                    .into_iter()
1413                    .map(|row| row.into_iter().map(DataValue::from).collect_vec())
1414                    .collect_vec();
1415                Ok(NamedRows::new(
1416                    vec!["type".to_string(), "idx".to_string(), "trigger".to_string()],
1417                    rows,
1418                ))
1419            }
1420            SysOp::SetTriggers(name, puts, rms, replaces) => {
1421                if read_only {
1422                    bail!("Cannot set triggers in read-only mode");
1423                }
1424                tx.set_relation_triggers(name, puts, rms, replaces)?;
1425                Ok(NamedRows::new(
1426                    vec![STATUS_STR.to_string()],
1427                    vec![vec![DataValue::from(OK_STR)]],
1428                ))
1429            }
1430            SysOp::SetAccessLevel(names, level) => {
1431                if read_only {
1432                    bail!("Cannot set access level in read-only mode");
1433                }
1434                for name in names {
1435                    tx.set_access_level(name, *level)?;
1436                }
1437                Ok(NamedRows::new(
1438                    vec![STATUS_STR.to_string()],
1439                    vec![vec![DataValue::from(OK_STR)]],
1440                ))
1441            }
1442        }
1443    }
1444    fn run_sys_op(&'s self, op: SysOp, read_only: bool) -> Result<NamedRows> {
1445        let mut tx = if read_only {
1446            self.transact()?
1447        } else {
1448            self.transact_write()?
1449        };
1450        let res = self.run_sys_op_with_tx(&mut tx, &op, read_only, false)?;
1451        tx.commit_tx()?;
1452        Ok(res)
1453    }
1454    /// This is the entry to query evaluation
1455    pub(crate) fn run_query(
1456        &self,
1457        tx: &mut SessionTx<'_>,
1458        input_program: InputProgram,
1459        cur_vld: ValidityTs,
1460        callback_targets: &BTreeSet<SmartString<LazyCompact>>,
1461        callback_collector: &mut CallbackCollector,
1462        top_level: bool,
1463    ) -> Result<(NamedRows, Vec<(Vec<u8>, Vec<u8>)>)> {
1464        // cleanups contain stored relations that should be deleted at the end of query
1465        let mut clean_ups = vec![];
1466
1467        // Some checks in case the query specifies mutation
1468        if let Some((meta, op, _)) = &input_program.out_opts.store_relation {
1469            if *op == RelationOp::Create {
1470                #[derive(Debug, Error, Diagnostic)]
1471                #[error("Stored relation {0} conflicts with an existing one")]
1472                #[diagnostic(code(eval::stored_relation_conflict))]
1473                struct StoreRelationConflict(String);
1474
1475                ensure!(
1476                    !tx.relation_exists(&meta.name)?,
1477                    StoreRelationConflict(meta.name.to_string())
1478                )
1479            } else if *op != RelationOp::Replace {
1480                #[derive(Debug, Error, Diagnostic)]
1481                #[error("Stored relation {0} not found")]
1482                #[diagnostic(code(eval::stored_relation_not_found))]
1483                struct StoreRelationNotFoundError(String);
1484
1485                let existing = tx.get_relation(&meta.name, false)?;
1486
1487                ensure!(
1488                    tx.relation_exists(&meta.name)?,
1489                    StoreRelationNotFoundError(meta.name.to_string())
1490                );
1491
1492                existing.ensure_compatible(
1493                    meta,
1494                    *op == RelationOp::Rm || *op == RelationOp::Delete || *op == RelationOp::Update,
1495                )?;
1496            }
1497        };
1498
1499        // query compilation
1500        let entry_head_or_default = input_program.get_entry_out_head_or_default()?;
1501        let (normalized_program, out_opts) = input_program.into_normalized_program(tx)?;
1502        let (stratified_program, store_lifetimes) = normalized_program.into_stratified_program()?;
1503        let program = stratified_program.magic_sets_rewrite(tx)?;
1504        let compiled = tx.stratified_magic_compile(program)?;
1505
1506        // poison is used to terminate queries early
1507        let poison = Poison::default();
1508        if let Some(secs) = out_opts.timeout {
1509            poison.set_timeout(secs)?;
1510        }
1511        // give the query an ID and store it so that it can be queried and cancelled
1512        let id = self.queries_count.fetch_add(1, Ordering::AcqRel);
1513
1514        // time the query
1515        let since_the_epoch = seconds_since_the_epoch()?;
1516
1517        let handle = RunningQueryHandle {
1518            started_at: since_the_epoch,
1519            poison: poison.clone(),
1520        };
1521        self.running_queries.lock().unwrap().insert(id, handle);
1522
1523        // RAII cleanups of running query handle
1524        let _guard = RunningQueryCleanup {
1525            id,
1526            running_queries: self.running_queries.clone(),
1527        };
1528
1529        let total_num_to_take = if out_opts.sorters.is_empty() {
1530            out_opts.num_to_take()
1531        } else {
1532            None
1533        };
1534
1535        let num_to_skip = if out_opts.sorters.is_empty() {
1536            out_opts.offset
1537        } else {
1538            None
1539        };
1540
1541        // the real evaluation
1542        let (result_store, early_return) = tx.stratified_magic_evaluate(
1543            &compiled,
1544            store_lifetimes,
1545            total_num_to_take,
1546            num_to_skip,
1547            poison,
1548        )?;
1549
1550        // deal with assertions
1551        if let Some(assertion) = &out_opts.assertion {
1552            match assertion {
1553                QueryAssertion::AssertNone(span) => {
1554                    if let Some(tuple) = result_store.all_iter().next() {
1555                        #[derive(Debug, Error, Diagnostic)]
1556                        #[error(
1557                            "The query is asserted to return no result, but a tuple {0:?} is found"
1558                        )]
1559                        #[diagnostic(code(eval::assert_none_failure))]
1560                        struct AssertNoneFailure(Tuple, #[label] SourceSpan);
1561                        bail!(AssertNoneFailure(tuple.into_tuple(), *span))
1562                    }
1563                }
1564                QueryAssertion::AssertSome(span) => {
1565                    if result_store.all_iter().next().is_none() {
1566                        #[derive(Debug, Error, Diagnostic)]
1567                        #[error("The query is asserted to return some results, but returned none")]
1568                        #[diagnostic(code(eval::assert_some_failure))]
1569                        struct AssertSomeFailure(#[label] SourceSpan);
1570                        bail!(AssertSomeFailure(*span))
1571                    }
1572                }
1573            }
1574        }
1575
1576        if !out_opts.sorters.is_empty() {
1577            // sort outputs if required
1578            let sorted_result =
1579                tx.sort_and_collect(result_store, &out_opts.sorters, &entry_head_or_default)?;
1580            let sorted_iter = if let Some(offset) = out_opts.offset {
1581                Left(sorted_result.into_iter().skip(offset))
1582            } else {
1583                Right(sorted_result.into_iter())
1584            };
1585            let sorted_iter = if let Some(limit) = out_opts.limit {
1586                Left(sorted_iter.take(limit))
1587            } else {
1588                Right(sorted_iter)
1589            };
1590            if let Some((meta, relation_op, returning)) = &out_opts.store_relation {
1591                let to_clear = tx
1592                    .execute_relation(
1593                        self,
1594                        sorted_iter,
1595                        *relation_op,
1596                        meta,
1597                        &entry_head_or_default,
1598                        cur_vld,
1599                        callback_targets,
1600                        callback_collector,
1601                        top_level,
1602                        if *returning == ReturnMutation::Returning {
1603                            &meta.name.name
1604                        } else {
1605                            ""
1606                        },
1607                    )
1608                    .wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?;
1609                clean_ups.extend(to_clear);
1610                let returned_rows =
1611                    tx.get_returning_rows(callback_collector, &meta.name, returning)?;
1612                Ok((returned_rows, clean_ups))
1613            } else {
1614                // not sorting outputs
1615                let rows: Vec<Tuple> = sorted_iter.collect_vec();
1616                Ok((
1617                    NamedRows::new(
1618                        entry_head_or_default
1619                            .iter()
1620                            .map(|s| s.to_string())
1621                            .collect_vec(),
1622                        rows,
1623                    ),
1624                    clean_ups,
1625                ))
1626            }
1627        } else {
1628            let scan = if early_return {
1629                Right(Left(
1630                    result_store.early_returned_iter().map(|t| t.into_tuple()),
1631                ))
1632            } else if out_opts.limit.is_some() || out_opts.offset.is_some() {
1633                let limit = out_opts.limit.unwrap_or(usize::MAX);
1634                let offset = out_opts.offset.unwrap_or(0);
1635                Right(Right(
1636                    result_store
1637                        .all_iter()
1638                        .skip(offset)
1639                        .take(limit)
1640                        .map(|t| t.into_tuple()),
1641                ))
1642            } else {
1643                Left(result_store.all_iter().map(|t| t.into_tuple()))
1644            };
1645
1646            if let Some((meta, relation_op, returning)) = &out_opts.store_relation {
1647                let to_clear = tx
1648                    .execute_relation(
1649                        self,
1650                        scan,
1651                        *relation_op,
1652                        meta,
1653                        &entry_head_or_default,
1654                        cur_vld,
1655                        callback_targets,
1656                        callback_collector,
1657                        top_level,
1658                        if *returning == ReturnMutation::Returning {
1659                            &meta.name.name
1660                        } else {
1661                            ""
1662                        },
1663                    )
1664                    .wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?;
1665                clean_ups.extend(to_clear);
1666                let returned_rows =
1667                    tx.get_returning_rows(callback_collector, &meta.name, returning)?;
1668
1669                Ok((returned_rows, clean_ups))
1670            } else {
1671                let rows: Vec<Tuple> = scan.collect_vec();
1672
1673                Ok((
1674                    NamedRows::new(
1675                        entry_head_or_default
1676                            .iter()
1677                            .map(|s| s.to_string())
1678                            .collect_vec(),
1679                        rows,
1680                    ),
1681                    clean_ups,
1682                ))
1683            }
1684        }
1685    }
1686    pub(crate) fn list_running(&self) -> Result<NamedRows> {
1687        let rows = self
1688            .running_queries
1689            .lock()
1690            .unwrap()
1691            .iter()
1692            .map(|(k, v)| {
1693                vec![
1694                    DataValue::from(*k as i64),
1695                    DataValue::from(format!("{:?}", v.started_at)),
1696                ]
1697            })
1698            .collect_vec();
1699        Ok(NamedRows::new(
1700            vec!["id".to_string(), "started_at".to_string()],
1701            rows,
1702        ))
1703    }
1704    fn list_indices(&'s self, tx: &SessionTx<'_>, name: &str) -> Result<NamedRows> {
1705        let handle = tx.get_relation(name, false)?;
1706        let mut rows = vec![];
1707        for (name, (rel, cols)) in &handle.indices {
1708            rows.push(vec![
1709                json!(name),
1710                json!("normal"),
1711                json!([rel.name]),
1712                json!({ "indices": cols }),
1713            ]);
1714        }
1715        for (name, (rel, manifest)) in &handle.hnsw_indices {
1716            rows.push(vec![
1717                json!(name),
1718                json!("hnsw"),
1719                json!([rel.name]),
1720                json!({
1721                    "vec_dim": manifest.vec_dim,
1722                    "dtype": manifest.dtype,
1723                    "vec_fields": manifest.vec_fields,
1724                    "distance": manifest.distance,
1725                    "ef_construction": manifest.ef_construction,
1726                    "m_neighbours": manifest.m_neighbours,
1727                    "m_max": manifest.m_max,
1728                    "m_max0": manifest.m_max0,
1729                    "level_multiplier": manifest.level_multiplier,
1730                    "extend_candidates": manifest.extend_candidates,
1731                    "keep_pruned_connections": manifest.keep_pruned_connections,
1732                }),
1733            ]);
1734        }
1735        for (name, (rel, manifest)) in &handle.fts_indices {
1736            rows.push(vec![
1737                json!(name),
1738                json!("fts"),
1739                json!([rel.name]),
1740                json!({
1741                    "extractor": manifest.extractor,
1742                    "tokenizer": manifest.tokenizer,
1743                    "tokenizer_filters": manifest.filters,
1744                }),
1745            ]);
1746        }
1747        for (name, (rel, inv_rel, manifest)) in &handle.lsh_indices {
1748            rows.push(vec![
1749                json!(name),
1750                json!("lsh"),
1751                json!([rel.name, inv_rel.name]),
1752                json!({
1753                    "extractor": manifest.extractor,
1754                    "tokenizer": manifest.tokenizer,
1755                    "tokenizer_filters": manifest.filters,
1756                    "n_gram": manifest.n_gram,
1757                    "num_perm": manifest.num_perm,
1758                    "n_bands": manifest.n_bands,
1759                    "n_rows_in_band": manifest.n_rows_in_band,
1760                    "threshold": manifest.threshold,
1761                }),
1762            ]);
1763        }
1764        let rows = rows
1765            .into_iter()
1766            .map(|row| row.into_iter().map(DataValue::from).collect_vec())
1767            .collect_vec();
1768        Ok(NamedRows::new(
1769            vec![
1770                "name".to_string(),
1771                "type".to_string(),
1772                "relations".to_string(),
1773                "config".to_string(),
1774            ],
1775            rows,
1776        ))
1777    }
1778    fn list_columns(&'s self, tx: &SessionTx<'_>, name: &str) -> Result<NamedRows> {
1779        let handle = tx.get_relation(name, false)?;
1780        let mut rows = vec![];
1781        let mut idx = 0;
1782        for col in &handle.metadata.keys {
1783            let default_expr = col.default_gen.as_ref().map(|gen| format!("{}", gen));
1784
1785            rows.push(vec![
1786                json!(col.name),
1787                json!(true),
1788                json!(idx),
1789                json!(col.typing.to_string()),
1790                json!(col.default_gen.is_some()),
1791                json!(default_expr),
1792            ]);
1793            idx += 1;
1794        }
1795        for col in &handle.metadata.non_keys {
1796            let default_expr = col.default_gen.as_ref().map(|gen| format!("{}", gen));
1797
1798            rows.push(vec![
1799                json!(col.name),
1800                json!(false),
1801                json!(idx),
1802                json!(col.typing.to_string()),
1803                json!(col.default_gen.is_some()),
1804                json!(default_expr),
1805            ]);
1806            idx += 1;
1807        }
1808        let rows = rows
1809            .into_iter()
1810            .map(|row| row.into_iter().map(DataValue::from).collect_vec())
1811            .collect_vec();
1812        Ok(NamedRows::new(
1813            vec![
1814                "column".to_string(),
1815                "is_key".to_string(),
1816                "index".to_string(),
1817                "type".to_string(),
1818                "has_default".to_string(),
1819                "default_expr".to_string(),
1820            ],
1821            rows,
1822        ))
1823    }
1824    fn list_relations(&'s self, tx: &SessionTx<'_>) -> Result<NamedRows> {
1825        let lower = vec![DataValue::from("")].encode_as_key(RelationId::SYSTEM);
1826        let upper =
1827            vec![DataValue::from(String::from(LARGEST_UTF_CHAR))].encode_as_key(RelationId::SYSTEM);
1828        let mut rows: Vec<Vec<JsonValue>> = vec![];
1829        for kv_res in tx.store_tx.range_scan(&lower, &upper) {
1830            let (k_slice, v_slice) = kv_res?;
1831            if upper <= k_slice {
1832                break;
1833            }
1834            let meta = RelationHandle::decode(&v_slice)?;
1835            let n_keys = meta.metadata.keys.len();
1836            let n_dependents = meta.metadata.non_keys.len();
1837            let arity = n_keys + n_dependents;
1838            let name = meta.name;
1839            let access_level = if name.contains(':') {
1840                "index".to_string()
1841            } else {
1842                meta.access_level.to_string()
1843            };
1844            rows.push(vec![
1845                json!(name),
1846                json!(arity),
1847                json!(access_level),
1848                json!(n_keys),
1849                json!(n_dependents),
1850                json!(meta.put_triggers.len()),
1851                json!(meta.rm_triggers.len()),
1852                json!(meta.replace_triggers.len()),
1853                json!(meta.description),
1854            ]);
1855        }
1856        let rows = rows
1857            .into_iter()
1858            .map(|row| row.into_iter().map(DataValue::from).collect_vec())
1859            .collect_vec();
1860        Ok(NamedRows::new(
1861            vec![
1862                "name".to_string(),
1863                "arity".to_string(),
1864                "access_level".to_string(),
1865                "n_keys".to_string(),
1866                "n_non_keys".to_string(),
1867                "n_put_triggers".to_string(),
1868                "n_rm_triggers".to_string(),
1869                "n_replace_triggers".to_string(),
1870                "description".to_string(),
1871            ],
1872            rows,
1873        ))
1874    }
1875}
1876
1877/// Evaluate a string expression in the context of a set of parameters and variables
1878pub fn evaluate_expressions(
1879    src: &str,
1880    params: &BTreeMap<String, DataValue>,
1881    vars: &BTreeMap<String, DataValue>,
1882) -> Result<DataValue> {
1883    _evaluate_expressions(src, params, vars).map_err(|err| {
1884        if err.source().is_none() {
1885            err.with_source_code(format!("{src} "))
1886        } else {
1887            err
1888        }
1889    })
1890}
1891
1892/// Get the variables referenced in a string expression
1893pub fn get_variables(src: &str, params: &BTreeMap<String, DataValue>) -> Result<BTreeSet<String>> {
1894    _get_variables(src, params).map_err(|err| {
1895        if err.source().is_none() {
1896            err.with_source_code(format!("{src} "))
1897        } else {
1898            err
1899        }
1900    })
1901}
1902
1903fn _evaluate_expressions(
1904    src: &str,
1905    params: &BTreeMap<String, DataValue>,
1906    vars: &BTreeMap<String, DataValue>,
1907) -> Result<DataValue> {
1908    let mut expr = parse_expressions(src, params)?;
1909    let mut ctx = vec![];
1910    let mut binding_map = BTreeMap::new();
1911    for (i, (k, v)) in vars.iter().enumerate() {
1912        ctx.push(v.clone());
1913        binding_map.insert(Symbol::new(k, Default::default()), i);
1914    }
1915    expr.fill_binding_indices(&binding_map)?;
1916    expr.eval(&ctx)
1917}
1918
1919fn _get_variables(src: &str, params: &BTreeMap<String, DataValue>) -> Result<BTreeSet<String>> {
1920    let expr = parse_expressions(src, params)?;
1921    expr.get_variables()
1922}
1923
1924/// Used for user-initiated termination of running queries
1925#[derive(Clone, Default)]
1926pub struct Poison(pub(crate) Arc<AtomicBool>);
1927
1928impl Poison {
1929    /// Will return `Err` if user has initiated termination.
1930    #[inline(always)]
1931    pub fn check(&self) -> Result<()> {
1932        #[derive(Debug, Error, Diagnostic)]
1933        #[error("Running query is killed before completion")]
1934        #[diagnostic(code(eval::killed))]
1935        #[diagnostic(help("A query may be killed by timeout, or explicit command"))]
1936        struct ProcessKilled;
1937
1938        if self.0.load(Ordering::Relaxed) {
1939            bail!(ProcessKilled)
1940        }
1941        Ok(())
1942    }
1943    #[cfg(target_arch = "wasm32")]
1944    pub(crate) fn set_timeout(&self, _secs: f64) -> Result<()> {
1945        bail!("Cannot set timeout when threading is disallowed");
1946    }
1947    #[cfg(not(target_arch = "wasm32"))]
1948    pub(crate) fn set_timeout(&self, secs: f64) -> Result<()> {
1949        let pill = self.clone();
1950        thread::spawn(move || {
1951            thread::sleep(Duration::from_micros((secs * 1000000.) as u64));
1952            pill.0.store(true, Ordering::Relaxed);
1953        });
1954        Ok(())
1955    }
1956}
1957
1958pub(crate) fn seconds_since_the_epoch() -> Result<f64> {
1959    #[cfg(not(target_arch = "wasm32"))]
1960    let now = SystemTime::now();
1961    #[cfg(not(target_arch = "wasm32"))]
1962    return Ok(now
1963        .duration_since(UNIX_EPOCH)
1964        .into_diagnostic()?
1965        .as_secs_f64());
1966
1967    #[cfg(target_arch = "wasm32")]
1968    Ok(js_sys::Date::now())
1969}