1#![doc = document_features::document_features!()]
29#![warn(rust_2018_idioms, future_incompatible)]
30#![warn(missing_docs)]
31#![allow(clippy::type_complexity)]
32#![allow(clippy::too_many_arguments)]
33
34use std::collections::BTreeMap;
35use std::path::Path;
36#[allow(unused_imports)]
37use std::time::Instant;
38
39use crossbeam::channel::{bounded, Receiver, Sender};
40use lazy_static::lazy_static;
41pub use miette::Error;
42use miette::Report;
43#[allow(unused_imports)]
44use miette::{
45 bail, miette, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic, JSONReportHandler,
46 Result, ThemeCharacters, ThemeStyles,
47};
48use serde_json::json;
49
50pub use data::value::{DataValue, Num, RegexWrapper, UuidWrapper, Validity, ValidityTs};
51pub use fixed_rule::{FixedRule, FixedRuleInputRelation, FixedRulePayload};
52pub use runtime::db::Db;
53pub use runtime::db::NamedRows;
54pub use runtime::relation::decode_tuple_from_kv;
55pub use runtime::temp_store::RegularTempStore;
56pub use storage::mem::{new_cozo_mem, MemStorage};
57#[cfg(feature = "storage-rocksdb")]
58pub use storage::rocks::{new_cozo_rocksdb, RocksDbStorage};
59#[cfg(feature = "storage-sled")]
60pub use storage::sled::{new_cozo_sled, SledStorage};
61#[cfg(feature = "storage-sqlite")]
62pub use storage::sqlite::{new_cozo_sqlite, SqliteStorage};
63#[cfg(feature = "storage-tikv")]
64pub use storage::tikv::{new_cozo_tikv, TiKvStorage};
65pub use storage::{Storage, StoreTx};
66
67pub use crate::data::expr::Expr;
68use crate::data::json::JsonValue;
69pub use crate::data::symb::Symbol;
70pub use crate::data::value::{JsonData, Vector};
71pub use crate::fixed_rule::SimpleFixedRule;
72pub use crate::parse::SourceSpan;
73pub use crate::runtime::callback::CallbackOp;
74pub use crate::runtime::db::evaluate_expressions;
75pub use crate::runtime::db::get_variables;
76pub use crate::runtime::db::Poison;
77pub use crate::runtime::db::ScriptMutability;
78pub use crate::runtime::db::TransactionPayload;
79
80pub(crate) mod data;
81pub(crate) mod fixed_rule;
82pub(crate) mod fts;
83pub(crate) mod parse;
84pub(crate) mod query;
85pub(crate) mod runtime;
86pub(crate) mod storage;
87pub(crate) mod utils;
88
89#[derive(Clone)]
99pub enum DbInstance {
100 Mem(Db<MemStorage>),
102 #[cfg(feature = "storage-sqlite")]
103 Sqlite(Db<SqliteStorage>),
105 #[cfg(feature = "storage-rocksdb")]
106 RocksDb(Db<RocksDbStorage>),
108 #[cfg(feature = "storage-sled")]
109 Sled(Db<SledStorage>),
111 #[cfg(feature = "storage-tikv")]
112 TiKv(Db<TiKvStorage>),
114}
115
116impl Default for DbInstance {
117 fn default() -> Self {
118 Self::new("mem", "", Default::default()).unwrap()
119 }
120}
121
122impl DbInstance {
123 #[allow(unused_variables)]
138 pub fn new(engine: &str, path: impl AsRef<Path>, options: &str) -> Result<Self> {
139 let options = if options.is_empty() { "{}" } else { options };
140 Ok(match engine {
141 "mem" => Self::Mem(new_cozo_mem()?),
142 #[cfg(feature = "storage-sqlite")]
143 "sqlite" => Self::Sqlite(new_cozo_sqlite(path)?),
144 #[cfg(feature = "storage-rocksdb")]
145 "rocksdb" => Self::RocksDb(new_cozo_rocksdb(path)?),
146 #[cfg(feature = "storage-sled")]
147 "sled" => Self::Sled(new_cozo_sled(path)?),
148 #[cfg(feature = "storage-tikv")]
149 "tikv" => {
150 #[derive(serde_derive::Deserialize)]
151 struct TiKvOpts {
152 end_points: Vec<String>,
153 optimistic: bool,
154 }
155 let opts: TiKvOpts = serde_json::from_str(options).into_diagnostic()?;
156 Self::TiKv(new_cozo_tikv(opts.end_points.clone(), opts.optimistic)?)
157 }
158 k => bail!(
159 "database engine '{}' not supported (maybe not compiled in)",
160 k
161 ),
162 })
163 }
164 pub fn new_with_str(
166 engine: &str,
167 path: &str,
168 options: &str,
169 ) -> std::result::Result<Self, String> {
170 Self::new(engine, path, options).map_err(|err| err.to_string())
171 }
172 pub fn run_script(
174 &self,
175 payload: &str,
176 params: BTreeMap<String, DataValue>,
177 mutability: ScriptMutability,
178 ) -> Result<NamedRows> {
179 match self {
180 DbInstance::Mem(db) => db.run_script(payload, params, mutability),
181 #[cfg(feature = "storage-sqlite")]
182 DbInstance::Sqlite(db) => db.run_script(payload, params, mutability),
183 #[cfg(feature = "storage-rocksdb")]
184 DbInstance::RocksDb(db) => db.run_script(payload, params, mutability),
185 #[cfg(feature = "storage-sled")]
186 DbInstance::Sled(db) => db.run_script(payload, params, mutability),
187 #[cfg(feature = "storage-tikv")]
188 DbInstance::TiKv(db) => db.run_script(payload, params, mutability),
189 }
190 }
191 pub fn run_default(&self, payload: &str) -> Result<NamedRows> {
193 self.run_script(payload, BTreeMap::new(), ScriptMutability::Mutable)
194 }
195 pub fn run_script_fold_err(
199 &self,
200 payload: &str,
201 params: BTreeMap<String, DataValue>,
202 mutability: ScriptMutability,
203 ) -> JsonValue {
204 #[cfg(not(target_arch = "wasm32"))]
205 let start = Instant::now();
206
207 match self.run_script(payload, params, mutability) {
208 Ok(named_rows) => {
209 let mut j_val = named_rows.into_json();
210 #[cfg(not(target_arch = "wasm32"))]
211 let took = start.elapsed().as_secs_f64();
212 let map = j_val.as_object_mut().unwrap();
213 map.insert("ok".to_string(), json!(true));
214 #[cfg(not(target_arch = "wasm32"))]
215 map.insert("took".to_string(), json!(took));
216
217 j_val
218 }
219 Err(err) => format_error_as_json(err, Some(payload)),
220 }
221 }
222 pub fn run_script_str(&self, payload: &str, params: &str, immutable: bool) -> String {
225 let params_json = if params.is_empty() {
226 BTreeMap::default()
227 } else {
228 match serde_json::from_str::<BTreeMap<String, JsonValue>>(params) {
229 Ok(map) => map
230 .into_iter()
231 .map(|(k, v)| (k, DataValue::from(v)))
232 .collect(),
233 Err(_) => {
234 return json!({"ok": false, "message": "params argument is not a JSON map"})
235 .to_string();
236 }
237 }
238 };
239 self.run_script_fold_err(
240 payload,
241 params_json,
242 if immutable {
243 ScriptMutability::Immutable
244 } else {
245 ScriptMutability::Mutable
246 },
247 )
248 .to_string()
249 }
250 pub fn export_relations<I, T>(&self, relations: I) -> Result<BTreeMap<String, NamedRows>>
252 where
253 T: AsRef<str>,
254 I: Iterator<Item=T>,
255 {
256 match self {
257 DbInstance::Mem(db) => db.export_relations(relations),
258 #[cfg(feature = "storage-sqlite")]
259 DbInstance::Sqlite(db) => db.export_relations(relations),
260 #[cfg(feature = "storage-rocksdb")]
261 DbInstance::RocksDb(db) => db.export_relations(relations),
262 #[cfg(feature = "storage-sled")]
263 DbInstance::Sled(db) => db.export_relations(relations),
264 #[cfg(feature = "storage-tikv")]
265 DbInstance::TiKv(db) => db.export_relations(relations),
266 }
267 }
268 pub fn export_relations_str(&self, data: &str) -> String {
271 match self.export_relations_str_inner(data) {
272 Ok(s) => {
273 let ret = json!({"ok": true, "data": s});
274 format!("{ret}")
275 }
276 Err(err) => {
277 let ret = json!({"ok": false, "message": err.to_string()});
278 format!("{ret}")
279 }
280 }
281 }
282 fn export_relations_str_inner(&self, data: &str) -> Result<JsonValue> {
283 #[derive(serde_derive::Deserialize)]
284 struct Payload {
285 relations: Vec<String>,
286 }
287 let j_val: Payload = serde_json::from_str(data).into_diagnostic()?;
288 let results = self.export_relations(j_val.relations.iter().map(|s| s as &str))?;
289 Ok(results
290 .into_iter()
291 .map(|(k, v)| (k, v.into_json()))
292 .collect())
293 }
294 pub fn import_relations(&self, data: BTreeMap<String, NamedRows>) -> Result<()> {
296 match self {
297 DbInstance::Mem(db) => db.import_relations(data),
298 #[cfg(feature = "storage-sqlite")]
299 DbInstance::Sqlite(db) => db.import_relations(data),
300 #[cfg(feature = "storage-rocksdb")]
301 DbInstance::RocksDb(db) => db.import_relations(data),
302 #[cfg(feature = "storage-sled")]
303 DbInstance::Sled(db) => db.import_relations(data),
304 #[cfg(feature = "storage-tikv")]
305 DbInstance::TiKv(db) => db.import_relations(data),
306 }
307 }
308 pub fn import_relations_str(&self, data: &str) -> String {
311 match self.import_relations_str_with_err(data) {
312 Ok(()) => {
313 format!("{}", json!({"ok": true}))
314 }
315 Err(err) => {
316 format!("{}", json!({"ok": false, "message": err.to_string()}))
317 }
318 }
319 }
320 pub fn import_relations_str_with_err(&self, data: &str) -> Result<()> {
323 let json_data: JsonValue = serde_json::from_str(data).into_diagnostic()?;
324 let json_object = json_data
325 .as_object()
326 .ok_or_else(|| miette!("A JSON object is requried"))?;
327 let mapping = json_object
328 .iter()
329 .map(|(k, v)| -> Result<(String, NamedRows)> {
330 Ok((k.to_string(), NamedRows::from_json(v)?))
331 })
332 .collect::<Result<_>>()?;
333 self.import_relations(mapping)
334 }
335 pub fn backup_db(&self, out_file: impl AsRef<Path>) -> Result<()> {
337 match self {
338 DbInstance::Mem(db) => db.backup_db(out_file),
339 #[cfg(feature = "storage-sqlite")]
340 DbInstance::Sqlite(db) => db.backup_db(out_file),
341 #[cfg(feature = "storage-rocksdb")]
342 DbInstance::RocksDb(db) => db.backup_db(out_file),
343 #[cfg(feature = "storage-sled")]
344 DbInstance::Sled(db) => db.backup_db(out_file),
345 #[cfg(feature = "storage-tikv")]
346 DbInstance::TiKv(db) => db.backup_db(out_file),
347 }
348 }
349 pub fn backup_db_str(&self, out_file: impl AsRef<Path>) -> String {
352 match self.backup_db(out_file) {
353 Ok(_) => json!({"ok": true}).to_string(),
354 Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(),
355 }
356 }
357 pub fn restore_backup(&self, in_file: impl AsRef<Path>) -> Result<()> {
359 match self {
360 DbInstance::Mem(db) => db.restore_backup(in_file),
361 #[cfg(feature = "storage-sqlite")]
362 DbInstance::Sqlite(db) => db.restore_backup(in_file),
363 #[cfg(feature = "storage-rocksdb")]
364 DbInstance::RocksDb(db) => db.restore_backup(in_file),
365 #[cfg(feature = "storage-sled")]
366 DbInstance::Sled(db) => db.restore_backup(in_file),
367 #[cfg(feature = "storage-tikv")]
368 DbInstance::TiKv(db) => db.restore_backup(in_file),
369 }
370 }
371 pub fn restore_backup_str(&self, in_file: impl AsRef<Path>) -> String {
374 match self.restore_backup(in_file) {
375 Ok(_) => json!({"ok": true}).to_string(),
376 Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(),
377 }
378 }
379 pub fn import_from_backup(
381 &self,
382 in_file: impl AsRef<Path>,
383 relations: &[String],
384 ) -> Result<()> {
385 match self {
386 DbInstance::Mem(db) => db.import_from_backup(in_file, relations),
387 #[cfg(feature = "storage-sqlite")]
388 DbInstance::Sqlite(db) => db.import_from_backup(in_file, relations),
389 #[cfg(feature = "storage-rocksdb")]
390 DbInstance::RocksDb(db) => db.import_from_backup(in_file, relations),
391 #[cfg(feature = "storage-sled")]
392 DbInstance::Sled(db) => db.import_from_backup(in_file, relations),
393 #[cfg(feature = "storage-tikv")]
394 DbInstance::TiKv(db) => db.import_from_backup(in_file, relations),
395 }
396 }
397 pub fn import_from_backup_str(&self, payload: &str) -> String {
400 match self.import_from_backup_str_inner(payload) {
401 Ok(_) => json!({"ok": true}).to_string(),
402 Err(err) => json!({"ok": false, "message": err.to_string()}).to_string(),
403 }
404 }
405 fn import_from_backup_str_inner(&self, payload: &str) -> Result<()> {
406 #[derive(serde_derive::Deserialize)]
407 struct Payload {
408 path: String,
409 relations: Vec<String>,
410 }
411 let json_payload: Payload = serde_json::from_str(payload).into_diagnostic()?;
412
413 self.import_from_backup(&json_payload.path, &json_payload.relations)
414 }
415
416 #[cfg(not(target_arch = "wasm32"))]
418 pub fn register_callback(
419 &self,
420 relation: &str,
421 capacity: Option<usize>,
422 ) -> (u32, Receiver<(CallbackOp, NamedRows, NamedRows)>) {
423 match self {
424 DbInstance::Mem(db) => db.register_callback(relation, capacity),
425 #[cfg(feature = "storage-sqlite")]
426 DbInstance::Sqlite(db) => db.register_callback(relation, capacity),
427 #[cfg(feature = "storage-rocksdb")]
428 DbInstance::RocksDb(db) => db.register_callback(relation, capacity),
429 #[cfg(feature = "storage-sled")]
430 DbInstance::Sled(db) => db.register_callback(relation, capacity),
431 #[cfg(feature = "storage-tikv")]
432 DbInstance::TiKv(db) => db.register_callback(relation, capacity),
433 }
434 }
435
436 #[cfg(not(target_arch = "wasm32"))]
438 pub fn unregister_callback(&self, id: u32) -> bool {
439 match self {
440 DbInstance::Mem(db) => db.unregister_callback(id),
441 #[cfg(feature = "storage-sqlite")]
442 DbInstance::Sqlite(db) => db.unregister_callback(id),
443 #[cfg(feature = "storage-rocksdb")]
444 DbInstance::RocksDb(db) => db.unregister_callback(id),
445 #[cfg(feature = "storage-sled")]
446 DbInstance::Sled(db) => db.unregister_callback(id),
447 #[cfg(feature = "storage-tikv")]
448 DbInstance::TiKv(db) => db.unregister_callback(id),
449 }
450 }
451 pub fn register_fixed_rule<R>(&self, name: String, rule_impl: R) -> Result<()>
453 where
454 R: FixedRule + 'static,
455 {
456 match self {
457 DbInstance::Mem(db) => db.register_fixed_rule(name, rule_impl),
458 #[cfg(feature = "storage-sqlite")]
459 DbInstance::Sqlite(db) => db.register_fixed_rule(name, rule_impl),
460 #[cfg(feature = "storage-rocksdb")]
461 DbInstance::RocksDb(db) => db.register_fixed_rule(name, rule_impl),
462 #[cfg(feature = "storage-sled")]
463 DbInstance::Sled(db) => db.register_fixed_rule(name, rule_impl),
464 #[cfg(feature = "storage-tikv")]
465 DbInstance::TiKv(db) => db.register_fixed_rule(name, rule_impl),
466 }
467 }
468 pub fn unregister_fixed_rule(&self, name: &str) -> Result<bool> {
470 match self {
471 DbInstance::Mem(db) => db.unregister_fixed_rule(name),
472 #[cfg(feature = "storage-sqlite")]
473 DbInstance::Sqlite(db) => db.unregister_fixed_rule(name),
474 #[cfg(feature = "storage-rocksdb")]
475 DbInstance::RocksDb(db) => db.unregister_fixed_rule(name),
476 #[cfg(feature = "storage-sled")]
477 DbInstance::Sled(db) => db.unregister_fixed_rule(name),
478 #[cfg(feature = "storage-tikv")]
479 DbInstance::TiKv(db) => db.unregister_fixed_rule(name),
480 }
481 }
482
483 pub fn run_multi_transaction(
485 &self,
486 write: bool,
487 payloads: Receiver<TransactionPayload>,
488 results: Sender<Result<NamedRows>>,
489 ) {
490 match self {
491 DbInstance::Mem(db) => db.run_multi_transaction(write, payloads, results),
492 #[cfg(feature = "storage-sqlite")]
493 DbInstance::Sqlite(db) => db.run_multi_transaction(write, payloads, results),
494 #[cfg(feature = "storage-rocksdb")]
495 DbInstance::RocksDb(db) => db.run_multi_transaction(write, payloads, results),
496 #[cfg(feature = "storage-sled")]
497 DbInstance::Sled(db) => db.run_multi_transaction(write, payloads, results),
498 #[cfg(feature = "storage-tikv")]
499 DbInstance::TiKv(db) => db.run_multi_transaction(write, payloads, results),
500 }
501 }
502 pub fn multi_transaction(&self, write: bool) -> MultiTransaction {
505 let (app2db_send, app2db_recv) = bounded(1);
506 let (db2app_send, db2app_recv) = bounded(1);
507 let db = self.clone();
508 #[cfg(target_arch = "wasm32")]
509 std::thread::spawn(move || db.run_multi_transaction(write, app2db_recv, db2app_send));
510 #[cfg(not(target_arch = "wasm32"))]
511 rayon::spawn(move || db.run_multi_transaction(write, app2db_recv, db2app_send));
512 MultiTransaction {
513 sender: app2db_send,
514 receiver: db2app_recv,
515 }
516 }
517}
518
519pub struct MultiTransaction {
522 pub sender: Sender<TransactionPayload>,
524 pub receiver: Receiver<Result<NamedRows>>,
526}
527
528impl MultiTransaction {
529 pub fn run_script(
531 &self,
532 payload: &str,
533 params: BTreeMap<String, DataValue>,
534 ) -> Result<NamedRows> {
535 if let Err(err) = self
536 .sender
537 .send(TransactionPayload::Query((payload.to_string(), params)))
538 {
539 bail!(err);
540 }
541 match self.receiver.recv() {
542 Ok(r) => r,
543 Err(err) => bail!(err),
544 }
545 }
546 pub fn commit(&self) -> Result<()> {
548 if let Err(err) = self.sender.send(TransactionPayload::Commit) {
549 bail!(err);
550 }
551 match self.receiver.recv() {
552 Ok(_) => Ok(()),
553 Err(err) => bail!(err),
554 }
555 }
556 pub fn abort(&self) -> Result<()> {
558 if let Err(err) = self.sender.send(TransactionPayload::Abort) {
559 bail!(err);
560 }
561 match self.receiver.recv() {
562 Ok(_) => Ok(()),
563 Err(err) => bail!(err),
564 }
565 }
566}
567
568pub fn format_error_as_json(mut err: Report, source: Option<&str>) -> JsonValue {
570 if err.source_code().is_none() {
571 if let Some(src) = source {
572 err = err.with_source_code(format!("{src} "));
573 }
574 }
575 let mut text_err = String::new();
576 let mut json_err = String::new();
577 TEXT_ERR_HANDLER
578 .render_report(&mut text_err, err.as_ref())
579 .expect("render text error failed");
580 JSON_ERR_HANDLER
581 .render_report(&mut json_err, err.as_ref())
582 .expect("render json error failed");
583 let mut json: serde_json::Value =
584 serde_json::from_str(&json_err).expect("parse rendered json error failed");
585 let map = json.as_object_mut().unwrap();
586 map.insert("ok".to_string(), json!(false));
587 map.insert("display".to_string(), json!(text_err));
588 json
589}
590
591lazy_static! {
592 static ref TEXT_ERR_HANDLER: GraphicalReportHandler = miette::GraphicalReportHandler::new()
593 .with_theme(GraphicalTheme {
594 characters: ThemeCharacters::unicode(),
595 styles: ThemeStyles::ansi()
596 });
597 static ref JSON_ERR_HANDLER: JSONReportHandler = miette::JSONReportHandler::new();
598}