use crate::ctx::Context;
use crate::ctx::MutableContext;
use crate::dbs::capabilities::ExperimentalTarget;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::doc::CursorDoc;
use crate::doc::CursorValue;
use crate::doc::Document;
use crate::err::Error;
use crate::key::r#ref::Ref;
use crate::kvs::KeyDecode;
use crate::sql::dir::Dir;
use crate::sql::edges::Edges;
use crate::sql::paths::EDGE;
use crate::sql::paths::IN;
use crate::sql::paths::OUT;
use crate::sql::reference::ReferenceDeleteStrategy;
use crate::sql::statements::DeleteStatement;
use crate::sql::statements::UpdateStatement;
use crate::sql::table::Tables;
use crate::sql::value::{Value, Values};
use crate::sql::Data;
use crate::sql::Operator;
use crate::sql::Part;
use crate::sql::Thing;
use futures::StreamExt;
use reblessive::tree::Stk;
impl Document {
pub(super) async fn purge(
&self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
_stm: &Statement<'_>,
) -> Result<(), Error> {
if !self.changed() {
return Ok(());
}
let txn = ctx.tx();
if let Some(rid) = &self.id {
let (ns, db) = opt.ns_db()?;
txn.del_record(ns, db, &rid.tb, &rid.id).await?;
match (
self.initial.doc.as_ref().pick(&*EDGE),
self.initial.doc.as_ref().pick(&*IN),
self.initial.doc.as_ref().pick(&*OUT),
) {
(Value::Bool(true), Value::Thing(ref l), Value::Thing(ref r)) => {
let mut txn = txn.lock().await;
let (ref o, ref i) = (Dir::Out, Dir::In);
let key = crate::key::graph::new(ns, db, &l.tb, &l.id, o, rid);
txn.del(key).await?;
let key = crate::key::graph::new(ns, db, &rid.tb, &rid.id, i, l);
txn.del(key).await?;
let key = crate::key::graph::new(ns, db, &rid.tb, &rid.id, o, r);
txn.del(key).await?;
let key = crate::key::graph::new(ns, db, &r.tb, &r.id, i, rid);
txn.del(key).await?;
drop(txn);
}
_ => {
let stm = DeleteStatement {
what: Values(vec![Value::from(Edges {
dir: Dir::Both,
from: rid.as_ref().clone(),
what: Tables::default(),
})]),
..DeleteStatement::default()
};
stm.compute(stk, ctx, opt, None).await?;
}
}
if ctx.get_capabilities().allows_experimental(&ExperimentalTarget::RecordReferences) {
let prefix = crate::key::r#ref::prefix(ns, db, &rid.tb, &rid.id)?;
let suffix = crate::key::r#ref::suffix(ns, db, &rid.tb, &rid.id)?;
let range = prefix..suffix;
let txn = ctx.tx();
let mut stream = txn.stream_keys(range.clone(), None);
while let Some(res) = stream.next().await {
let key = res?;
let r#ref = Ref::decode(&key)?;
let fd = txn.get_tb_field(ns, db, r#ref.ft, r#ref.ff).await?;
if let Some(reference) = &fd.reference {
match &reference.on_delete {
ReferenceDeleteStrategy::Ignore => (),
ReferenceDeleteStrategy::Reject => {
let thing = Thing {
tb: r#ref.ft.to_string(),
id: r#ref.fk.clone(),
};
return Err(Error::DeleteRejectedByReference(
rid.to_string(),
thing.to_string(),
));
}
ReferenceDeleteStrategy::Cascade => {
let thing = Thing {
tb: r#ref.ft.to_string(),
id: r#ref.fk.clone(),
};
let stm = DeleteStatement {
what: Values(vec![Value::from(thing)]),
..DeleteStatement::default()
};
stm.compute(stk, ctx, &opt.clone().with_perms(false), None)
.await
.map_err(|e| {
Error::RefsUpdateFailure(rid.to_string(), e.to_string())
})?;
}
ReferenceDeleteStrategy::Unset => {
let thing = Thing {
tb: r#ref.ft.to_string(),
id: r#ref.fk.clone(),
};
let data = match fd.name.last() {
Some(Part::All) => Data::SetExpression(vec![(
fd.name.as_ref()[..fd.name.len() - 1].into(),
Operator::Dec,
Value::Thing(rid.as_ref().clone()),
)]),
_ => Data::UnsetExpression(vec![fd.name.as_ref().into()]),
};
let stm = UpdateStatement {
what: Values(vec![Value::from(thing)]),
data: Some(data),
..UpdateStatement::default()
};
stm.compute(stk, ctx, &opt.clone().with_perms(false), None)
.await
.map_err(|e| {
Error::RefsUpdateFailure(rid.to_string(), e.to_string())
})?;
}
ReferenceDeleteStrategy::Custom(v) => {
let reference = Value::from(rid.as_ref().clone());
let this = Thing {
tb: r#ref.ft.to_string(),
id: r#ref.fk.clone(),
};
let mut ctx = MutableContext::new(ctx);
ctx.add_value("reference", reference.into());
let ctx = ctx.freeze();
let doc: CursorValue = Value::Thing(this)
.get(
stk,
&ctx,
&opt.clone().with_perms(false),
None,
&[Part::All],
)
.await?
.into();
let doc = CursorDoc::new(None, None, doc);
v.compute(stk, &ctx, &opt.clone().with_perms(false), Some(&doc))
.await
.map_err(|e| {
Error::RefsUpdateFailure(rid.to_string(), e.to_string())
})?;
}
}
}
}
txn.delr(range).await?;
}
}
Ok(())
}
}