1#![doc = document_features::document_features!()]
29#![warn(rust_2018_idioms, future_incompatible)]
30#![warn(missing_docs)]
31#![allow(clippy::type_complexity)]
32#![allow(clippy::too_many_arguments)]
33
34use std::collections::BTreeMap;
35use std::path::Path;
36use std::sync::Arc;
37#[allow(unused_imports)]
38use std::time::Instant;
39
40use crossbeam::channel::{bounded, Receiver, Sender};
41use data::functions::current_validity;
42use lazy_static::lazy_static;
43pub use miette::Error;
44use miette::Report;
45#[allow(unused_imports)]
46use miette::{
47 bail, miette, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic, JSONReportHandler,
48 Result, ThemeCharacters, ThemeStyles,
49};
50use parse::parse_script;
51use parse::CozoScript;
52use serde_json::json;
53
54pub use data::value::{DataValue, Num, RegexWrapper, UuidWrapper, Validity, ValidityTs};
55pub use fixed_rule::{FixedRule, FixedRuleInputRelation, FixedRulePayload};
56pub use runtime::db::Db;
57pub use runtime::db::NamedRows;
58pub use runtime::relation::decode_tuple_from_kv;
59pub use runtime::temp_store::RegularTempStore;
60pub use storage::mem::{new_cozo_mem, MemStorage};
61#[cfg(feature = "storage-new-rocksdb")]
62pub use storage::newrocks::{new_cozo_newrocksdb, NewRocksDbStorage};
63#[cfg(feature = "storage-rocksdb")]
64pub use storage::rocks::{new_cozo_rocksdb, RocksDbStorage};
65#[cfg(feature = "storage-sled")]
66pub use storage::sled::{new_cozo_sled, SledStorage};
67#[cfg(feature = "storage-sqlite")]
68pub use storage::sqlite::{new_cozo_sqlite, SqliteStorage};
69#[cfg(feature = "storage-tikv")]
70pub use storage::tikv::{new_cozo_tikv, TiKvStorage};
71pub use storage::{Storage, StoreTx};
72
73pub use crate::data::expr::Expr;
74use crate::data::json::JsonValue;
75pub use crate::data::symb::Symbol;
76pub use crate::data::value::{JsonData, Vector};
77pub use crate::fixed_rule::SimpleFixedRule;
78pub use crate::parse::SourceSpan;
79pub use crate::runtime::callback::CallbackOp;
80pub use crate::runtime::db::evaluate_expressions;
81pub use crate::runtime::db::get_variables;
82pub use crate::runtime::db::Payload;
83pub use crate::runtime::db::Poison;
84pub use crate::runtime::db::ScriptMutability;
85pub use crate::runtime::db::TransactionPayload;
86
87pub mod data;
88pub(crate) mod fixed_rule;
89pub(crate) mod fts;
90pub mod parse;
91pub(crate) mod query;
92pub(crate) mod runtime;
93pub(crate) mod storage;
94pub(crate) mod utils;
95
96use rayon::spawn;
97
/// A database handle that wraps a [`Db`] over one of the storage backends.
///
/// The `Mem` variant is always present; every other variant is compiled in
/// only when the matching `storage-*` Cargo feature is enabled.
#[derive(Clone)]
pub enum DbInstance {
    /// Database over `MemStorage`.
    Mem(Db<MemStorage>),
    /// Database over `SqliteStorage` (feature `storage-sqlite`).
    #[cfg(feature = "storage-sqlite")]
    Sqlite(Db<SqliteStorage>),
    /// Database over `RocksDbStorage` (feature `storage-rocksdb`).
    #[cfg(feature = "storage-rocksdb")]
    RocksDb(Db<RocksDbStorage>),
    /// Database over `NewRocksDbStorage` (feature `storage-new-rocksdb`).
    #[cfg(feature = "storage-new-rocksdb")]
    NewRocksDb(Db<NewRocksDbStorage>),
    /// Database over `SledStorage` (feature `storage-sled`).
    #[cfg(feature = "storage-sled")]
    Sled(Db<SledStorage>),
    /// Database over `TiKvStorage` (feature `storage-tikv`).
    #[cfg(feature = "storage-tikv")]
    TiKv(Db<TiKvStorage>),
}
127
128impl Default for DbInstance {
129 fn default() -> Self {
130 Self::new("mem", "", Default::default()).unwrap()
131 }
132}
133
134impl DbInstance {
135 #[allow(unused_variables)]
151 pub fn new(engine: &str, path: impl AsRef<Path>, options: &str) -> Result<Self> {
152 let options = if options.is_empty() { "{}" } else { options };
153 Ok(match engine {
154 "mem" => Self::Mem(new_cozo_mem()?),
155 #[cfg(feature = "storage-sqlite")]
156 "sqlite" => Self::Sqlite(new_cozo_sqlite(path)?),
157 #[cfg(feature = "storage-rocksdb")]
158 "rocksdb" => Self::RocksDb(new_cozo_rocksdb(path)?),
159 #[cfg(feature = "storage-new-rocksdb")]
160 "newrocksdb" => Self::NewRocksDb(new_cozo_newrocksdb(path)?),
161 #[cfg(feature = "storage-sled")]
162 "sled" => Self::Sled(new_cozo_sled(path)?),
163 #[cfg(feature = "storage-tikv")]
164 "tikv" => {
165 #[derive(serde_derive::Deserialize)]
166 struct TiKvOpts {
167 end_points: Vec<String>,
168 optimistic: bool,
169 }
170 let opts: TiKvOpts = serde_json::from_str(options).into_diagnostic()?;
171 Self::TiKv(new_cozo_tikv(opts.end_points.clone(), opts.optimistic)?)
172 }
173 k => bail!(
174 "database engine '{}' not supported (maybe not compiled in)",
175 k
176 ),
177 })
178 }
179 pub fn new_with_str(
181 engine: &str,
182 path: &str,
183 options: &str,
184 ) -> std::result::Result<Self, String> {
185 Self::new(engine, path, options).map_err(|err| err.to_string())
186 }
187
188 pub fn get_fixed_rules(&self) -> BTreeMap<String, Arc<Box<dyn FixedRule>>> {
190 match self {
191 DbInstance::Mem(db) => db.get_fixed_rules(),
192 #[cfg(feature = "storage-sqlite")]
193 DbInstance::Sqlite(db) => db.get_fixed_rules(),
194 #[cfg(feature = "storage-rocksdb")]
195 DbInstance::RocksDb(db) => db.get_fixed_rules(),
196 #[cfg(feature = "storage-new-rocksdb")]
197 DbInstance::NewRocksDb(db) => db.get_fixed_rules(),
198 #[cfg(feature = "storage-sled")]
199 DbInstance::Sled(db) => db.get_fixed_rules(),
200 #[cfg(feature = "storage-tikv")]
201 DbInstance::TiKv(db) => db.get_fixed_rules(),
202 }
203 }
204 pub fn run_script(
206 &self,
207 payload: &str,
208 params: BTreeMap<String, DataValue>,
209 mutability: ScriptMutability,
210 ) -> Result<NamedRows> {
211 let cur_vld = current_validity();
212 self.run_script_ast(
213 parse_script(payload, ¶ms, &self.get_fixed_rules(), cur_vld)?,
214 cur_vld,
215 mutability,
216 )
217 }
218 pub fn run_default(&self, payload: &str) -> Result<NamedRows> {
220 self.run_script(payload, BTreeMap::new(), ScriptMutability::Mutable)
221 }
222 pub fn run_script_ast(
224 &self,
225 payload: CozoScript,
226 cur_vld: ValidityTs,
227 mutability: ScriptMutability,
228 ) -> Result<NamedRows> {
229 match self {
230 DbInstance::Mem(db) => db.run_script_ast(payload, cur_vld, mutability),
231 #[cfg(feature = "storage-sqlite")]
232 DbInstance::Sqlite(db) => db.run_script_ast(payload, cur_vld, mutability),
233 #[cfg(feature = "storage-rocksdb")]
234 DbInstance::RocksDb(db) => db.run_script_ast(payload, cur_vld, mutability),
235 #[cfg(feature = "storage-new-rocksdb")]
236 DbInstance::NewRocksDb(db) => db.run_script_ast(payload, cur_vld, mutability),
237 #[cfg(feature = "storage-sled")]
238 DbInstance::Sled(db) => db.run_script_ast(payload, cur_vld, mutability),
239 #[cfg(feature = "storage-tikv")]
240 DbInstance::TiKv(db) => db.run_script_ast(payload, cur_vld, mutability),
241 }
242 }
243 pub fn run_script_fold_err(
247 &self,
248 payload: &str,
249 params: BTreeMap<String, DataValue>,
250 mutability: ScriptMutability,
251 ) -> JsonValue {
252 #[cfg(not(target_arch = "wasm32"))]
253 let start = Instant::now();
254
255 match self.run_script(payload, params, mutability) {
256 Ok(named_rows) => {
257 let mut j_val = named_rows.into_json();
258 #[cfg(not(target_arch = "wasm32"))]
259 let took = start.elapsed().as_secs_f64();
260 let map = j_val.as_object_mut().unwrap();
261 map.insert("ok".to_string(), json!(true));
262 #[cfg(not(target_arch = "wasm32"))]
263 map.insert("took".to_string(), json!(took));
264
265 j_val
266 }
267 Err(err) => format_error_as_json(err, Some(payload)),
268 }
269 }
270 pub fn run_script_str(&self, payload: &str, params: &str, immutable: bool) -> String {
273 let params_json = if params.is_empty() {
274 BTreeMap::default()
275 } else {
276 match serde_json::from_str::<BTreeMap<String, JsonValue>>(params) {
277 Ok(map) => map
278 .into_iter()
279 .map(|(k, v)| (k, DataValue::from(v)))
280 .collect(),
281 Err(_) => {
282 return json!({"ok": false, "message": "params argument is not a JSON map"})
283 .to_string();
284 }
285 }
286 };
287 self.run_script_fold_err(
288 payload,
289 params_json,
290 if immutable {
291 ScriptMutability::Immutable
292 } else {
293 ScriptMutability::Mutable
294 },
295 )
296 .to_string()
297 }
298 pub fn export_relations<I, T>(&self, relations: I) -> Result<BTreeMap<String, NamedRows>>
300 where
301 T: AsRef<str>,
302 I: Iterator<Item = T>,
303 {
304 match self {
305 DbInstance::Mem(db) => db.export_relations(relations),
306 #[cfg(feature = "storage-sqlite")]
307 DbInstance::Sqlite(db) => db.export_relations(relations),
308 #[cfg(feature = "storage-rocksdb")]
309 DbInstance::RocksDb(db) => db.export_relations(relations),
310 #[cfg(feature = "storage-new-rocksdb")]
311 DbInstance::NewRocksDb(db) => db.export_relations(relations),
312 #[cfg(feature = "storage-sled")]
313 DbInstance::Sled(db) => db.export_relations(relations),
314 #[cfg(feature = "storage-tikv")]
315 DbInstance::TiKv(db) => db.export_relations(relations),
316 }
317 }
318 pub fn export_relations_str(&self, data: &str) -> String {
321 match self.export_relations_str_inner(data) {
322 Ok(s) => {
323 let ret = json!({"ok": true, "data": s});
324 format!("{ret}")
325 }
326 Err(err) => {
327 let ret = json!({"ok": false, "message": err.to_string()});
328 format!("{ret}")
329 }
330 }
331 }
332 fn export_relations_str_inner(&self, data: &str) -> Result<JsonValue> {
333 #[derive(serde_derive::Deserialize)]
334 struct Payload {
335 relations: Vec<String>,
336 }
337 let j_val: Payload = serde_json::from_str(data).into_diagnostic()?;
338 let results = self.export_relations(j_val.relations.iter().map(|s| s as &str))?;
339 Ok(results
340 .into_iter()
341 .map(|(k, v)| (k, v.into_json()))
342 .collect())
343 }
344 pub fn import_relations(&self, data: BTreeMap<String, NamedRows>) -> Result<()> {
346 match self {
347 DbInstance::Mem(db) => db.import_relations(data),
348 #[cfg(feature = "storage-sqlite")]
349 DbInstance::Sqlite(db) => db.import_relations(data),
350 #[cfg(feature = "storage-rocksdb")]
351 DbInstance::RocksDb(db) => db.import_relations(data),
352 #[cfg(feature = "storage-new-rocksdb")]
353 DbInstance::NewRocksDb(db) => db.import_relations(data),
354 #[cfg(feature = "storage-sled")]
355 DbInstance::Sled(db) => db.import_relations(data),
356 #[cfg(feature = "storage-tikv")]
357 DbInstance::TiKv(db) => db.import_relations(data),
358 }
359 }
360 pub fn import_relations_str(&self, data: &str) -> String {
363 match self.import_relations_str_with_err(data) {
364 Ok(()) => {
365 format!("{}", json!({"ok": true}))
366 }
367 Err(err) => {
368 format!("{}", json!({"ok": false, "message": err.to_string()}))
369 }
370 }
371 }
372 pub fn import_relations_str_with_err(&self, data: &str) -> Result<()> {
375 let json_data: JsonValue = serde_json::from_str(data).into_diagnostic()?;
376 let json_object = json_data
377 .as_object()
378 .ok_or_else(|| miette!("A JSON object is requried"))?;
379 let mapping = json_object
380 .iter()
381 .map(|(k, v)| -> Result<(String, NamedRows)> {
382 Ok((k.to_string(), NamedRows::from_json(v)?))
383 })
384 .collect::<Result<_>>()?;
385 self.import_relations(mapping)
386 }
387 pub fn backup_db(&self, out_file: impl AsRef<Path>) -> Result<()> {
389 match self {
390 DbInstance::Mem(db) => db.backup_db(out_file),
391 #[cfg(feature = "storage-sqlite")]
392 DbInstance::Sqlite(db) => db.backup_db(out_file),
393 #[cfg(feature = "storage-rocksdb")]
394 DbInstance::RocksDb(db) => db.backup_db(out_file),
395 #[cfg(feature = "storage-new-rocksdb")]
396 DbInstance::NewRocksDb(db) => db.backup_db(out_file),
397 #[cfg(feature = "storage-sled")]
398 DbInstance::Sled(db) => db.backup_db(out_file),
399 #[cfg(feature = "storage-tikv")]
400 DbInstance::TiKv(db) => db.backup_db(out_file),
401 }
402 }
403 pub fn backup_db_str(&self, out_file: impl AsRef<Path>) -> String {
406 match self.backup_db(out_file) {
407 Ok(_) => json!({"ok": true}).to_string(),
408 Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(),
409 }
410 }
411 pub fn restore_backup(&self, in_file: impl AsRef<Path>) -> Result<()> {
413 match self {
414 DbInstance::Mem(db) => db.restore_backup(in_file),
415 #[cfg(feature = "storage-sqlite")]
416 DbInstance::Sqlite(db) => db.restore_backup(in_file),
417 #[cfg(feature = "storage-rocksdb")]
418 DbInstance::RocksDb(db) => db.restore_backup(in_file),
419 #[cfg(feature = "storage-new-rocksdb")]
420 DbInstance::NewRocksDb(db) => db.restore_backup(in_file),
421 #[cfg(feature = "storage-sled")]
422 DbInstance::Sled(db) => db.restore_backup(in_file),
423 #[cfg(feature = "storage-tikv")]
424 DbInstance::TiKv(db) => db.restore_backup(in_file),
425 }
426 }
427 pub fn restore_backup_str(&self, in_file: impl AsRef<Path>) -> String {
430 match self.restore_backup(in_file) {
431 Ok(_) => json!({"ok": true}).to_string(),
432 Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(),
433 }
434 }
435 pub fn import_from_backup(
437 &self,
438 in_file: impl AsRef<Path>,
439 relations: &[String],
440 ) -> Result<()> {
441 match self {
442 DbInstance::Mem(db) => db.import_from_backup(in_file, relations),
443 #[cfg(feature = "storage-sqlite")]
444 DbInstance::Sqlite(db) => db.import_from_backup(in_file, relations),
445 #[cfg(feature = "storage-rocksdb")]
446 DbInstance::RocksDb(db) => db.import_from_backup(in_file, relations),
447 #[cfg(feature = "storage-new-rocksdb")]
448 DbInstance::NewRocksDb(db) => db.import_from_backup(in_file, relations),
449 #[cfg(feature = "storage-sled")]
450 DbInstance::Sled(db) => db.import_from_backup(in_file, relations),
451 #[cfg(feature = "storage-tikv")]
452 DbInstance::TiKv(db) => db.import_from_backup(in_file, relations),
453 }
454 }
455 pub fn import_from_backup_str(&self, payload: &str) -> String {
458 match self.import_from_backup_str_inner(payload) {
459 Ok(_) => json!({"ok": true}).to_string(),
460 Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(),
461 }
462 }
463 fn import_from_backup_str_inner(&self, payload: &str) -> Result<()> {
464 #[derive(serde_derive::Deserialize)]
465 struct Payload {
466 path: String,
467 relations: Vec<String>,
468 }
469 let json_payload: Payload = serde_json::from_str(payload).into_diagnostic()?;
470
471 self.import_from_backup(&json_payload.path, &json_payload.relations)
472 }
473
474 #[cfg(not(target_arch = "wasm32"))]
476 pub fn register_callback(
477 &self,
478 relation: &str,
479 capacity: Option<usize>,
480 ) -> (u32, Receiver<(CallbackOp, NamedRows, NamedRows)>) {
481 match self {
482 DbInstance::Mem(db) => db.register_callback(relation, capacity),
483 #[cfg(feature = "storage-sqlite")]
484 DbInstance::Sqlite(db) => db.register_callback(relation, capacity),
485 #[cfg(feature = "storage-rocksdb")]
486 DbInstance::RocksDb(db) => db.register_callback(relation, capacity),
487 #[cfg(feature = "storage-new-rocksdb")]
488 DbInstance::NewRocksDb(db) => db.register_callback(relation, capacity),
489 #[cfg(feature = "storage-sled")]
490 DbInstance::Sled(db) => db.register_callback(relation, capacity),
491 #[cfg(feature = "storage-tikv")]
492 DbInstance::TiKv(db) => db.register_callback(relation, capacity),
493 }
494 }
495
496 #[cfg(not(target_arch = "wasm32"))]
498 pub fn unregister_callback(&self, id: u32) -> bool {
499 match self {
500 DbInstance::Mem(db) => db.unregister_callback(id),
501 #[cfg(feature = "storage-sqlite")]
502 DbInstance::Sqlite(db) => db.unregister_callback(id),
503 #[cfg(feature = "storage-rocksdb")]
504 DbInstance::RocksDb(db) => db.unregister_callback(id),
505 #[cfg(feature = "storage-new-rocksdb")]
506 DbInstance::NewRocksDb(db) => db.unregister_callback(id),
507 #[cfg(feature = "storage-sled")]
508 DbInstance::Sled(db) => db.unregister_callback(id),
509 #[cfg(feature = "storage-tikv")]
510 DbInstance::TiKv(db) => db.unregister_callback(id),
511 }
512 }
513 pub fn register_fixed_rule<R>(&self, name: String, rule_impl: R) -> Result<()>
515 where
516 R: FixedRule + 'static,
517 {
518 match self {
519 DbInstance::Mem(db) => db.register_fixed_rule(name, rule_impl),
520 #[cfg(feature = "storage-sqlite")]
521 DbInstance::Sqlite(db) => db.register_fixed_rule(name, rule_impl),
522 #[cfg(feature = "storage-rocksdb")]
523 DbInstance::RocksDb(db) => db.register_fixed_rule(name, rule_impl),
524 #[cfg(feature = "storage-new-rocksdb")]
525 DbInstance::NewRocksDb(db) => db.register_fixed_rule(name, rule_impl),
526 #[cfg(feature = "storage-sled")]
527 DbInstance::Sled(db) => db.register_fixed_rule(name, rule_impl),
528 #[cfg(feature = "storage-tikv")]
529 DbInstance::TiKv(db) => db.register_fixed_rule(name, rule_impl),
530 }
531 }
532 pub fn unregister_fixed_rule(&self, name: &str) -> Result<bool> {
534 match self {
535 DbInstance::Mem(db) => db.unregister_fixed_rule(name),
536 #[cfg(feature = "storage-sqlite")]
537 DbInstance::Sqlite(db) => db.unregister_fixed_rule(name),
538 #[cfg(feature = "storage-rocksdb")]
539 DbInstance::RocksDb(db) => db.unregister_fixed_rule(name),
540 #[cfg(feature = "storage-new-rocksdb")]
541 DbInstance::NewRocksDb(db) => db.unregister_fixed_rule(name),
542 #[cfg(feature = "storage-sled")]
543 DbInstance::Sled(db) => db.unregister_fixed_rule(name),
544 #[cfg(feature = "storage-tikv")]
545 DbInstance::TiKv(db) => db.unregister_fixed_rule(name),
546 }
547 }
548
549 pub fn run_multi_transaction(
551 &self,
552 write: bool,
553 payloads: Receiver<TransactionPayload>,
554 results: Sender<Result<NamedRows>>,
555 ) {
556 match self {
557 DbInstance::Mem(db) => db.run_multi_transaction(write, payloads, results),
558 #[cfg(feature = "storage-sqlite")]
559 DbInstance::Sqlite(db) => db.run_multi_transaction(write, payloads, results),
560 #[cfg(feature = "storage-rocksdb")]
561 DbInstance::RocksDb(db) => db.run_multi_transaction(write, payloads, results),
562 #[cfg(feature = "storage-new-rocksdb")]
563 DbInstance::NewRocksDb(db) => db.run_multi_transaction(write, payloads, results),
564 #[cfg(feature = "storage-sled")]
565 DbInstance::Sled(db) => db.run_multi_transaction(write, payloads, results),
566 #[cfg(feature = "storage-tikv")]
567 DbInstance::TiKv(db) => db.run_multi_transaction(write, payloads, results),
568 }
569 }
570 pub fn multi_transaction(&self, write: bool) -> MultiTransaction {
573 let (app2db_send, app2db_recv) = bounded(1);
574 let (db2app_send, db2app_recv) = bounded(1);
575 let db = self.clone();
576 #[cfg(target_arch = "wasm32")]
577 std::thread::spawn(move || db.run_multi_transaction(write, app2db_recv, db2app_send));
578 #[cfg(not(target_arch = "wasm32"))]
579 rayon::spawn(move || db.run_multi_transaction(write, app2db_recv, db2app_send));
580 MultiTransaction {
581 sender: app2db_send,
582 receiver: db2app_recv,
583 }
584 }
585}
586
/// The application-side channel ends of a running multi-statement transaction.
///
/// Values of this type are built by `DbInstance::multi_transaction`.
pub struct MultiTransaction {
    /// Channel end used to send `TransactionPayload` values to the transaction.
    pub sender: Sender<TransactionPayload>,
    /// Channel end on which the transaction's `Result<NamedRows>` replies arrive.
    pub receiver: Receiver<Result<NamedRows>>,
}
595
596impl MultiTransaction {
597 pub fn run_script(
599 &self,
600 payload: &str,
601 params: BTreeMap<String, DataValue>,
602 ) -> Result<NamedRows> {
603 if let Err(err) = self
604 .sender
605 .send(TransactionPayload::Query((payload.to_string(), params)))
606 {
607 bail!(err);
608 }
609 match self.receiver.recv() {
610 Ok(r) => r,
611 Err(err) => bail!(err),
612 }
613 }
614 pub fn commit(&self) -> Result<()> {
616 if let Err(err) = self.sender.send(TransactionPayload::Commit) {
617 bail!(err);
618 }
619 match self.receiver.recv() {
620 Ok(_) => Ok(()),
621 Err(err) => bail!(err),
622 }
623 }
624 pub fn abort(&self) -> Result<()> {
626 if let Err(err) = self.sender.send(TransactionPayload::Abort) {
627 bail!(err);
628 }
629 match self.receiver.recv() {
630 Ok(_) => Ok(()),
631 Err(err) => bail!(err),
632 }
633 }
634}
635
636pub fn format_error_as_json(mut err: Report, source: Option<&str>) -> JsonValue {
638 if err.source_code().is_none() {
639 if let Some(src) = source {
640 err = err.with_source_code(format!("{src} "));
641 }
642 }
643 let mut text_err = String::new();
644 let mut json_err = String::new();
645 TEXT_ERR_HANDLER
646 .render_report(&mut text_err, err.as_ref())
647 .expect("render text error failed");
648 JSON_ERR_HANDLER
649 .render_report(&mut json_err, err.as_ref())
650 .expect("render json error failed");
651 let mut json: serde_json::Value =
652 serde_json::from_str(&json_err).expect("parse rendered json error failed");
653 let map = json.as_object_mut().unwrap();
654 map.insert("ok".to_string(), json!(false));
655 map.insert("display".to_string(), json!(text_err));
656 json
657}
658
659lazy_static! {
660 static ref TEXT_ERR_HANDLER: GraphicalReportHandler = miette::GraphicalReportHandler::new()
661 .with_theme(GraphicalTheme {
662 characters: ThemeCharacters::unicode(),
663 styles: ThemeStyles::ansi()
664 });
665 static ref JSON_ERR_HANDLER: JSONReportHandler = miette::JSONReportHandler::new();
666}