#![allow(clippy::match_ref_pats)]
#![allow(clippy::option_map_unit_fn)]
use thiserror::Error;
pub mod disperse;
pub mod driver;
#[cfg(feature = "explain")]
pub mod explain;
pub mod network;
pub mod retrieve;
pub mod soup;
pub mod sql;
pub mod types;
use rusqlite::hooks::Action;
use rusqlite::{OptionalExtension, ToSql};
use std::cell::RefCell;
use std::str::FromStr;
use std::sync::atomic::{self, AtomicBool};
use std::sync::{Arc, Mutex};
use crate::driver::{TypeTag, ENTITY_ID_TAG};
pub use crate::either::Either;
pub use crate::network::{GenericNetwork, Network, Ordering, OwnedNetwork};
pub use crate::retrieve::{NamedNetwork, Pattern, Variable};
pub use crate::soup::Encoded;
pub use crate::types::{Attribute, AttributeRef, Entity, Value, ValueRef};
/// Re-exports of this crate's helper traits, so that they can all be brought
/// into scope at once with a glob import of this module.
pub mod traits {
pub use super::{BorrowedParse, FromBorrowedStr};
pub use crate::sql::PushToQuery;
}
/// SQL text of the database schema, embedded at compile time from `../schema.sql`.
pub(crate) const SCHEMA: &str = include_str!("../schema.sql");
/// Runs the embedded schema script against `db` as a single batch.
///
/// No transaction is opened here; pass a transaction (it dereferences to a
/// connection) if the statements should be applied atomically.
///
/// # Errors
///
/// Returns whatever error `rusqlite` reports while executing the batch.
pub fn create_schema(db: &rusqlite::Connection) -> rusqlite::Result<()> {
    rusqlite::Connection::execute_batch(db, SCHEMA)
}
/// Applies the schema inside a fresh transaction on `db` and commits it.
///
/// # Errors
///
/// Returns the first error from opening the transaction, running the schema
/// script, or committing. If the script fails the transaction is dropped
/// without being committed.
pub fn create_schema_in_transaction(db: &mut rusqlite::Connection) -> rusqlite::Result<()> {
    let tx = db.transaction()?;
    create_schema(&tx).and_then(|()| tx.commit())
}
/// Opens a new in-memory SQLite database with the schema already created.
///
/// # Errors
///
/// Returns an error if the database cannot be opened or the schema cannot be
/// applied.
pub fn new_in_memory() -> rusqlite::Result<rusqlite::Connection> {
    rusqlite::Connection::open_in_memory().and_then(|mut conn| {
        create_schema_in_transaction(&mut conn)?;
        Ok(conn)
    })
}
/// Errors returned by this crate's fallible operations.
#[derive(Debug, Error)]
pub enum Error {
/// A failure reported by `rusqlite`. Any `rusqlite::Error` converts into
/// this variant automatically, so `?` works on `rusqlite` results.
#[error("sql error")]
Sql(#[from] rusqlite::Error),
}
/// `Result` alias whose error type defaults to this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Rowid of a row in the `attributes` table that the update hook saw change.
type Change = i64;
/// A SQLite transaction wrapped with the bookkeeping needed to keep the
/// per-attribute `"triples-ave-…"` indexes in step with the `"attributes"`
/// table.
///
/// Dereferences to the underlying `rusqlite::Transaction`.
#[derive(Debug)]
pub struct DontWoof<'tx> {
/// The wrapped transaction; its connection carries the update hook that
/// feeds `changes`.
tx: HookedTransaction<'tx>,
/// Rowids of `"attributes"` rows reported by the update hook and not yet
/// turned into index changes.
changes: Arc<Mutex<Vec<Change>>>,
/// Scratch buffer that `changes` is swapped into while indexes are updated;
/// expected to be empty between updates.
changes_swap: RefCell<Vec<Change>>,
/// Set when a change could not be recorded or processed because the lock
/// on `changes` or the borrow of `changes_swap` was unavailable.
changes_failed: Arc<AtomicBool>,
}
impl<'tx> DontWoof<'tx> {
pub fn new(db: &'tx mut rusqlite::Connection) -> Result<Self> {
db.execute("pragma foreign_keys=on", [])?;
let tx = db.transaction()?;
Ok(Self::from(tx))
}
/// Finds the row of `"attributes"` whose `ident` is the encoded identifier
/// `a` and returns that row's rowid as an encoded entity.
///
/// # Errors
///
/// Returns [`Error::Sql`] if the query fails, including when no row matches.
pub fn attribute<A: AsRef<AttributeRef>>(&self, a: Encoded<A>) -> Result<Encoded<Entity>> {
    let sql = r#"SELECT rowid FROM "attributes" WHERE ident = ?"#;
    let rowid = self
        .tx
        .query_row(sql, &[&a.rowid], |row| row.get::<_, i64>(0))?;
    Ok(Encoded::from_rowid(rowid))
}
/// Inserts a new row into `"soup"` tagged as an entity id with a random
/// 16-byte value, and returns the new row's rowid as an encoded entity.
///
/// # Errors
///
/// Returns [`Error::Sql`] if preparing or executing the insert fails.
///
/// # Panics
///
/// Panics if the insert does not report exactly one affected row.
pub fn new_entity(&self) -> Result<Encoded<Entity>> {
    let mut stmt = self
        .tx
        .prepare_cached(r#"INSERT INTO "soup" (t, v) VALUES (?, randomblob(16))"#)?;
    let inserted = stmt.execute(rusqlite::params![ENTITY_ID_TAG])?;
    assert_eq!(inserted, 1);
    Ok(Encoded::from_rowid(self.tx.last_insert_rowid()))
}
pub fn decode<T: driver::FromTypeTagAndSqlValue>(&self, e: Encoded<T>) -> Result<T> {
use driver::FromSqlRow;
let select = r#"SELECT t, v
FROM "soup"
WHERE rowid = ?"#;
let mut select = self.tx.prepare_cached(select)?;
let t = select.query_row(rusqlite::params![e.rowid], |row| {
driver::just::<T>().from_start_of_row(row)
})?;
Ok(t)
}
/// Creates a new entity and wraps it in a [`FluentEntity`] for chained
/// `assert` / `retract` calls.
///
/// # Errors
///
/// Returns an error if the entity cannot be created.
pub fn fluent_entity(&self) -> Result<FluentEntity> {
    let entity = self.new_entity()?;
    Ok(self.fluent(entity))
}
/// Wraps the existing entity `e` in a [`FluentEntity`] bound to this
/// transaction wrapper.
pub fn fluent(&self, e: Encoded<Entity>) -> FluentEntity {
    let woof = self;
    FluentEntity { woof, e }
}
/// Returns the encoded handle for `val`, storing it in `"soup"` first if no
/// row with the same type tag and value exists yet.
///
/// # Errors
///
/// Returns [`Error::Sql`] if the lookup or the insert fails.
pub fn encode<V: TypeTag + ToSql>(&self, val: V) -> Result<Encoded<<V as TypeTag>::Factory>> {
    self._encode(val.type_tag(), &val as &dyn ToSql)
        .map(Encoded::from_rowid)
}
/// Finds or creates the `"soup"` row for the pair (`tag`, `val`) and returns
/// its rowid.
///
/// An existing row is reused when one matches; otherwise a new row is
/// inserted and its rowid returned.
///
/// # Panics
///
/// Panics if the insert does not report exactly one affected row.
fn _encode(&self, tag: i64, val: &dyn ToSql) -> Result<i64> {
    let params = rusqlite::params![tag, val];
    let mut lookup = self.tx.prepare_cached(
        r#"SELECT rowid
FROM "soup"
WHERE t = ?
AND v = ?"#,
    )?;
    let existing = lookup
        .query_row(params, |row| row.get::<_, i64>(0))
        .optional()?;
    if let Some(rowid) = existing {
        return Ok(rowid);
    }
    // Nothing stored for this (tag, value) pair yet: add it.
    let mut insert = self
        .tx
        .prepare_cached(r#"INSERT INTO "soup" (t, v) VALUES (?, ?)"#)?;
    let inserted = insert.execute(params)?;
    assert_eq!(inserted, 1);
    Ok(self.tx.last_insert_rowid())
}
/// Inserts the triple (`e`, `a`, `v`) into `"triples"` and then brings the
/// per-attribute indexes up to date with any `"attributes"` changes the
/// insert caused.
///
/// Returns `self` so calls can be chained.
///
/// # Errors
///
/// Returns [`Error::Sql`] if the insert or the index maintenance fails.
///
/// # Panics
///
/// Panics if the insert does not report exactly one affected row.
pub fn assert<V: TypeTag>(
    &self,
    e: Encoded<Entity>,
    a: Encoded<Entity>,
    v: Encoded<V>,
) -> Result<&Self> {
    let inserted = self
        .tx
        .prepare_cached(r#"INSERT INTO "triples" (e,a,v) VALUES (?, ?, ?)"#)?
        .execute(&[&e.rowid, &a.rowid, &v.rowid])?;
    assert_eq!(inserted, 1);
    self._update_attribute_indexes()?;
    Ok(self)
}
fn _update_attribute_indexes(&self) -> rusqlite::Result<()> {
if let Ok(mut swap) = self.changes_swap.try_borrow_mut() {
debug_assert!(swap.is_empty());
if let Ok(ref mut mutex) = self.changes.try_lock() {
if mutex.is_empty() {
return Ok(());
}
std::mem::swap::<Vec<Change>>(mutex.as_mut(), swap.as_mut());
} else {
debug_assert!(false, "failed to lock changes");
self.changes_failed.store(true, atomic::Ordering::SeqCst);
return Ok(());
}
swap.sort_unstable();
swap.dedup();
let result = self._execute_attribute_index_changes(swap.as_slice());
swap.clear();
result.map(drop)
} else {
debug_assert!(false, "failed to borrow changes_swap");
self.changes_failed.store(true, atomic::Ordering::SeqCst);
Ok(())
}
}
/// For each rowid in `swap`, creates the partial index `"triples-ave-<rowid>"`
/// when a row with that rowid exists in `"attributes"`, and drops the index
/// otherwise.
///
/// Stops at the first SQL error and returns it.
fn _execute_attribute_index_changes(&self, swap: &[Change]) -> rusqlite::Result<()> {
    for rowid in swap {
        let mut exists = self
            .tx
            .prepare_cached(r#"SELECT count(*) FROM "attributes" WHERE rowid = ?"#)?;
        let count: i64 = exists.query_row(&[rowid], |row| row.get(0))?;
        let ddl = if count > 0 {
            format!(
                r#"CREATE INDEX
IF NOT EXISTS "triples-ave-{rowid}"
ON "triples" (v, e)
WHERE a = {rowid}"#,
                rowid = rowid
            )
        } else {
            format!(
                r#"DROP INDEX IF EXISTS "triples-ave-{rowid}""#,
                rowid = rowid
            )
        };
        self.tx.execute(&ddl, [])?;
    }
    Ok(())
}
/// Deletes the triple (`e`, `a`, `v`) from `"triples"`.
///
/// When at least one row was deleted, the per-attribute indexes are brought
/// up to date afterwards. Deleting nothing is not an error.
///
/// Returns `self` so calls can be chained.
///
/// # Errors
///
/// Returns [`Error::Sql`] if the delete or the index maintenance fails.
pub fn retract<V: TypeTag>(
    &self,
    e: Encoded<Entity>,
    a: Encoded<Entity>,
    v: Encoded<V>,
) -> Result<&Self> {
    let removed = self
        .tx
        .prepare_cached(
            r#"DELETE FROM "triples"
WHERE e = ?
AND a = ?
AND v = ?"#,
        )?
        .execute(&[&e.rowid, &a.rowid, &v.rowid])?;
    if removed > 0 {
        self._update_attribute_indexes()?;
    }
    Ok(self)
}
/// Runs SQLite's `optimize` pragma through its table-valued function form.
///
/// # Errors
///
/// Returns the `rusqlite` error if the statement fails.
pub fn optimize(&self) -> rusqlite::Result<()> {
    self.tx.execute("SELECT * FROM pragma_optimize()", [])?;
    Ok(())
}
/// Rewrites attribute constraints in `network` that compare against a plain
/// value so that they compare against an already-encoded rowid instead.
///
/// For every `Constraint::Eq` whose left-hand side is an attribute field and
/// whose right-hand side is `Match::Value(v)`, the database is searched for an
/// `attributes` row whose `ident` refers to a `soup` row with `v`'s type tag
/// and value. When one is found, the constraint's right-hand side is replaced
/// with `Match::Encoded` holding that attribute's rowid; otherwise the
/// constraint is left unchanged.
///
/// # Errors
///
/// Returns the first SQL error encountered; constraints after the failing one
/// are not visited.
pub fn prefetch_attributes<V>(&self, network: &mut Network<V>) -> Result<()>
where
V: TypeTag + ToSql,
{
use crate::network::{Constraint, Field, Match};
network
.constraints_mut()
.iter_mut()
.try_for_each(|constraint| match constraint {
// Only equality constraints on the attribute field with a not-yet-encoded value.
&mut Constraint::Eq { lh, rh: Match::Value(ref v) }
if lh.field() == Field::Attribute =>
{
let mut stmt = self.tx.prepare_cached(
r#"
SELECT a.rowid
FROM attributes a
JOIN soup s ON a.ident = s.rowid
WHERE s.t = ? AND s.v = ?
LIMIT 1
"#,
)?;
let type_tag = v.type_tag();
// A missing attribute is not an error: `optional()` turns "no rows" into `None`.
let rh = stmt
.query_row(rusqlite::params![type_tag, v], |row| row.get(0))
.map(Encoded::from_rowid)
.map(Match::Encoded)
.optional()?;
if let Some(rh) = rh {
*constraint = Constraint::Eq { lh, rh };
}
Ok(())
}
// Every other constraint is left as it is.
_ => Result::<(), Error>::Ok(()),
})?;
Ok(())
}
pub fn into_tx(self) -> rusqlite::Transaction<'tx> {
self.tx.unwrap()
}
}
/// A transaction whose connection has an update hook installed for as long as
/// this wrapper is alive.
///
/// The inner `Option` is only `None` after the transaction has been taken out
/// by `unwrap` or by `Drop`; both remove the hook first.
#[derive(Debug)]
struct HookedTransaction<'tx>(Option<rusqlite::Transaction<'tx>>);
impl<'tx> std::ops::Deref for HookedTransaction<'tx> {
type Target = rusqlite::Transaction<'tx>;
fn deref(&self) -> &Self::Target {
self.0.as_ref().unwrap()
}
}
impl<'tx> HookedTransaction<'tx> {
fn new<F>(tx: rusqlite::Transaction<'tx>, hook: F) -> Self
where
F: FnMut(Action, &str, &str, i64) + Send + 'static,
{
tx.update_hook(Some(hook));
HookedTransaction(Some(tx))
}
fn unwrap(mut self) -> rusqlite::Transaction<'tx> {
let tx = self.0.take().unwrap();
HookedTransaction::_unhook(&tx);
tx
}
fn _unhook(db: &rusqlite::Connection) {
let no_hook = None::<fn(_: Action, _: &str, _: &str, _: i64)>;
db.update_hook(no_hook);
}
}
/// Removes the update hook before the wrapped transaction, if it is still
/// held, is dropped.
impl<'tx> Drop for HookedTransaction<'tx> {
    fn drop(&mut self) {
        // The taken transaction lives until the end of this statement, so the
        // hook is cleared while the transaction is still alive.
        if let Some(tx) = self.0.take().as_ref() {
            HookedTransaction::_unhook(tx);
        }
    }
}
/// Wraps an already-open transaction, installing the update hook that records
/// which rows of the `"attributes"` table change.
///
/// # Panics
///
/// Panics if the `foreign_keys` pragma cannot be read, or if foreign key
/// enforcement is not enabled on the connection. SQLite treats changes to that
/// pragma as a no-op while a transaction is open, so it cannot be switched on
/// here: enable it on the connection before calling `transaction()`, or use
/// [`DontWoof::new`], which does so.
impl<'tx> From<rusqlite::Transaction<'tx>> for DontWoof<'tx> {
    fn from(tx: rusqlite::Transaction<'tx>) -> Self {
        let foreign_keys: i64 = tx
            .query_row("pragma foreign_keys", [], |row| row.get(0))
            .expect("failed to read `pragma foreign_keys` from the connection");
        assert!(
            1 == foreign_keys,
            "foreign key enforcement must be enabled (`pragma foreign_keys=on`) \
             before the transaction is opened"
        );

        let changes = Arc::new(Mutex::new(Vec::<Change>::default()));
        let changes_failed = Arc::new(AtomicBool::new(false));

        // The hook owns its own handles to the shared state so that it can be
        // `'static` and outlive this function.
        let hook_changes = Arc::clone(&changes);
        let hook_failed = Arc::clone(&changes_failed);
        let hook = move |_action: Action, _database: &str, table: &str, rowid: i64| {
            if table != "attributes" {
                return;
            }
            // Never block inside the hook: if the list is unavailable, flag
            // the miss instead of waiting.
            match hook_changes.try_lock() {
                Ok(mut pending) => pending.push(rowid),
                Err(_) => hook_failed.store(true, atomic::Ordering::SeqCst),
            }
        };

        DontWoof {
            tx: HookedTransaction::new(tx, hook),
            changes,
            changes_swap: RefCell::new(Vec::<Change>::default()),
            changes_failed,
        }
    }
}
/// Lets a `DontWoof` be used wherever a reference to the underlying
/// transaction is expected.
impl<'tx> std::ops::Deref for DontWoof<'tx> {
    type Target = rusqlite::Transaction<'tx>;

    fn deref(&self) -> &Self::Target {
        std::ops::Deref::deref(&self.tx)
    }
}
/// Pairs a [`DontWoof`] with one entity so that several `assert` / `retract`
/// calls about that entity can be chained.
pub struct FluentEntity<'w, 'tx> {
/// The transaction wrapper the calls are forwarded to.
woof: &'w DontWoof<'tx>,
/// The entity used as the `e` of every triple asserted or retracted.
e: Encoded<Entity>,
}
impl FluentEntity<'_, '_> {
    /// Asserts the triple (this entity, `a`, `v`) and returns `self` for
    /// chaining.
    ///
    /// # Errors
    ///
    /// Forwards any error from [`DontWoof::assert`].
    pub fn assert<V: TypeTag>(&self, a: Encoded<Entity>, v: Encoded<V>) -> Result<&Self> {
        self.woof.assert(self.e, a, v).map(|_| self)
    }

    /// Retracts the triple (this entity, `a`, `v`) and returns `self` for
    /// chaining.
    ///
    /// # Errors
    ///
    /// Forwards any error from [`DontWoof::retract`].
    pub fn retract<V: TypeTag>(&self, a: Encoded<Entity>, v: Encoded<V>) -> Result<&Self> {
        self.woof.retract(self.e, a, v).map(|_| self)
    }
}
impl From<&FluentEntity<'_, '_>> for Encoded<Entity> {
fn from(fl: &FluentEntity) -> Self {
fl.e
}
}
/// A minimal two-variant sum type plus parsing support for it.
pub mod either {
    pub use self::Either::{Left, Left as left, Right, Right as right};
    use super::FromBorrowedStr;

    /// A value that is either a `Left(L)` or a `Right(R)`.
    #[cfg_attr(
        feature = "serde",
        derive(serde::Serialize, serde::Deserialize),
        serde(untagged)
    )]
    #[derive(Debug, PartialEq)]
    pub enum Either<L, R> {
        Left(L),
        Right(R),
    }

    impl<L, R> Either<L, R> {
        /// Applies `f` to a `Left` value; a `Right` value passes through
        /// untouched.
        pub fn map_left<LL, F: FnOnce(L) -> LL>(self, f: F) -> Either<LL, R> {
            match self {
                Left(l) => Left(f(l)),
                Right(r) => Right(r),
            }
        }

        /// Applies `f` to a `Right` value; a `Left` value passes through
        /// untouched.
        pub fn map_right<RR, F: FnOnce(R) -> RR>(self, f: F) -> Either<L, RR> {
            match self {
                Left(l) => Left(l),
                Right(r) => Right(f(r)),
            }
        }
    }

    /// Parses as `L` first and falls back to `R`; when both fail, the two
    /// errors are returned together as `(left_error, right_error)`.
    impl<'a, L, R> FromBorrowedStr<'a> for Either<L, R>
    where
        L: FromBorrowedStr<'a>,
        R: FromBorrowedStr<'a>,
    {
        type Err = (
            <L as FromBorrowedStr<'a>>::Err,
            <R as FromBorrowedStr<'a>>::Err,
        );

        fn from_borrowed_str(s: &'a str) -> Result<Self, Self::Err> {
            match L::from_borrowed_str(s) {
                Ok(l) => Ok(Left(l)),
                Err(l_err) => match R::from_borrowed_str(s) {
                    Ok(r) => Ok(Right(r)),
                    Err(r_err) => Err((l_err, r_err)),
                },
            }
        }
    }
}
/// Parsing from a string slice whose lifetime `'a` is visible to the
/// implementor, so the parsed value is allowed to borrow from the input.
pub trait FromBorrowedStr<'a>: Sized {
/// The error returned when parsing fails.
type Err;
/// Parses `s` into a value of this type.
fn from_borrowed_str(s: &'a str) -> Result<Self, Self::Err>;
}
/// Every type that implements `FromStr` can also be parsed through
/// `FromBorrowedStr`, with the same error type.
impl<'a, T> FromBorrowedStr<'a> for T
where
    T: FromStr,
{
    type Err = <T as FromStr>::Err;

    fn from_borrowed_str(s: &'a str) -> Result<Self, Self::Err> {
        <T as FromStr>::from_str(s)
    }
}
/// Method-call counterpart of [`FromBorrowedStr`]: lets the target type be
/// chosen at the call site, e.g. `text.borrowed_parse::<T>()`.
pub trait BorrowedParse<'a> {
/// Parses `self` into an `F` using `F`'s [`FromBorrowedStr`] implementation.
fn borrowed_parse<F>(&'a self) -> Result<F, <F as FromBorrowedStr<'a>>::Err>
where
F: FromBorrowedStr<'a>;
}
/// String slices parse by handing themselves to the target type's
/// `FromBorrowedStr` implementation.
impl<'a> BorrowedParse<'a> for str {
    fn borrowed_parse<F>(&'a self) -> Result<F, <F as FromBorrowedStr<'a>>::Err>
    where
        F: FromBorrowedStr<'a>,
    {
        <F as FromBorrowedStr<'a>>::from_borrowed_str(self)
    }
}
/// Turns a fallible lookup into one that yields an `Option`, using this
/// crate's [`Error`] as the error type.
pub trait Optional<T> {
/// Converts `self` into a `Result` carrying `Option<T>`.
fn optional(self) -> Result<Option<T>>;
}
impl<T> Optional<T> for Result<T, Error> {
fn optional(self) -> Result<Option<T>> {
self.map_err(|Error::Sql(err)| err)
.optional()
.map_err(Error::from)
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Opens an in-memory database and applies `SCHEMA` inside a committed
/// transaction.
pub(crate) fn rusqlite_in_memory() -> Result<rusqlite::Connection> {
let mut db = rusqlite::Connection::open_in_memory()?;
{
let tx = db.transaction()?;
tx.execute_batch(SCHEMA)?;
tx.commit()?;
}
Ok(db)
}
/// A freshly created entity can be decoded again.
#[test]
fn test_decode_new_entity() -> anyhow::Result<()> {
let mut db = rusqlite_in_memory()?;
let woof = DontWoof::new(&mut db)?;
let e = woof.new_entity()?;
let _ = woof.decode(e)?;
Ok(())
}
/// A text value survives an encode/decode round trip.
#[test]
fn test_decode() -> anyhow::Result<()> {
let mut db = rusqlite_in_memory()?;
let woof = DontWoof::new(&mut db)?;
let v = woof.encode(ValueRef::from("hello world"))?;
assert_eq!(Value::Text("hello world".to_owned()), woof.decode(v)?);
Ok(())
}
/// Exercises which retractions are accepted and which are rejected, and in
/// what order facts about an entity and an attribute can be removed.
#[test]
fn test_retract() -> anyhow::Result<()> {
let mut db = rusqlite_in_memory()?;
let woof = DontWoof::new(&mut db)?;
// Look up the `:db/id` and `:db/attribute` attributes; these lookups
// only succeed if the schema already provides them.
let db_id = woof.attribute(woof.encode(AttributeRef::from_static(":db/id"))?)?;
let db_attr = woof.attribute(woof.encode(AttributeRef::from_static(":db/attribute"))?)?;
// Define two new attributes.
let pet_name = woof
.fluent_entity()?
.assert(db_attr, woof.encode(":pet/name".parse::<Attribute>()?)?)?
.into();
let animal_name: Encoded<Entity> = woof
.fluent_entity()?
.assert(db_attr, woof.encode(":animal/name".parse::<Attribute>()?)?)?
.into();
// An entity with one fact for each of the new attributes.
let garfield: Encoded<Entity> = woof
.fluent_entity()?
.assert(pet_name, woof.encode(ValueRef::from("Garfield"))?)?
.assert(animal_name, woof.encode(ValueRef::from("Cat"))?)?
.into();
// Retracting the entity's own `:db/id` is expected to fail while it still
// has other facts.
assert!(woof.retract(garfield, db_id, garfield).is_err());
// Retracting an attribute's definition is expected to fail at this point,
// presumably because a triple still uses it (confirm against schema.sql).
assert!(woof
.fluent(animal_name)
.retract(db_attr, woof.encode(":animal/name".parse::<Attribute>()?)?)
.is_err());
use crate::traits::*;
let mut garfield_query = Network::<_>::default();
garfield_query.fluent_triples().link_entity(garfield);
let garfield_facts = garfield_query.select().to_query();
// Three facts: the two asserted above plus, presumably, a `:db/id` triple
// that was not asserted explicitly in this test.
assert_eq!(3, garfield_facts.count(&woof).unwrap());
// The first retraction succeeds; the chained `:db/id` retraction is still
// expected to fail because one other fact remains.
assert!(woof
.fluent(garfield)
.retract(pet_name, woof.encode(ValueRef::from("Garfield"))?)?
.retract(db_id, garfield)
.is_err());
assert_eq!(2, garfield_facts.count(&woof).unwrap());
// Once the last other fact is gone, `:db/id` can be retracted as well.
woof.fluent(garfield)
.retract(animal_name, woof.encode(ValueRef::from("Cat"))?)?
.retract(db_id, garfield)?;
assert_eq!(0, garfield_facts.count(&woof).unwrap());
// With no triples left that use it, the attribute definition can now be retracted.
woof.fluent(animal_name)
.retract(db_attr, woof.encode(":animal/name".parse::<Attribute>()?)?)?;
Ok(())
}
}