1use std::collections::btree_map::Entry;
10use std::collections::{BTreeMap, BTreeSet};
11use std::default::Default;
12use std::fmt::{Debug, Formatter};
13use std::iter;
14use std::path::Path;
15#[allow(unused_imports)]
16use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
17use std::sync::{Arc, Mutex};
18#[allow(unused_imports)]
19use std::thread;
20#[allow(unused_imports)]
21use std::time::{Duration, SystemTime, UNIX_EPOCH};
22
23#[allow(unused_imports)]
24use crossbeam::channel::{bounded, unbounded, Receiver, Sender};
25use crossbeam::sync::ShardedLock;
26use either::{Left, Right};
27use itertools::Itertools;
28use miette::Report;
29#[allow(unused_imports)]
30use miette::{bail, ensure, miette, Diagnostic, IntoDiagnostic, Result, WrapErr};
31use serde_json::json;
32use smartstring::{LazyCompact, SmartString};
33use thiserror::Error;
34
35use crate::data::functions::current_validity;
36use crate::data::json::JsonValue;
37use crate::data::program::{InputProgram, QueryAssertion, RelationOp, ReturnMutation};
38use crate::data::relation::ColumnDef;
39use crate::data::tuple::{Tuple, TupleT};
40use crate::data::value::{DataValue, ValidityTs, LARGEST_UTF_CHAR};
41use crate::fixed_rule::DEFAULT_FIXED_RULES;
42use crate::fts::TokenizerCache;
43use crate::parse::sys::SysOp;
44use crate::parse::{parse_expressions, parse_script, CozoScript, SourceSpan};
45use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet};
46use crate::query::ra::{
47 FilteredRA, FtsSearchRA, HnswSearchRA, InnerJoin, LshSearchRA, NegJoin, RelAlgebra, ReorderRA,
48 StoredRA, StoredWithValidityRA, TempStoreRA, UnificationRA,
49};
50#[allow(unused_imports)]
51use crate::runtime::callback::{
52 CallbackCollector, CallbackDeclaration, CallbackOp, EventCallbackRegistry,
53};
54use crate::runtime::relation::{
55 extend_tuple_from_v, AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId,
56};
57use crate::runtime::transact::SessionTx;
58use crate::storage::temp::TempStorage;
59use crate::storage::Storage;
60use crate::{decode_tuple_from_kv, FixedRule, Symbol};
61
62pub(crate) struct RunningQueryHandle {
63 pub(crate) started_at: f64,
64 pub(crate) poison: Poison,
65}
66
67pub(crate) struct RunningQueryCleanup {
68 pub(crate) id: u64,
69 pub(crate) running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
70}
71
72impl Drop for RunningQueryCleanup {
73 fn drop(&mut self) {
74 let mut map = self.running_queries.lock().unwrap();
75 if let Some(handle) = map.remove(&self.id) {
76 handle.poison.0.store(true, Ordering::Relaxed);
77 }
78 }
79}
80
81#[derive(serde_derive::Serialize, serde_derive::Deserialize)]
82pub struct DbManifest {
83 pub storage_version: u64,
84}
85
86#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
88pub enum ScriptMutability {
89 Mutable,
91 Immutable,
93}
94
95#[derive(Clone)]
97pub struct Db<S> {
98 pub(crate) db: S,
99 temp_db: TempStorage,
100 relation_store_id: Arc<AtomicU64>,
101 pub(crate) queries_count: Arc<AtomicU64>,
102 pub(crate) running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
103 pub(crate) fixed_rules: Arc<ShardedLock<BTreeMap<String, Arc<Box<dyn FixedRule>>>>>,
104 pub(crate) tokenizers: Arc<TokenizerCache>,
105 #[cfg(not(target_arch = "wasm32"))]
106 callback_count: Arc<AtomicU32>,
107 #[cfg(not(target_arch = "wasm32"))]
108 pub(crate) event_callbacks: Arc<ShardedLock<EventCallbackRegistry>>,
109 relation_locks: Arc<ShardedLock<BTreeMap<SmartString<LazyCompact>, Arc<ShardedLock<()>>>>>,
110}
111
112impl<S> Debug for Db<S> {
113 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
114 write!(f, "Db")
115 }
116}
117
118#[derive(Debug, Diagnostic, Error)]
119#[error("Initialization of database failed")]
120#[diagnostic(code(db::init))]
121pub(crate) struct BadDbInit(#[help] pub(crate) String);
122
123#[derive(Debug, Error, Diagnostic)]
124#[error("Cannot import data into relation {0} as it is an index")]
125#[diagnostic(code(tx::import_into_index))]
126pub(crate) struct ImportIntoIndex(pub(crate) String);
127
128#[derive(serde_derive::Serialize, serde_derive::Deserialize, Debug, Clone, Default)]
129pub struct NamedRows {
131 pub headers: Vec<String>,
133 pub rows: Vec<Tuple>,
135 pub next: Option<Box<NamedRows>>,
137}
138
139impl IntoIterator for NamedRows {
140 type Item = Tuple;
141 type IntoIter = std::vec::IntoIter<Self::Item>;
142
143 fn into_iter(self) -> Self::IntoIter {
144 self.rows.into_iter()
145 }
146}
147
148impl NamedRows {
149 pub fn new(headers: Vec<String>, rows: Vec<Tuple>) -> Self {
151 Self {
152 headers,
153 rows,
154 next: None,
155 }
156 }
157
158 pub fn has_more(&self) -> bool {
160 self.next.is_some()
161 }
162
163 pub fn flatten(self) -> Vec<Self> {
165 let mut collected = vec![];
166 let mut current = self;
167 loop {
168 let nxt = current.next.take();
169 collected.push(current);
170 if let Some(n) = nxt {
171 current = *n;
172 } else {
173 break;
174 }
175 }
176 collected
177 }
178
179 pub fn into_json(self) -> JsonValue {
181 let nxt = match self.next {
182 None => json!(null),
183 Some(more) => more.into_json(),
184 };
185 let rows = self
186 .rows
187 .into_iter()
188 .map(|row| row.into_iter().map(JsonValue::from).collect::<JsonValue>())
189 .collect::<JsonValue>();
190 json!({
191 "headers": self.headers,
192 "rows": rows,
193 "next": nxt,
194 })
195 }
196 pub fn from_json(value: &JsonValue) -> Result<Self> {
198 let headers = value
199 .get("headers")
200 .ok_or_else(|| miette!("NamedRows requires 'headers' field"))?;
201 let headers = headers
202 .as_array()
203 .ok_or_else(|| miette!("'headers' field must be an array"))?;
204 let headers = headers
205 .iter()
206 .map(|h| -> Result<String> {
207 let h = h
208 .as_str()
209 .ok_or_else(|| miette!("'headers' field must be an array of strings"))?;
210 Ok(h.to_string())
211 })
212 .try_collect()?;
213 let rows = value
214 .get("rows")
215 .ok_or_else(|| miette!("NamedRows requires 'rows' field"))?;
216 let rows = rows
217 .as_array()
218 .ok_or_else(|| miette!("'rows' field must be an array"))?;
219 let rows = rows
220 .iter()
221 .map(|row| -> Result<Vec<DataValue>> {
222 let row = row
223 .as_array()
224 .ok_or_else(|| miette!("'rows' field must be an array of arrays"))?;
225 Ok(row.iter().map(DataValue::from).collect_vec())
226 })
227 .try_collect()?;
228 Ok(Self {
229 headers,
230 rows,
231 next: None,
232 })
233 }
234
235 pub fn into_payload(self, relation: &str, op: &str) -> Payload {
238 let cols_str = self.headers.join(", ");
239 let query = format!("?[{cols_str}] <- $data :{op} {relation} {{ {cols_str} }}");
240 let data = DataValue::List(self.rows.into_iter().map(|r| DataValue::List(r)).collect());
241 (query, [("data".to_string(), data)].into())
242 }
243}
244
245const STATUS_STR: &str = "status";
246const OK_STR: &str = "OK";
247
248pub type Payload = (String, BTreeMap<String, DataValue>);
250
251#[derive(Eq, PartialEq, Debug)]
253pub enum TransactionPayload {
254 Commit,
256 Abort,
258 Query(Payload),
260}
261
262impl<'s, S: Storage<'s>> Db<S> {
263 pub fn new(storage: S) -> Result<Self> {
267 let ret = Self {
268 db: storage,
269 temp_db: Default::default(),
270 relation_store_id: Default::default(),
271 queries_count: Default::default(),
272 running_queries: Default::default(),
273 fixed_rules: Arc::new(ShardedLock::new(DEFAULT_FIXED_RULES.clone())),
274 tokenizers: Arc::new(Default::default()),
275 #[cfg(not(target_arch = "wasm32"))]
276 callback_count: Default::default(),
277 #[cfg(not(target_arch = "wasm32"))]
279 event_callbacks: Default::default(),
280 relation_locks: Default::default(),
281 };
282 Ok(ret)
283 }
284
285 pub fn initialize(&'s self) -> Result<()> {
287 self.load_last_ids()?;
288 Ok(())
289 }
290
291 pub fn run_multi_transaction(
299 &'s self,
300 is_write: bool,
301 payloads: Receiver<TransactionPayload>,
302 results: Sender<Result<NamedRows>>,
303 ) {
304 let tx = if is_write {
305 self.transact_write()
306 } else {
307 self.transact()
308 };
309 let mut cleanups: Vec<(Vec<u8>, Vec<u8>)> = vec![];
310 let mut tx = match tx {
311 Ok(tx) => tx,
312 Err(err) => {
313 let _ = results.send(Err(err));
314 return;
315 }
316 };
317
318 let ts = current_validity();
319 let callback_targets = self.current_callback_targets();
320 let mut callback_collector = BTreeMap::new();
321 let mut write_locks = BTreeMap::new();
322
323 for payload in payloads {
324 match payload {
325 TransactionPayload::Commit => {
326 for (lower, upper) in cleanups {
327 if let Err(err) = tx.store_tx.del_range_from_persisted(&lower, &upper) {
328 eprintln!("{err:?}")
329 }
330 }
331
332 let _ = results.send(tx.commit_tx().map(|_| NamedRows::default()));
333 #[cfg(not(target_arch = "wasm32"))]
334 if !callback_collector.is_empty() {
335 self.send_callbacks(callback_collector)
336 }
337
338 break;
339 }
340 TransactionPayload::Abort => {
341 let _ = results.send(Ok(NamedRows::default()));
342 break;
343 }
344 TransactionPayload::Query((script, params)) => {
345 let p =
346 match parse_script(&script, ¶ms, &self.fixed_rules.read().unwrap(), ts)
347 {
348 Ok(p) => p,
349 Err(err) => {
350 if results.send(Err(err)).is_err() {
351 break;
352 } else {
353 continue;
354 }
355 }
356 };
357
358 let p = match p.get_single_program() {
359 Ok(p) => p,
360 Err(err) => {
361 if results.send(Err(err)).is_err() {
362 break;
363 } else {
364 continue;
365 }
366 }
367 };
368 if let Some(write_lock_name) = p.needs_write_lock() {
369 match write_locks.entry(write_lock_name) {
370 Entry::Vacant(e) => {
371 let lock = self
372 .obtain_relation_locks(iter::once(e.key()))
373 .pop()
374 .unwrap();
375 e.insert(lock);
376 }
377 Entry::Occupied(_) => {}
378 }
379 }
380
381 let res = self.execute_single_program(
382 p,
383 &mut tx,
384 &mut cleanups,
385 ts,
386 &callback_targets,
387 &mut callback_collector,
388 );
389 if results.send(res).is_err() {
390 break;
391 }
392 }
393 }
394 }
395 }
396
397 pub fn get_fixed_rules(&'s self) -> BTreeMap<String, Arc<Box<dyn FixedRule>>> {
399 return self.fixed_rules.read().unwrap().clone();
400 }
401
402 pub fn run_script(
404 &'s self,
405 payload: &str,
406 params: BTreeMap<String, DataValue>,
407 mutability: ScriptMutability,
408 ) -> Result<NamedRows> {
409 self.run_script_ast(
410 parse_script(
411 payload,
412 ¶ms,
413 &self.get_fixed_rules(),
414 current_validity(),
415 )?,
416 current_validity(),
417 mutability,
418 )
419 }
420
421 pub fn run_script_read_only(
423 &'s self,
424 payload: &str,
425 params: BTreeMap<String, DataValue>,
426 ) -> Result<NamedRows> {
427 self.run_script(payload, params, ScriptMutability::Immutable)
428 }
429
430 pub fn run_script_ast(
432 &'s self,
433 payload: CozoScript,
434 cur_vld: ValidityTs,
435 mutability: ScriptMutability,
436 ) -> Result<NamedRows> {
437 let read_only = mutability == ScriptMutability::Immutable;
438 match payload {
439 CozoScript::Single(p) => self.execute_single(cur_vld, p, read_only),
440 CozoScript::Imperative(ps) => self.execute_imperative(cur_vld, &ps, read_only),
441 CozoScript::Sys(op) => self.run_sys_op(op, read_only),
442 }
443 }
444
445 pub fn export_relations<I, T>(&'s self, relations: I) -> Result<BTreeMap<String, NamedRows>>
449 where
450 T: AsRef<str>,
451 I: Iterator<Item = T>,
452 {
453 let tx = self.transact()?;
454 let mut ret: BTreeMap<String, NamedRows> = BTreeMap::new();
455 for rel in relations {
456 let handle = tx.get_relation(rel.as_ref(), false)?;
457 let size_hint = handle.metadata.keys.len() + handle.metadata.non_keys.len();
458
459 if handle.access_level < AccessLevel::ReadOnly {
460 bail!(InsufficientAccessLevel(
461 handle.name.to_string(),
462 "data export".to_string(),
463 handle.access_level
464 ));
465 }
466
467 let mut cols = handle
468 .metadata
469 .keys
470 .iter()
471 .map(|col| col.name.clone())
472 .collect_vec();
473 cols.extend(
474 handle
475 .metadata
476 .non_keys
477 .iter()
478 .map(|col| col.name.clone())
479 .collect_vec(),
480 );
481
482 let start = Tuple::default().encode_as_key(handle.id);
483 let end = Tuple::default().encode_as_key(handle.id.next());
484
485 let mut rows = vec![];
486 for data in tx.store_tx.range_scan(&start, &end) {
487 let (k, v) = data?;
488 let tuple = decode_tuple_from_kv(&k, &v, Some(size_hint));
489 rows.push(tuple);
490 }
491 let headers = cols.iter().map(|col| col.to_string()).collect_vec();
492 ret.insert(rel.as_ref().to_string(), NamedRows::new(headers, rows));
493 }
494 Ok(ret)
495 }
496 pub fn import_relations(&'s self, data: BTreeMap<String, NamedRows>) -> Result<()> {
504 #[derive(Debug, Diagnostic, Error)]
505 #[error("cannot import data for relation '{0}': {1}")]
506 #[diagnostic(code(import::bad_data))]
507 struct BadDataForRelation(String, JsonValue);
508
509 let rel_names = data.keys().map(SmartString::from).collect_vec();
510 let locks = self.obtain_relation_locks(rel_names.iter());
511 let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
512
513 let cur_vld = current_validity();
514
515 let mut tx = self.transact_write()?;
516
517 for (relation_op, in_data) in data {
518 let is_delete;
519 let relation: &str = match relation_op.strip_prefix('-') {
520 None => {
521 is_delete = false;
522 &relation_op
523 }
524 Some(s) => {
525 is_delete = true;
526 s
527 }
528 };
529 if relation.contains(':') {
530 bail!(ImportIntoIndex(relation.to_string()))
531 }
532 let handle = tx.get_relation(relation, false)?;
533 let has_indices = !handle.indices.is_empty();
534
535 if handle.access_level < AccessLevel::Protected {
536 bail!(InsufficientAccessLevel(
537 handle.name.to_string(),
538 "data import".to_string(),
539 handle.access_level
540 ));
541 }
542
543 let header2idx: BTreeMap<_, _> = in_data
544 .headers
545 .iter()
546 .enumerate()
547 .map(|(i, k)| -> Result<(&str, usize)> { Ok((k as &str, i)) })
548 .try_collect()?;
549
550 let key_indices: Vec<_> = handle
551 .metadata
552 .keys
553 .iter()
554 .map(|col| -> Result<(usize, &ColumnDef)> {
555 let idx = header2idx.get(&col.name as &str).ok_or_else(|| {
556 miette!(
557 "required header {} not found for relation {}",
558 col.name,
559 relation
560 )
561 })?;
562 Ok((*idx, col))
563 })
564 .try_collect()?;
565
566 let val_indices: Vec<_> = if is_delete {
567 vec![]
568 } else {
569 handle
570 .metadata
571 .non_keys
572 .iter()
573 .map(|col| -> Result<(usize, &ColumnDef)> {
574 let idx = header2idx.get(&col.name as &str).ok_or_else(|| {
575 miette!(
576 "required header {} not found for relation {}",
577 col.name,
578 relation
579 )
580 })?;
581 Ok((*idx, col))
582 })
583 .try_collect()?
584 };
585
586 for row in in_data.rows {
587 let keys: Vec<_> = key_indices
588 .iter()
589 .map(|(i, col)| -> Result<DataValue> {
590 let v = row
591 .get(*i)
592 .ok_or_else(|| miette!("row too short: {:?}", row))?;
593 col.typing.coerce(v.clone(), cur_vld)
594 })
595 .try_collect()?;
596 let k_store = handle.encode_key_for_store(&keys, Default::default())?;
597 if has_indices {
598 if let Some(existing) = tx.store_tx.get(&k_store, false)? {
599 let mut old = keys.clone();
600 extend_tuple_from_v(&mut old, &existing);
601 if is_delete || old != row {
602 for (idx_rel, extractor) in handle.indices.values() {
603 let idx_tup =
604 extractor.iter().map(|i| old[*i].clone()).collect_vec();
605 let encoded =
606 idx_rel.encode_key_for_store(&idx_tup, Default::default())?;
607 tx.store_tx.del(&encoded)?;
608 }
609 }
610 }
611 }
612 if is_delete {
613 tx.store_tx.del(&k_store)?;
614 } else {
615 let vals: Vec<_> = val_indices
616 .iter()
617 .map(|(i, col)| -> Result<DataValue> {
618 let v = row
619 .get(*i)
620 .ok_or_else(|| miette!("row too short: {:?}", row))?;
621 col.typing.coerce(v.clone(), cur_vld)
622 })
623 .try_collect()?;
624 let v_store = handle.encode_val_only_for_store(&vals, Default::default())?;
625 tx.store_tx.put(&k_store, &v_store)?;
626 if has_indices {
627 let mut kv = keys;
628 kv.extend(vals);
629 for (idx_rel, extractor) in handle.indices.values() {
630 let idx_tup = extractor.iter().map(|i| kv[*i].clone()).collect_vec();
631 let encoded =
632 idx_rel.encode_key_for_store(&idx_tup, Default::default())?;
633 tx.store_tx.put(&encoded, &[])?;
634 }
635 }
636 }
637 }
638 }
639 tx.commit_tx()?;
640 Ok(())
641 }
642 #[allow(unused_variables)]
644 pub fn backup_db(&'s self, out_file: impl AsRef<Path>) -> Result<()> {
645 #[cfg(feature = "storage-sqlite")]
646 {
647 let sqlite_db = crate::new_cozo_sqlite(out_file)?;
648 if sqlite_db.relation_store_id.load(Ordering::SeqCst) != 0 {
649 bail!("Cannot create backup: data exists in the target database.");
650 }
651 let mut tx = self.transact()?;
652 let iter = tx.store_tx.range_scan(&[], &[0xFF]);
653 sqlite_db.db.batch_put(iter)?;
654 tx.commit_tx()?;
655 Ok(())
656 }
657 #[cfg(not(feature = "storage-sqlite"))]
658 bail!("backup requires the 'storage-sqlite' feature to be enabled")
659 }
660 #[allow(unused_variables)]
662 pub fn restore_backup(&'s self, in_file: impl AsRef<Path>) -> Result<()> {
663 #[cfg(feature = "storage-sqlite")]
664 {
665 let sqlite_db = crate::new_cozo_sqlite(in_file)?;
666 let mut s_tx = sqlite_db.transact()?;
667 {
668 let mut tx = self.transact()?;
669 let store_id = tx.relation_store_id.load(Ordering::SeqCst);
670 if store_id != 0 {
671 bail!(
672 "Cannot restore backup: data exists in the current database. \
673 You can only restore into a new database (store id: {}).",
674 store_id
675 );
676 }
677 tx.commit_tx()?;
678 }
679 let iter = s_tx.store_tx.total_scan();
680 self.db.batch_put(iter)?;
681 s_tx.commit_tx()?;
682 Ok(())
683 }
684 #[cfg(not(feature = "storage-sqlite"))]
685 bail!("backup requires the 'storage-sqlite' feature to be enabled")
686 }
687 #[allow(unused_variables)]
695 pub fn import_from_backup(
696 &'s self,
697 in_file: impl AsRef<Path>,
698 relations: &[String],
699 ) -> Result<()> {
700 #[cfg(not(feature = "storage-sqlite"))]
701 bail!("backup requires the 'storage-sqlite' feature to be enabled");
702
703 #[cfg(feature = "storage-sqlite")]
704 {
705 let rel_names = relations.iter().map(SmartString::from).collect_vec();
706 let locks = self.obtain_relation_locks(rel_names.iter());
707 let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
708
709 let source_db = crate::new_cozo_sqlite(in_file)?;
710 let mut src_tx = source_db.transact()?;
711 let mut dst_tx = self.transact_write()?;
712
713 for relation in relations {
714 if relation.contains(':') {
715 bail!(ImportIntoIndex(relation.to_string()))
716 }
717 let src_handle = src_tx.get_relation(relation, false)?;
718 let dst_handle = dst_tx.get_relation(relation, false)?;
719
720 if !dst_handle.indices.is_empty() {
721 #[derive(Debug, Error, Diagnostic)]
722 #[error("Cannot import data into relation {0} from backup as the relation has indices")]
723 #[diagnostic(code(tx::bare_import_with_indices))]
724 #[diagnostic(help("Use `import_relations()` instead"))]
725 pub(crate) struct RestoreIntoRelWithIndices(pub(crate) String);
726
727 bail!(RestoreIntoRelWithIndices(dst_handle.name.to_string()))
728 }
729
730 if dst_handle.access_level < AccessLevel::Protected {
731 bail!(InsufficientAccessLevel(
732 dst_handle.name.to_string(),
733 "data import".to_string(),
734 dst_handle.access_level
735 ));
736 }
737
738 let src_lower = Tuple::default().encode_as_key(src_handle.id);
739 let src_upper = Tuple::default().encode_as_key(src_handle.id.next());
740
741 let data_it = src_tx.store_tx.range_scan(&src_lower, &src_upper).map(
742 |src_pair| -> Result<(Vec<u8>, Vec<u8>)> {
743 let (mut src_k, mut src_v) = src_pair?;
744 dst_handle.amend_key_prefix(&mut src_k);
745 dst_handle.amend_key_prefix(&mut src_v);
746 Ok((src_k, src_v))
747 },
748 );
749 for result in data_it {
750 let (key, val) = result?;
751 dst_tx.store_tx.put(&key, &val)?;
752 }
753 }
754
755 src_tx.commit_tx()?;
756 dst_tx.commit_tx()
757 }
758 }
759 pub fn register_fixed_rule<R>(&self, name: String, rule_impl: R) -> Result<()>
761 where
762 R: FixedRule + 'static,
763 {
764 match self.fixed_rules.write().unwrap().entry(name) {
765 Entry::Vacant(ent) => {
766 ent.insert(Arc::new(Box::new(rule_impl)));
767 Ok(())
768 }
769 Entry::Occupied(ent) => {
770 bail!(
771 "A fixed rule with the name {} is already registered",
772 ent.key()
773 )
774 }
775 }
776 }
777
778 pub fn unregister_fixed_rule(&self, name: &str) -> Result<bool> {
780 if DEFAULT_FIXED_RULES.contains_key(name) {
781 bail!("Cannot unregister builtin fixed rule {}", name);
782 }
783 Ok(self.fixed_rules.write().unwrap().remove(name).is_some())
784 }
785
786 #[cfg(not(target_arch = "wasm32"))]
789 pub fn register_callback(
790 &self,
791 relation: &str,
792 capacity: Option<usize>,
793 ) -> (u32, Receiver<(CallbackOp, NamedRows, NamedRows)>) {
794 let (sender, receiver) = if let Some(c) = capacity {
795 bounded(c)
796 } else {
797 unbounded()
798 };
799 let cb = CallbackDeclaration {
800 dependent: SmartString::from(relation),
801 sender,
802 };
803
804 let mut guard = self.event_callbacks.write().unwrap();
805 let new_id = self.callback_count.fetch_add(1, Ordering::SeqCst);
806 guard
807 .1
808 .entry(SmartString::from(relation))
809 .or_default()
810 .insert(new_id);
811
812 guard.0.insert(new_id, cb);
813 (new_id, receiver)
814 }
815
816 #[cfg(not(target_arch = "wasm32"))]
818 pub fn unregister_callback(&self, id: u32) -> bool {
819 let mut guard = self.event_callbacks.write().unwrap();
820 let ret = guard.0.remove(&id);
821 if let Some(cb) = &ret {
822 guard.1.get_mut(&cb.dependent).unwrap().remove(&id);
823
824 if guard.1.get(&cb.dependent).unwrap().is_empty() {
825 guard.1.remove(&cb.dependent);
826 }
827 }
828 ret.is_some()
829 }
830
831 pub(crate) fn obtain_relation_locks<'a, T: Iterator<Item = &'a SmartString<LazyCompact>>>(
832 &'s self,
833 rels: T,
834 ) -> Vec<Arc<ShardedLock<()>>> {
835 let mut collected = vec![];
836 let mut pending = vec![];
837 {
838 let locks = self.relation_locks.read().unwrap();
839 for rel in rels {
840 match locks.get(rel) {
841 None => {
842 pending.push(rel);
843 }
844 Some(lock) => collected.push(lock.clone()),
845 }
846 }
847 }
848 if !pending.is_empty() {
849 let mut locks = self.relation_locks.write().unwrap();
850 for rel in pending {
851 let lock = locks.entry(rel.clone()).or_default().clone();
852 collected.push(lock);
853 }
854 }
855 collected
856 }
857
858 fn compact_relation(&'s self) -> Result<()> {
859 let l = Tuple::default().encode_as_key(RelationId(0));
860 let u = vec![DataValue::Bot].encode_as_key(RelationId(u64::MAX));
861 self.db.range_compact(&l, &u)?;
862 Ok(())
863 }
864
865 fn load_last_ids(&'s self) -> Result<()> {
866 let mut tx = self.transact_write()?;
867 self.relation_store_id
868 .store(tx.init_storage()?.0, Ordering::Release);
869 tx.commit_tx()?;
870 Ok(())
871 }
872 pub(crate) fn transact(&'s self) -> Result<SessionTx<'_>> {
873 let ret = SessionTx {
874 store_tx: Box::new(self.db.transact(false)?),
875 temp_store_tx: self.temp_db.transact(true)?,
876 relation_store_id: self.relation_store_id.clone(),
877 temp_store_id: Default::default(),
878 tokenizers: self.tokenizers.clone(),
879 };
880 Ok(ret)
881 }
882 pub(crate) fn transact_write(&'s self) -> Result<SessionTx<'_>> {
883 let ret = SessionTx {
884 store_tx: Box::new(self.db.transact(true)?),
885 temp_store_tx: self.temp_db.transact(true)?,
886 relation_store_id: self.relation_store_id.clone(),
887 temp_store_id: Default::default(),
888 tokenizers: self.tokenizers.clone(),
889 };
890 Ok(ret)
891 }
892
893 pub(crate) fn execute_single_program(
894 &'s self,
895 p: InputProgram,
896 tx: &mut SessionTx<'_>,
897 cleanups: &mut Vec<(Vec<u8>, Vec<u8>)>,
898 cur_vld: ValidityTs,
899 callback_targets: &BTreeSet<SmartString<LazyCompact>>,
900 callback_collector: &mut CallbackCollector,
901 ) -> Result<NamedRows> {
902 #[allow(unused_variables)]
903 let sleep_opt = p.out_opts.sleep;
904 let (q_res, q_cleanups) =
905 self.run_query(tx, p, cur_vld, callback_targets, callback_collector, true)?;
906 cleanups.extend(q_cleanups);
907 #[cfg(not(target_arch = "wasm32"))]
908 if let Some(secs) = sleep_opt {
909 thread::sleep(Duration::from_micros((secs * 1000000.) as u64));
910 }
911 Ok(q_res)
912 }
913
914 fn execute_single(
915 &'s self,
916 cur_vld: ValidityTs,
917 p: InputProgram,
918 read_only: bool,
919 ) -> Result<NamedRows, Report> {
920 let mut callback_collector = BTreeMap::new();
921 let write_lock_names = p.needs_write_lock();
922 let is_write = write_lock_names.is_some();
923 if read_only && is_write {
924 bail!("write lock required for read-only query");
925 }
926 let write_lock = self.obtain_relation_locks(write_lock_names.iter());
927 let _write_lock_guards = if is_write {
928 Some(write_lock[0].read().unwrap())
929 } else {
930 None
931 };
932 let callback_targets = if is_write {
933 self.current_callback_targets()
934 } else {
935 Default::default()
936 };
937 let mut cleanups = vec![];
938 let res;
939 {
940 let mut tx = if is_write {
941 self.transact_write()?
942 } else {
943 self.transact()?
944 };
945
946 res = self.execute_single_program(
947 p,
948 &mut tx,
949 &mut cleanups,
950 cur_vld,
951 &callback_targets,
952 &mut callback_collector,
953 )?;
954
955 for (lower, upper) in cleanups {
956 tx.store_tx.del_range_from_persisted(&lower, &upper)?;
957 }
958
959 tx.commit_tx()?;
960 }
961 #[cfg(not(target_arch = "wasm32"))]
962 if !callback_collector.is_empty() {
963 self.send_callbacks(callback_collector)
964 }
965
966 Ok(res)
967 }
968 fn explain_compiled(&self, strata: &[CompiledProgram]) -> Result<NamedRows> {
969 let mut ret: Vec<JsonValue> = vec![];
970 const STRATUM: &str = "stratum";
971 const ATOM_IDX: &str = "atom_idx";
972 const OP: &str = "op";
973 const RULE_IDX: &str = "rule_idx";
974 const RULE_NAME: &str = "rule";
975 const REF_NAME: &str = "ref";
976 const OUT_BINDINGS: &str = "out_relation";
977 const JOINS_ON: &str = "joins_on";
978 const FILTERS: &str = "filters/expr";
979
980 let headers = vec![
981 STRATUM.to_string(),
982 RULE_IDX.to_string(),
983 RULE_NAME.to_string(),
984 ATOM_IDX.to_string(),
985 OP.to_string(),
986 REF_NAME.to_string(),
987 JOINS_ON.to_string(),
988 FILTERS.to_string(),
989 OUT_BINDINGS.to_string(),
990 ];
991
992 for (stratum, p) in strata.iter().enumerate() {
993 let mut clause_idx = -1;
994 for (rule_name, v) in p {
995 match v {
996 CompiledRuleSet::Rules(rules) => {
997 for CompiledRule { aggr, relation, .. } in rules.iter() {
998 clause_idx += 1;
999 let mut ret_for_relation = vec![];
1000 let mut rel_stack = vec![relation];
1001 let mut idx = 0;
1002 let mut atom_type = "out";
1003 for (a, _) in aggr.iter().flatten() {
1004 if a.is_meet {
1005 if atom_type == "out" {
1006 atom_type = "meet_aggr_out";
1007 }
1008 } else {
1009 atom_type = "aggr_out";
1010 }
1011 }
1012
1013 ret_for_relation.push(json!({
1014 STRATUM: stratum,
1015 ATOM_IDX: idx,
1016 OP: atom_type,
1017 RULE_IDX: clause_idx,
1018 RULE_NAME: rule_name.to_string(),
1019 OUT_BINDINGS: relation.bindings_after_eliminate().into_iter().map(|v| v.to_string()).collect_vec()
1020 }));
1021 idx += 1;
1022
1023 while let Some(rel) = rel_stack.pop() {
1024 let (atom_type, ref_name, joins_on, filters) = match rel {
1025 r @ RelAlgebra::Fixed(..) => {
1026 if r.is_unit() {
1027 continue;
1028 }
1029 ("fixed", json!(null), json!(null), json!(null))
1030 }
1031 RelAlgebra::TempStore(TempStoreRA {
1032 storage_key,
1033 filters,
1034 ..
1035 }) => (
1036 "load_mem",
1037 json!(storage_key.to_string()),
1038 json!(null),
1039 json!(filters.iter().map(|f| f.to_string()).collect_vec()),
1040 ),
1041 RelAlgebra::Stored(StoredRA {
1042 storage, filters, ..
1043 }) => (
1044 "load_stored",
1045 json!(format!(":{}", storage.name)),
1046 json!(null),
1047 json!(filters.iter().map(|f| f.to_string()).collect_vec()),
1048 ),
1049 RelAlgebra::StoredWithValidity(StoredWithValidityRA {
1050 storage,
1051 filters,
1052 ..
1053 }) => (
1054 "load_stored_with_validity",
1055 json!(format!(":{}", storage.name)),
1056 json!(null),
1057 json!(filters.iter().map(|f| f.to_string()).collect_vec()),
1058 ),
1059 RelAlgebra::Join(inner) => {
1060 if inner.left.is_unit() {
1061 rel_stack.push(&inner.right);
1062 continue;
1063 }
1064 let t = inner.join_type();
1065 let InnerJoin {
1066 left,
1067 right,
1068 joiner,
1069 ..
1070 } = inner.as_ref();
1071 rel_stack.push(left);
1072 rel_stack.push(right);
1073 (t, json!(null), json!(joiner.as_map()), json!(null))
1074 }
1075 RelAlgebra::NegJoin(inner) => {
1076 let t = inner.join_type();
1077 let NegJoin {
1078 left,
1079 right,
1080 joiner,
1081 ..
1082 } = inner.as_ref();
1083 rel_stack.push(left);
1084 rel_stack.push(right);
1085 (t, json!(null), json!(joiner.as_map()), json!(null))
1086 }
1087 RelAlgebra::Reorder(ReorderRA { relation, .. }) => {
1088 rel_stack.push(relation);
1089 ("reorder", json!(null), json!(null), json!(null))
1090 }
1091 RelAlgebra::Filter(FilteredRA {
1092 parent,
1093 filters: pred,
1094 ..
1095 }) => {
1096 rel_stack.push(parent);
1097 (
1098 "filter",
1099 json!(null),
1100 json!(null),
1101 json!(pred.iter().map(|f| f.to_string()).collect_vec()),
1102 )
1103 }
1104 RelAlgebra::Unification(UnificationRA {
1105 parent,
1106 binding,
1107 expr,
1108 is_multi,
1109 ..
1110 }) => {
1111 rel_stack.push(parent);
1112 (
1113 if *is_multi { "multi-unify" } else { "unify" },
1114 json!(binding.name),
1115 json!(null),
1116 json!(expr.to_string()),
1117 )
1118 }
1119 RelAlgebra::HnswSearch(HnswSearchRA {
1120 hnsw_search, ..
1121 }) => (
1122 "hnsw_index",
1123 json!(format!(":{}", hnsw_search.query.name)),
1124 json!(hnsw_search.query.name),
1125 json!(hnsw_search
1126 .filter
1127 .iter()
1128 .map(|f| f.to_string())
1129 .collect_vec()),
1130 ),
1131 RelAlgebra::FtsSearch(FtsSearchRA { fts_search, .. }) => (
1132 "fts_index",
1133 json!(format!(":{}", fts_search.query.name)),
1134 json!(fts_search.query.name),
1135 json!(fts_search
1136 .filter
1137 .iter()
1138 .map(|f| f.to_string())
1139 .collect_vec()),
1140 ),
1141 RelAlgebra::LshSearch(LshSearchRA { lsh_search, .. }) => (
1142 "lsh_index",
1143 json!(format!(":{}", lsh_search.query.name)),
1144 json!(lsh_search.query.name),
1145 json!(lsh_search
1146 .filter
1147 .iter()
1148 .map(|f| f.to_string())
1149 .collect_vec()),
1150 ),
1151 };
1152 ret_for_relation.push(json!({
1153 STRATUM: stratum,
1154 ATOM_IDX: idx,
1155 OP: atom_type,
1156 RULE_IDX: clause_idx,
1157 RULE_NAME: rule_name.to_string(),
1158 REF_NAME: ref_name,
1159 OUT_BINDINGS: rel.bindings_after_eliminate().into_iter().map(|v| v.to_string()).collect_vec(),
1160 JOINS_ON: joins_on,
1161 FILTERS: filters,
1162 }));
1163 idx += 1;
1164 }
1165 ret_for_relation.reverse();
1166 ret.extend(ret_for_relation)
1167 }
1168 }
1169 CompiledRuleSet::Fixed(_) => ret.push(json!({
1170 STRATUM: stratum,
1171 ATOM_IDX: 0,
1172 OP: "algo",
1173 RULE_IDX: 0,
1174 RULE_NAME: rule_name.to_string(),
1175 })),
1176 }
1177 }
1178 }
1179
1180 let rows = ret
1181 .into_iter()
1182 .map(|m| {
1183 headers
1184 .iter()
1185 .map(|i| DataValue::from(m.get(i).unwrap_or(&JsonValue::Null)))
1186 .collect_vec()
1187 })
1188 .collect_vec();
1189
1190 Ok(NamedRows::new(headers, rows))
1191 }
1192 pub(crate) fn run_sys_op_with_tx(
1193 &'s self,
1194 tx: &mut SessionTx<'_>,
1195 op: &SysOp,
1196 read_only: bool,
1197 skip_locking: bool,
1198 ) -> Result<NamedRows> {
1199 match op {
1200 SysOp::Explain(prog) => {
1201 let (normalized_program, _) = prog.clone().into_normalized_program(tx)?;
1202 let (stratified_program, _) = normalized_program.into_stratified_program()?;
1203 let program = stratified_program.magic_sets_rewrite(tx)?;
1204 let compiled = tx.stratified_magic_compile(program)?;
1205 self.explain_compiled(&compiled)
1206 }
1207 SysOp::Compact => {
1208 if read_only {
1209 bail!("Cannot compact in read-only mode");
1210 }
1211 self.compact_relation()?;
1212 Ok(NamedRows::new(
1213 vec![STATUS_STR.to_string()],
1214 vec![vec![DataValue::from(OK_STR)]],
1215 ))
1216 }
1217 SysOp::ListRelations => self.list_relations(tx),
1218 SysOp::ListFixedRules => {
1219 let rules = self.fixed_rules.read().unwrap();
1220 Ok(NamedRows::new(
1221 vec!["rule".to_string()],
1222 rules
1223 .keys()
1224 .map(|k| vec![DataValue::from(k as &str)])
1225 .collect_vec(),
1226 ))
1227 }
1228 SysOp::RemoveRelation(rel_names) => {
1229 if read_only {
1230 bail!("Cannot remove relations in read-only mode");
1231 }
1232 let rel_name_strs = rel_names.iter().map(|n| &n.name);
1233 let locks = if skip_locking {
1234 vec![]
1235 } else {
1236 self.obtain_relation_locks(rel_name_strs)
1237 };
1238 let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
1239 let mut bounds = vec![];
1240 for rs in rel_names {
1241 let bound = tx.destroy_relation(rs)?;
1242 if !rs.is_temp_store_name() {
1243 bounds.extend(bound);
1244 }
1245 }
1246 for (lower, upper) in bounds {
1247 tx.store_tx.del_range_from_persisted(&lower, &upper)?;
1248 }
1249 Ok(NamedRows::new(
1250 vec![STATUS_STR.to_string()],
1251 vec![vec![DataValue::from(OK_STR)]],
1252 ))
1253 }
1254 SysOp::DescribeRelation(rel_name, description) => {
1255 tx.describe_relation(rel_name, description)?;
1256 Ok(NamedRows::new(
1257 vec![STATUS_STR.to_string()],
1258 vec![vec![DataValue::from(OK_STR)]],
1259 ))
1260 }
1261 SysOp::CreateIndex(rel_name, idx_name, cols) => {
1262 if read_only {
1263 bail!("Cannot create index in read-only mode");
1264 }
1265 if skip_locking {
1266 tx.create_index(rel_name, idx_name, cols)?;
1267 } else {
1268 let lock = self
1269 .obtain_relation_locks(iter::once(&rel_name.name))
1270 .pop()
1271 .unwrap();
1272 let _guard = lock.write().unwrap();
1273 tx.create_index(rel_name, idx_name, cols)?;
1274 }
1275 Ok(NamedRows::new(
1276 vec![STATUS_STR.to_string()],
1277 vec![vec![DataValue::from(OK_STR)]],
1278 ))
1279 }
1280 SysOp::CreateVectorIndex(config) => {
1281 if read_only {
1282 bail!("Cannot create vector index in read-only mode");
1283 }
1284 if skip_locking {
1285 tx.create_hnsw_index(config)?;
1286 } else {
1287 let lock = self
1288 .obtain_relation_locks(iter::once(&config.base_relation))
1289 .pop()
1290 .unwrap();
1291 let _guard = lock.write().unwrap();
1292 tx.create_hnsw_index(config)?;
1293 }
1294 Ok(NamedRows::new(
1295 vec![STATUS_STR.to_string()],
1296 vec![vec![DataValue::from(OK_STR)]],
1297 ))
1298 }
1299 SysOp::CreateFtsIndex(config) => {
1300 if read_only {
1301 bail!("Cannot create fts index in read-only mode");
1302 }
1303 if skip_locking {
1304 tx.create_fts_index(config)?;
1305 } else {
1306 let lock = self
1307 .obtain_relation_locks(iter::once(&config.base_relation))
1308 .pop()
1309 .unwrap();
1310 let _guard = lock.write().unwrap();
1311 tx.create_fts_index(config)?;
1312 }
1313 Ok(NamedRows::new(
1314 vec![STATUS_STR.to_string()],
1315 vec![vec![DataValue::from(OK_STR)]],
1316 ))
1317 }
1318 SysOp::CreateMinHashLshIndex(config) => {
1319 if read_only {
1320 bail!("Cannot create minhash lsh index in read-only mode");
1321 }
1322 if skip_locking {
1323 tx.create_minhash_lsh_index(config)?;
1324 } else {
1325 let lock = self
1326 .obtain_relation_locks(iter::once(&config.base_relation))
1327 .pop()
1328 .unwrap();
1329 let _guard = lock.write().unwrap();
1330 tx.create_minhash_lsh_index(config)?;
1331 }
1332
1333 Ok(NamedRows::new(
1334 vec![STATUS_STR.to_string()],
1335 vec![vec![DataValue::from(OK_STR)]],
1336 ))
1337 }
1338 SysOp::RemoveIndex(rel_name, idx_name) => {
1339 if read_only {
1340 bail!("Cannot remove index in read-only mode");
1341 }
1342 let bounds = if skip_locking {
1343 tx.remove_index(rel_name, idx_name)?
1344 } else {
1345 let lock = self
1346 .obtain_relation_locks(iter::once(&rel_name.name))
1347 .pop()
1348 .unwrap();
1349 let _guard = lock.read().unwrap();
1350 tx.remove_index(rel_name, idx_name)?
1351 };
1352
1353 for (lower, upper) in bounds {
1354 tx.store_tx.del_range_from_persisted(&lower, &upper)?;
1355 }
1356 Ok(NamedRows::new(
1357 vec![STATUS_STR.to_string()],
1358 vec![vec![DataValue::from(OK_STR)]],
1359 ))
1360 }
1361 SysOp::ListColumns(rs) => self.list_columns(tx, rs),
1362 SysOp::ListIndices(rs) => self.list_indices(tx, rs),
1363 SysOp::RenameRelation(rename_pairs) => {
1364 if read_only {
1365 bail!("Cannot rename relations in read-only mode");
1366 }
1367 let rel_names = rename_pairs.iter().flat_map(|(f, t)| [&f.name, &t.name]);
1368 let locks = if skip_locking {
1369 vec![]
1370 } else {
1371 self.obtain_relation_locks(rel_names)
1372 };
1373 let _guards = locks.iter().map(|l| l.read().unwrap()).collect_vec();
1374 for (old, new) in rename_pairs {
1375 tx.rename_relation(old, new)?;
1376 }
1377 Ok(NamedRows::new(
1378 vec![STATUS_STR.to_string()],
1379 vec![vec![DataValue::from(OK_STR)]],
1380 ))
1381 }
1382 SysOp::ListRunning => self.list_running(),
1383 SysOp::KillRunning(id) => {
1384 let queries = self.running_queries.lock().unwrap();
1385 Ok(match queries.get(id) {
1386 None => NamedRows::new(
1387 vec![STATUS_STR.to_string()],
1388 vec![vec![DataValue::from("NOT_FOUND")]],
1389 ),
1390 Some(handle) => {
1391 handle.poison.0.store(true, Ordering::Relaxed);
1392 NamedRows::new(
1393 vec![STATUS_STR.to_string()],
1394 vec![vec![DataValue::from("KILLING")]],
1395 )
1396 }
1397 })
1398 }
1399 SysOp::ShowTrigger(name) => {
1400 let rel = tx.get_relation(name, false)?;
1401 let mut rows: Vec<Vec<JsonValue>> = vec![];
1402 for (i, trigger) in rel.put_triggers.iter().enumerate() {
1403 rows.push(vec![json!("put"), json!(i), json!(trigger)])
1404 }
1405 for (i, trigger) in rel.rm_triggers.iter().enumerate() {
1406 rows.push(vec![json!("rm"), json!(i), json!(trigger)])
1407 }
1408 for (i, trigger) in rel.replace_triggers.iter().enumerate() {
1409 rows.push(vec![json!("replace"), json!(i), json!(trigger)])
1410 }
1411 let rows = rows
1412 .into_iter()
1413 .map(|row| row.into_iter().map(DataValue::from).collect_vec())
1414 .collect_vec();
1415 Ok(NamedRows::new(
1416 vec!["type".to_string(), "idx".to_string(), "trigger".to_string()],
1417 rows,
1418 ))
1419 }
1420 SysOp::SetTriggers(name, puts, rms, replaces) => {
1421 if read_only {
1422 bail!("Cannot set triggers in read-only mode");
1423 }
1424 tx.set_relation_triggers(name, puts, rms, replaces)?;
1425 Ok(NamedRows::new(
1426 vec![STATUS_STR.to_string()],
1427 vec![vec![DataValue::from(OK_STR)]],
1428 ))
1429 }
1430 SysOp::SetAccessLevel(names, level) => {
1431 if read_only {
1432 bail!("Cannot set access level in read-only mode");
1433 }
1434 for name in names {
1435 tx.set_access_level(name, *level)?;
1436 }
1437 Ok(NamedRows::new(
1438 vec![STATUS_STR.to_string()],
1439 vec![vec![DataValue::from(OK_STR)]],
1440 ))
1441 }
1442 }
1443 }
1444 fn run_sys_op(&'s self, op: SysOp, read_only: bool) -> Result<NamedRows> {
1445 let mut tx = if read_only {
1446 self.transact()?
1447 } else {
1448 self.transact_write()?
1449 };
1450 let res = self.run_sys_op_with_tx(&mut tx, &op, read_only, false)?;
1451 tx.commit_tx()?;
1452 Ok(res)
1453 }
1454 pub(crate) fn run_query(
1456 &self,
1457 tx: &mut SessionTx<'_>,
1458 input_program: InputProgram,
1459 cur_vld: ValidityTs,
1460 callback_targets: &BTreeSet<SmartString<LazyCompact>>,
1461 callback_collector: &mut CallbackCollector,
1462 top_level: bool,
1463 ) -> Result<(NamedRows, Vec<(Vec<u8>, Vec<u8>)>)> {
1464 let mut clean_ups = vec![];
1466
1467 if let Some((meta, op, _)) = &input_program.out_opts.store_relation {
1469 if *op == RelationOp::Create {
1470 #[derive(Debug, Error, Diagnostic)]
1471 #[error("Stored relation {0} conflicts with an existing one")]
1472 #[diagnostic(code(eval::stored_relation_conflict))]
1473 struct StoreRelationConflict(String);
1474
1475 ensure!(
1476 !tx.relation_exists(&meta.name)?,
1477 StoreRelationConflict(meta.name.to_string())
1478 )
1479 } else if *op != RelationOp::Replace {
1480 #[derive(Debug, Error, Diagnostic)]
1481 #[error("Stored relation {0} not found")]
1482 #[diagnostic(code(eval::stored_relation_not_found))]
1483 struct StoreRelationNotFoundError(String);
1484
1485 let existing = tx.get_relation(&meta.name, false)?;
1486
1487 ensure!(
1488 tx.relation_exists(&meta.name)?,
1489 StoreRelationNotFoundError(meta.name.to_string())
1490 );
1491
1492 existing.ensure_compatible(
1493 meta,
1494 *op == RelationOp::Rm || *op == RelationOp::Delete || *op == RelationOp::Update,
1495 )?;
1496 }
1497 };
1498
1499 let entry_head_or_default = input_program.get_entry_out_head_or_default()?;
1501 let (normalized_program, out_opts) = input_program.into_normalized_program(tx)?;
1502 let (stratified_program, store_lifetimes) = normalized_program.into_stratified_program()?;
1503 let program = stratified_program.magic_sets_rewrite(tx)?;
1504 let compiled = tx.stratified_magic_compile(program)?;
1505
1506 let poison = Poison::default();
1508 if let Some(secs) = out_opts.timeout {
1509 poison.set_timeout(secs)?;
1510 }
1511 let id = self.queries_count.fetch_add(1, Ordering::AcqRel);
1513
1514 let since_the_epoch = seconds_since_the_epoch()?;
1516
1517 let handle = RunningQueryHandle {
1518 started_at: since_the_epoch,
1519 poison: poison.clone(),
1520 };
1521 self.running_queries.lock().unwrap().insert(id, handle);
1522
1523 let _guard = RunningQueryCleanup {
1525 id,
1526 running_queries: self.running_queries.clone(),
1527 };
1528
1529 let total_num_to_take = if out_opts.sorters.is_empty() {
1530 out_opts.num_to_take()
1531 } else {
1532 None
1533 };
1534
1535 let num_to_skip = if out_opts.sorters.is_empty() {
1536 out_opts.offset
1537 } else {
1538 None
1539 };
1540
1541 let (result_store, early_return) = tx.stratified_magic_evaluate(
1543 &compiled,
1544 store_lifetimes,
1545 total_num_to_take,
1546 num_to_skip,
1547 poison,
1548 )?;
1549
1550 if let Some(assertion) = &out_opts.assertion {
1552 match assertion {
1553 QueryAssertion::AssertNone(span) => {
1554 if let Some(tuple) = result_store.all_iter().next() {
1555 #[derive(Debug, Error, Diagnostic)]
1556 #[error(
1557 "The query is asserted to return no result, but a tuple {0:?} is found"
1558 )]
1559 #[diagnostic(code(eval::assert_none_failure))]
1560 struct AssertNoneFailure(Tuple, #[label] SourceSpan);
1561 bail!(AssertNoneFailure(tuple.into_tuple(), *span))
1562 }
1563 }
1564 QueryAssertion::AssertSome(span) => {
1565 if result_store.all_iter().next().is_none() {
1566 #[derive(Debug, Error, Diagnostic)]
1567 #[error("The query is asserted to return some results, but returned none")]
1568 #[diagnostic(code(eval::assert_some_failure))]
1569 struct AssertSomeFailure(#[label] SourceSpan);
1570 bail!(AssertSomeFailure(*span))
1571 }
1572 }
1573 }
1574 }
1575
1576 if !out_opts.sorters.is_empty() {
1577 let sorted_result =
1579 tx.sort_and_collect(result_store, &out_opts.sorters, &entry_head_or_default)?;
1580 let sorted_iter = if let Some(offset) = out_opts.offset {
1581 Left(sorted_result.into_iter().skip(offset))
1582 } else {
1583 Right(sorted_result.into_iter())
1584 };
1585 let sorted_iter = if let Some(limit) = out_opts.limit {
1586 Left(sorted_iter.take(limit))
1587 } else {
1588 Right(sorted_iter)
1589 };
1590 if let Some((meta, relation_op, returning)) = &out_opts.store_relation {
1591 let to_clear = tx
1592 .execute_relation(
1593 self,
1594 sorted_iter,
1595 *relation_op,
1596 meta,
1597 &entry_head_or_default,
1598 cur_vld,
1599 callback_targets,
1600 callback_collector,
1601 top_level,
1602 if *returning == ReturnMutation::Returning {
1603 &meta.name.name
1604 } else {
1605 ""
1606 },
1607 )
1608 .wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?;
1609 clean_ups.extend(to_clear);
1610 let returned_rows =
1611 tx.get_returning_rows(callback_collector, &meta.name, returning)?;
1612 Ok((returned_rows, clean_ups))
1613 } else {
1614 let rows: Vec<Tuple> = sorted_iter.collect_vec();
1616 Ok((
1617 NamedRows::new(
1618 entry_head_or_default
1619 .iter()
1620 .map(|s| s.to_string())
1621 .collect_vec(),
1622 rows,
1623 ),
1624 clean_ups,
1625 ))
1626 }
1627 } else {
1628 let scan = if early_return {
1629 Right(Left(
1630 result_store.early_returned_iter().map(|t| t.into_tuple()),
1631 ))
1632 } else if out_opts.limit.is_some() || out_opts.offset.is_some() {
1633 let limit = out_opts.limit.unwrap_or(usize::MAX);
1634 let offset = out_opts.offset.unwrap_or(0);
1635 Right(Right(
1636 result_store
1637 .all_iter()
1638 .skip(offset)
1639 .take(limit)
1640 .map(|t| t.into_tuple()),
1641 ))
1642 } else {
1643 Left(result_store.all_iter().map(|t| t.into_tuple()))
1644 };
1645
1646 if let Some((meta, relation_op, returning)) = &out_opts.store_relation {
1647 let to_clear = tx
1648 .execute_relation(
1649 self,
1650 scan,
1651 *relation_op,
1652 meta,
1653 &entry_head_or_default,
1654 cur_vld,
1655 callback_targets,
1656 callback_collector,
1657 top_level,
1658 if *returning == ReturnMutation::Returning {
1659 &meta.name.name
1660 } else {
1661 ""
1662 },
1663 )
1664 .wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?;
1665 clean_ups.extend(to_clear);
1666 let returned_rows =
1667 tx.get_returning_rows(callback_collector, &meta.name, returning)?;
1668
1669 Ok((returned_rows, clean_ups))
1670 } else {
1671 let rows: Vec<Tuple> = scan.collect_vec();
1672
1673 Ok((
1674 NamedRows::new(
1675 entry_head_or_default
1676 .iter()
1677 .map(|s| s.to_string())
1678 .collect_vec(),
1679 rows,
1680 ),
1681 clean_ups,
1682 ))
1683 }
1684 }
1685 }
1686 pub(crate) fn list_running(&self) -> Result<NamedRows> {
1687 let rows = self
1688 .running_queries
1689 .lock()
1690 .unwrap()
1691 .iter()
1692 .map(|(k, v)| {
1693 vec![
1694 DataValue::from(*k as i64),
1695 DataValue::from(format!("{:?}", v.started_at)),
1696 ]
1697 })
1698 .collect_vec();
1699 Ok(NamedRows::new(
1700 vec!["id".to_string(), "started_at".to_string()],
1701 rows,
1702 ))
1703 }
1704 fn list_indices(&'s self, tx: &SessionTx<'_>, name: &str) -> Result<NamedRows> {
1705 let handle = tx.get_relation(name, false)?;
1706 let mut rows = vec![];
1707 for (name, (rel, cols)) in &handle.indices {
1708 rows.push(vec![
1709 json!(name),
1710 json!("normal"),
1711 json!([rel.name]),
1712 json!({ "indices": cols }),
1713 ]);
1714 }
1715 for (name, (rel, manifest)) in &handle.hnsw_indices {
1716 rows.push(vec![
1717 json!(name),
1718 json!("hnsw"),
1719 json!([rel.name]),
1720 json!({
1721 "vec_dim": manifest.vec_dim,
1722 "dtype": manifest.dtype,
1723 "vec_fields": manifest.vec_fields,
1724 "distance": manifest.distance,
1725 "ef_construction": manifest.ef_construction,
1726 "m_neighbours": manifest.m_neighbours,
1727 "m_max": manifest.m_max,
1728 "m_max0": manifest.m_max0,
1729 "level_multiplier": manifest.level_multiplier,
1730 "extend_candidates": manifest.extend_candidates,
1731 "keep_pruned_connections": manifest.keep_pruned_connections,
1732 }),
1733 ]);
1734 }
1735 for (name, (rel, manifest)) in &handle.fts_indices {
1736 rows.push(vec![
1737 json!(name),
1738 json!("fts"),
1739 json!([rel.name]),
1740 json!({
1741 "extractor": manifest.extractor,
1742 "tokenizer": manifest.tokenizer,
1743 "tokenizer_filters": manifest.filters,
1744 }),
1745 ]);
1746 }
1747 for (name, (rel, inv_rel, manifest)) in &handle.lsh_indices {
1748 rows.push(vec![
1749 json!(name),
1750 json!("lsh"),
1751 json!([rel.name, inv_rel.name]),
1752 json!({
1753 "extractor": manifest.extractor,
1754 "tokenizer": manifest.tokenizer,
1755 "tokenizer_filters": manifest.filters,
1756 "n_gram": manifest.n_gram,
1757 "num_perm": manifest.num_perm,
1758 "n_bands": manifest.n_bands,
1759 "n_rows_in_band": manifest.n_rows_in_band,
1760 "threshold": manifest.threshold,
1761 }),
1762 ]);
1763 }
1764 let rows = rows
1765 .into_iter()
1766 .map(|row| row.into_iter().map(DataValue::from).collect_vec())
1767 .collect_vec();
1768 Ok(NamedRows::new(
1769 vec![
1770 "name".to_string(),
1771 "type".to_string(),
1772 "relations".to_string(),
1773 "config".to_string(),
1774 ],
1775 rows,
1776 ))
1777 }
1778 fn list_columns(&'s self, tx: &SessionTx<'_>, name: &str) -> Result<NamedRows> {
1779 let handle = tx.get_relation(name, false)?;
1780 let mut rows = vec![];
1781 let mut idx = 0;
1782 for col in &handle.metadata.keys {
1783 let default_expr = col.default_gen.as_ref().map(|gen| format!("{}", gen));
1784
1785 rows.push(vec![
1786 json!(col.name),
1787 json!(true),
1788 json!(idx),
1789 json!(col.typing.to_string()),
1790 json!(col.default_gen.is_some()),
1791 json!(default_expr),
1792 ]);
1793 idx += 1;
1794 }
1795 for col in &handle.metadata.non_keys {
1796 let default_expr = col.default_gen.as_ref().map(|gen| format!("{}", gen));
1797
1798 rows.push(vec![
1799 json!(col.name),
1800 json!(false),
1801 json!(idx),
1802 json!(col.typing.to_string()),
1803 json!(col.default_gen.is_some()),
1804 json!(default_expr),
1805 ]);
1806 idx += 1;
1807 }
1808 let rows = rows
1809 .into_iter()
1810 .map(|row| row.into_iter().map(DataValue::from).collect_vec())
1811 .collect_vec();
1812 Ok(NamedRows::new(
1813 vec![
1814 "column".to_string(),
1815 "is_key".to_string(),
1816 "index".to_string(),
1817 "type".to_string(),
1818 "has_default".to_string(),
1819 "default_expr".to_string(),
1820 ],
1821 rows,
1822 ))
1823 }
1824 fn list_relations(&'s self, tx: &SessionTx<'_>) -> Result<NamedRows> {
1825 let lower = vec![DataValue::from("")].encode_as_key(RelationId::SYSTEM);
1826 let upper =
1827 vec![DataValue::from(String::from(LARGEST_UTF_CHAR))].encode_as_key(RelationId::SYSTEM);
1828 let mut rows: Vec<Vec<JsonValue>> = vec![];
1829 for kv_res in tx.store_tx.range_scan(&lower, &upper) {
1830 let (k_slice, v_slice) = kv_res?;
1831 if upper <= k_slice {
1832 break;
1833 }
1834 let meta = RelationHandle::decode(&v_slice)?;
1835 let n_keys = meta.metadata.keys.len();
1836 let n_dependents = meta.metadata.non_keys.len();
1837 let arity = n_keys + n_dependents;
1838 let name = meta.name;
1839 let access_level = if name.contains(':') {
1840 "index".to_string()
1841 } else {
1842 meta.access_level.to_string()
1843 };
1844 rows.push(vec![
1845 json!(name),
1846 json!(arity),
1847 json!(access_level),
1848 json!(n_keys),
1849 json!(n_dependents),
1850 json!(meta.put_triggers.len()),
1851 json!(meta.rm_triggers.len()),
1852 json!(meta.replace_triggers.len()),
1853 json!(meta.description),
1854 ]);
1855 }
1856 let rows = rows
1857 .into_iter()
1858 .map(|row| row.into_iter().map(DataValue::from).collect_vec())
1859 .collect_vec();
1860 Ok(NamedRows::new(
1861 vec![
1862 "name".to_string(),
1863 "arity".to_string(),
1864 "access_level".to_string(),
1865 "n_keys".to_string(),
1866 "n_non_keys".to_string(),
1867 "n_put_triggers".to_string(),
1868 "n_rm_triggers".to_string(),
1869 "n_replace_triggers".to_string(),
1870 "description".to_string(),
1871 ],
1872 rows,
1873 ))
1874 }
1875}
1876
1877pub fn evaluate_expressions(
1879 src: &str,
1880 params: &BTreeMap<String, DataValue>,
1881 vars: &BTreeMap<String, DataValue>,
1882) -> Result<DataValue> {
1883 _evaluate_expressions(src, params, vars).map_err(|err| {
1884 if err.source().is_none() {
1885 err.with_source_code(format!("{src} "))
1886 } else {
1887 err
1888 }
1889 })
1890}
1891
1892pub fn get_variables(src: &str, params: &BTreeMap<String, DataValue>) -> Result<BTreeSet<String>> {
1894 _get_variables(src, params).map_err(|err| {
1895 if err.source().is_none() {
1896 err.with_source_code(format!("{src} "))
1897 } else {
1898 err
1899 }
1900 })
1901}
1902
1903fn _evaluate_expressions(
1904 src: &str,
1905 params: &BTreeMap<String, DataValue>,
1906 vars: &BTreeMap<String, DataValue>,
1907) -> Result<DataValue> {
1908 let mut expr = parse_expressions(src, params)?;
1909 let mut ctx = vec![];
1910 let mut binding_map = BTreeMap::new();
1911 for (i, (k, v)) in vars.iter().enumerate() {
1912 ctx.push(v.clone());
1913 binding_map.insert(Symbol::new(k, Default::default()), i);
1914 }
1915 expr.fill_binding_indices(&binding_map)?;
1916 expr.eval(&ctx)
1917}
1918
1919fn _get_variables(src: &str, params: &BTreeMap<String, DataValue>) -> Result<BTreeSet<String>> {
1920 let expr = parse_expressions(src, params)?;
1921 expr.get_variables()
1922}
1923
1924#[derive(Clone, Default)]
1926pub struct Poison(pub(crate) Arc<AtomicBool>);
1927
1928impl Poison {
1929 #[inline(always)]
1931 pub fn check(&self) -> Result<()> {
1932 #[derive(Debug, Error, Diagnostic)]
1933 #[error("Running query is killed before completion")]
1934 #[diagnostic(code(eval::killed))]
1935 #[diagnostic(help("A query may be killed by timeout, or explicit command"))]
1936 struct ProcessKilled;
1937
1938 if self.0.load(Ordering::Relaxed) {
1939 bail!(ProcessKilled)
1940 }
1941 Ok(())
1942 }
1943 #[cfg(target_arch = "wasm32")]
1944 pub(crate) fn set_timeout(&self, _secs: f64) -> Result<()> {
1945 bail!("Cannot set timeout when threading is disallowed");
1946 }
1947 #[cfg(not(target_arch = "wasm32"))]
1948 pub(crate) fn set_timeout(&self, secs: f64) -> Result<()> {
1949 let pill = self.clone();
1950 thread::spawn(move || {
1951 thread::sleep(Duration::from_micros((secs * 1000000.) as u64));
1952 pill.0.store(true, Ordering::Relaxed);
1953 });
1954 Ok(())
1955 }
1956}
1957
1958pub(crate) fn seconds_since_the_epoch() -> Result<f64> {
1959 #[cfg(not(target_arch = "wasm32"))]
1960 let now = SystemTime::now();
1961 #[cfg(not(target_arch = "wasm32"))]
1962 return Ok(now
1963 .duration_since(UNIX_EPOCH)
1964 .into_diagnostic()?
1965 .as_secs_f64());
1966
1967 #[cfg(target_arch = "wasm32")]
1968 Ok(js_sys::Date::now())
1969}