use crate::ctx::Context;
use crate::ctx::MutableContext;
use crate::dbs::Options;
use crate::dbs::Workable;
use crate::err::Error;
use crate::iam::Action;
use crate::iam::ResourceKind;
use crate::idx::planner::iterators::IteratorRecord;
use crate::idx::planner::RecordStrategy;
use crate::kvs::cache;
use crate::sql::permission::Permission;
use crate::sql::statements::define::DefineEventStatement;
use crate::sql::statements::define::DefineFieldStatement;
use crate::sql::statements::define::DefineIndexStatement;
use crate::sql::statements::define::DefineTableStatement;
use crate::sql::statements::live::LiveStatement;
use crate::sql::table::Table;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use crate::sql::Base;
use reblessive::tree::Stk;
use std::fmt::{Debug, Formatter};
use std::mem;
use std::ops::Deref;
use std::sync::Arc;
/// Working state for a single record: its identity plus the initial and current
/// values, together with "reduced" copies of both that `reduced` rewrites after
/// applying field-level `select` permissions.
pub(crate) struct Document {
/// Record id, if one is known. `id()` and `inner_id()` return an error when this is `None`.
pub(super) id: Option<Arc<Thing>>,
/// Optional table. `is_specific_record_id` treats a `Workable::Normal` document
/// without one as addressing a specific record.
/// NOTE(review): presumably the table an id should be generated for — confirm.
pub(super) gen: Option<Table>,
/// When set, `is_iteration_initial` always reports `false`.
/// NOTE(review): presumably marks a retried processing attempt — confirm.
pub(super) retry: bool,
/// Extra work data attached to the document; inspected by `is_specific_record_id`.
pub(super) extras: Workable,
/// The value as it was when the document was created (compared against `current` by `changed`).
pub(super) initial: CursorDoc,
/// The value as it currently stands.
pub(super) current: CursorDoc,
/// Copy of `initial` that `reduced` overwrites with a permission-filtered value.
pub(super) initial_reduced: CursorDoc,
/// Copy of `current` that `reduced` overwrites with a permission-filtered value.
pub(super) current_reduced: CursorDoc,
/// Strategy used to fetch the record; consulted by `is_condition_checked`.
pub(super) record_strategy: RecordStrategy,
}
/// A value bundled with the optional record id and iterator record it belongs to.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub(crate) struct CursorDoc {
/// Record id associated with the value, when known.
pub(crate) rid: Option<Arc<Thing>>,
/// Iterator record associated with the value, when supplied.
/// NOTE(review): presumably metadata produced by an index iterator — confirm.
pub(crate) ir: Option<Arc<IteratorRecord>>,
/// The value itself.
pub(crate) doc: CursorValue,
}
/// A value stored either as an owned `Value` or as a shared `Arc<Value>`.
///
/// When `read_only` is `Some`, it is the authoritative value and `mutable` is
/// ignored by the accessors; otherwise `mutable` holds the value.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub(crate) struct CursorValue {
// Owned value; authoritative only while `read_only` is `None`.
mutable: Value,
// Shared value; takes precedence over `mutable` whenever it is present.
read_only: Option<Arc<Value>>,
}
impl CursorValue {
    /// Returns a mutable reference to the owned value.
    ///
    /// If a shared copy is present it is taken (leaving `read_only` as `None`)
    /// and becomes the owned value first.
    pub(crate) fn to_mut(&mut self) -> &mut Value {
        if let Some(shared) = self.read_only.take() {
            // Moves the value out when this was the only handle; clones only if
            // the `Arc` is still shared elsewhere.
            self.mutable = Arc::unwrap_or_clone(shared);
        }
        &mut self.mutable
    }

    /// Returns a shared handle to the value.
    ///
    /// When no shared copy exists yet, the owned value is moved into a new `Arc`
    /// (leaving `Value::default()` in `mutable`), and that `Arc` is remembered in
    /// `read_only` so later calls return the same allocation.
    pub(crate) fn as_arc(&mut self) -> Arc<Value> {
        match &self.read_only {
            None => {
                let v = Arc::new(mem::take(&mut self.mutable));
                self.read_only = Some(v.clone());
                v
            }
            Some(v) => v.clone(),
        }
    }

    /// Borrows the value, preferring the shared copy when one is present.
    pub(crate) fn as_ref(&self) -> &Value {
        if let Some(ro) = &self.read_only {
            ro.as_ref()
        } else {
            &self.mutable
        }
    }

    /// Consumes the cursor value and returns the value it holds.
    ///
    /// A shared copy is unwrapped without cloning when this is the last handle
    /// to it; otherwise the shared value is cloned.
    pub(crate) fn into_owned(self) -> Value {
        match self.read_only {
            Some(shared) => Arc::unwrap_or_clone(shared),
            None => self.mutable,
        }
    }
}
impl Deref for CursorValue {
type Target = Value;
fn deref(&self) -> &Self::Target {
self.as_ref()
}
}
impl CursorDoc {
    /// Builds a cursor document from an optional record id, an optional iterator
    /// record, and anything convertible into a `CursorValue`.
    pub(crate) fn new<T>(rid: Option<Arc<Thing>>, ir: Option<Arc<IteratorRecord>>, doc: T) -> Self
    where
        T: Into<CursorValue>,
    {
        let doc: CursorValue = doc.into();
        Self {
            rid,
            ir,
            doc,
        }
    }
}
impl From<Value> for CursorValue {
fn from(value: Value) -> Self {
Self {
mutable: value,
read_only: None,
}
}
}
impl From<Arc<Value>> for CursorValue {
fn from(value: Arc<Value>) -> Self {
Self {
mutable: Value::None,
read_only: Some(value),
}
}
}
impl From<Value> for CursorDoc {
fn from(val: Value) -> Self {
Self {
rid: None,
ir: None,
doc: CursorValue {
mutable: val,
read_only: None,
},
}
}
}
/// Builds a cursor document around a shared value, with no record id and no
/// iterator record.
impl From<Arc<Value>> for CursorDoc {
    fn from(doc: Arc<Value>) -> Self {
        let doc = CursorValue {
            read_only: Some(doc),
            mutable: Value::None,
        };
        Self {
            rid: None,
            ir: None,
            doc,
        }
    }
}
/// Debug output shows only the document id, not the stored values.
impl Debug for Document {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let id = &self.id;
        f.write_fmt(format_args!("Document - id: <{:?}>", id))
    }
}
impl From<&Document> for Vec<u8> {
fn from(val: &Document) -> Vec<u8> {
val.current.doc.as_ref().into()
}
}
/// Selects which reduced views `Document::reduced` recomputes.
pub(crate) enum Permitted {
/// Only the initial value (`initial` -> `initial_reduced`).
Initial,
/// Only the current value (`current` -> `current_reduced`).
Current,
/// Both the initial and the current value.
Both,
}
impl Document {
    /// Creates a document whose initial, current and both reduced views all
    /// start out as the same shared value `val`.
    pub fn new(
        id: Option<Arc<Thing>>,
        ir: Option<Arc<IteratorRecord>>,
        gen: Option<Table>,
        val: Arc<Value>,
        extras: Workable,
        retry: bool,
        rs: RecordStrategy,
    ) -> Self {
        Document {
            id: id.clone(),
            gen,
            retry,
            extras,
            current: CursorDoc::new(id.clone(), ir.clone(), val.clone()),
            initial: CursorDoc::new(id.clone(), ir.clone(), val.clone()),
            current_reduced: CursorDoc::new(id.clone(), ir.clone(), val.clone()),
            // Field initialisers run in written order, so this is the final use of
            // the handles: move them rather than bumping the reference counts again.
            initial_reduced: CursorDoc::new(id, ir, val),
            record_strategy: rs,
        }
    }

    /// Returns `true` when the current value differs from the initial value.
    pub fn changed(&self) -> bool {
        self.initial.doc.as_ref() != self.current.doc.as_ref()
    }

    /// Returns `true` when the initial value reports `is_none()`.
    pub fn is_new(&self) -> bool {
        self.initial.doc.as_ref().is_none()
    }

    /// Returns `true` when this is not a retry and the initial value reports `is_none()`.
    pub(crate) fn is_iteration_initial(&self) -> bool {
        !self.retry && self.initial.doc.as_ref().is_none()
    }

    /// Returns `true` when the document targets a specific record id: either an
    /// insert whose payload carries a record id, or a normal workable with no
    /// `gen` table set.
    pub(crate) fn is_specific_record_id(&self) -> bool {
        match self.extras {
            Workable::Insert(ref v) if v.rid().is_some() => true,
            Workable::Normal if self.gen.is_none() => true,
            _ => false,
        }
    }

    /// Returns `true` when the record strategy is `Count` or `KeysOnly`.
    /// NOTE(review): presumably the condition was already evaluated by the
    /// iterator for those strategies — confirm against the callers.
    pub(crate) fn is_condition_checked(&self) -> bool {
        matches!(self.record_strategy, RecordStrategy::Count | RecordStrategy::KeysOnly)
    }

    /// Recomputes the reduced view(s) selected by `permitted`, removing every
    /// field whose `select` permission does not grant access.
    ///
    /// Returns `Ok(false)` without touching anything when the document has no id
    /// or when `opt.check_perms(Action::View)` reports `false`; otherwise returns
    /// `Ok(true)` after the selected reduced views have been replaced.
    ///
    /// # Errors
    /// Propagates errors from the permission check, from loading the field
    /// definitions, and from computing the value or a permission expression.
    pub(crate) async fn reduced(
        &mut self,
        stk: &mut Stk,
        ctx: &Context,
        opt: &Options,
        permitted: Permitted,
    ) -> Result<bool, Error> {
        if self.id.is_none() {
            return Ok(false);
        }
        if !opt.check_perms(Action::View)? {
            return Ok(false);
        }
        let fds = self.fd(ctx, opt).await?;
        // Pair each full view with the reduced view that will receive the result.
        let targets = match permitted {
            Permitted::Initial => vec![(&self.initial, &mut self.initial_reduced)],
            Permitted::Current => vec![(&self.current, &mut self.current_reduced)],
            Permitted::Both => vec![
                (&self.initial, &mut self.initial_reduced),
                (&self.current, &mut self.current_reduced),
            ],
        };
        for (full, reduced_view) in targets {
            let mut out = full.doc.as_ref().compute(stk, ctx, opt, Some(full)).await?;
            for fd in fds.iter() {
                for k in out.each(&fd.name).iter() {
                    match &fd.permissions.select {
                        Permission::Full => (),
                        Permission::None => out.cut(k),
                        Permission::Specific(e) => {
                            // Evaluate the permission expression with permission
                            // checks disabled and the field's value (picked from
                            // the full view) bound to `value`.
                            let opt = &opt.new_with_perms(false);
                            let val = Arc::new(full.doc.as_ref().pick(k));
                            let mut ctx = MutableContext::new(ctx);
                            ctx.add_value("value", val);
                            let ctx = ctx.freeze();
                            if !e.compute(stk, &ctx, opt, Some(full)).await?.is_truthy() {
                                out.cut(k);
                            }
                        }
                    }
                }
            }
            reduced_view.doc = out.into();
        }
        Ok(true)
    }

    /// Returns a shared handle to the record id.
    ///
    /// # Errors
    /// Fails when the document has no id.
    pub fn id(&self) -> Result<Arc<Thing>, Error> {
        match self.id.clone() {
            Some(id) => Ok(id),
            _ => Err(fail!("Expected a document id to be present")),
        }
    }

    /// Returns an owned copy of the record id.
    ///
    /// # Errors
    /// Fails when the document has no id.
    pub fn inner_id(&self) -> Result<Thing, Error> {
        // `self` keeps its own handle, so the id can never be unwrapped from the
        // `Arc`; clone the `Thing` directly instead of cloning the `Arc` first.
        match self.id.as_deref() {
            Some(id) => Ok(id.clone()),
            None => Err(fail!("Expected a document id to be present")),
        }
    }

    /// Fetches the definition of the table this document's id belongs to.
    ///
    /// When the lookup reports `Error::TbNotFound`, this checks that the caller is
    /// allowed to edit tables at the database level and then returns the result
    /// of `ensure_ns_db_tb` (called with `opt.strict`).
    ///
    /// # Errors
    /// Fails when the document has no id, when the namespace or database is not
    /// available from `opt`, when the edit check is denied, or when the
    /// transaction returns any other error.
    pub async fn tb(
        &self,
        ctx: &Context,
        opt: &Options,
    ) -> Result<Arc<DefineTableStatement>, Error> {
        let txn = ctx.tx();
        let id = self.id()?;
        match txn.get_tb(opt.ns()?, opt.db()?, &id.tb).await {
            Err(Error::TbNotFound {
                ..
            }) => {
                opt.is_allowed(Action::Edit, ResourceKind::Table, &Base::Db)?;
                txn.ensure_ns_db_tb(opt.ns()?, opt.db()?, &id.tb, opt.strict).await
            }
            // Every other outcome, success or failure, is passed through unchanged.
            other => other,
        }
    }

    /// Fetches the table view definitions (`all_tb_views`) for this document's table.
    ///
    /// When the context has a cache, the entry keyed by namespace, database, table
    /// name and the table's `cache_tables_ts` is used, and filled on a miss.
    /// Without a cache the transaction is queried directly.
    pub async fn ft(
        &self,
        ctx: &Context,
        opt: &Options,
    ) -> Result<Arc<[DefineTableStatement]>, Error> {
        let ns = opt.ns()?;
        let db = opt.db()?;
        let tb = self.tb(ctx, opt).await?;
        let key = cache::ds::Lookup::Fts(ns, db, &tb.name, tb.cache_tables_ts);
        match ctx.get_cache() {
            Some(cache) => match cache.get(&key) {
                Some(val) => val,
                None => {
                    let val = ctx.tx().all_tb_views(ns, db, &tb.name).await?;
                    // The freshly loaded handle is not needed afterwards: move it.
                    let val = cache::ds::Entry::Fts(val);
                    cache.insert(key.into(), val.clone());
                    val
                }
            }
            .try_into_fts(),
            None => ctx.tx().all_tb_views(ns, db, &tb.name).await,
        }
    }

    /// Fetches the event definitions (`all_tb_events`) for this document's table.
    ///
    /// When the context has a cache, the entry keyed by namespace, database, table
    /// name and the table's `cache_events_ts` is used, and filled on a miss.
    /// Without a cache the transaction is queried directly.
    pub async fn ev(
        &self,
        ctx: &Context,
        opt: &Options,
    ) -> Result<Arc<[DefineEventStatement]>, Error> {
        let ns = opt.ns()?;
        let db = opt.db()?;
        let tb = self.tb(ctx, opt).await?;
        let key = cache::ds::Lookup::Evs(ns, db, &tb.name, tb.cache_events_ts);
        match ctx.get_cache() {
            Some(cache) => match cache.get(&key) {
                Some(val) => val,
                None => {
                    let val = ctx.tx().all_tb_events(ns, db, &tb.name).await?;
                    // The freshly loaded handle is not needed afterwards: move it.
                    let val = cache::ds::Entry::Evs(val);
                    cache.insert(key.into(), val.clone());
                    val
                }
            }
            .try_into_evs(),
            None => ctx.tx().all_tb_events(ns, db, &tb.name).await,
        }
    }

    /// Fetches the field definitions (`all_tb_fields`, at `opt.version`) for this
    /// document's table.
    ///
    /// When the context has a cache, the entry keyed by namespace, database, table
    /// name and the table's `cache_fields_ts` is used, and filled on a miss.
    /// Without a cache the transaction is queried directly.
    pub async fn fd(
        &self,
        ctx: &Context,
        opt: &Options,
    ) -> Result<Arc<[DefineFieldStatement]>, Error> {
        let ns = opt.ns()?;
        let db = opt.db()?;
        let tb = self.tb(ctx, opt).await?;
        let key = cache::ds::Lookup::Fds(ns, db, &tb.name, tb.cache_fields_ts);
        match ctx.get_cache() {
            Some(cache) => match cache.get(&key) {
                Some(val) => val,
                None => {
                    let val = ctx.tx().all_tb_fields(ns, db, &tb.name, opt.version).await?;
                    // The freshly loaded handle is not needed afterwards: move it.
                    let val = cache::ds::Entry::Fds(val);
                    cache.insert(key.into(), val.clone());
                    val
                }
            }
            .try_into_fds(),
            None => ctx.tx().all_tb_fields(ns, db, &tb.name, opt.version).await,
        }
    }

    /// Fetches the index definitions (`all_tb_indexes`) for this document's table.
    ///
    /// When the context has a cache, the entry keyed by namespace, database, table
    /// name and the table's `cache_indexes_ts` is used, and filled on a miss.
    /// Without a cache the transaction is queried directly.
    pub async fn ix(
        &self,
        ctx: &Context,
        opt: &Options,
    ) -> Result<Arc<[DefineIndexStatement]>, Error> {
        let ns = opt.ns()?;
        let db = opt.db()?;
        let tb = self.tb(ctx, opt).await?;
        let key = cache::ds::Lookup::Ixs(ns, db, &tb.name, tb.cache_indexes_ts);
        match ctx.get_cache() {
            Some(cache) => match cache.get(&key) {
                Some(val) => val,
                None => {
                    let val = ctx.tx().all_tb_indexes(ns, db, &tb.name).await?;
                    // The freshly loaded handle is not needed afterwards: move it.
                    let val = cache::ds::Entry::Ixs(val);
                    cache.insert(key.into(), val.clone());
                    val
                }
            }
            .try_into_ixs(),
            None => ctx.tx().all_tb_indexes(ns, db, &tb.name).await,
        }
    }

    /// Fetches the live query definitions (`all_tb_lives`) for this document's table.
    ///
    /// When the context has a cache, the entry keyed by namespace, database, table
    /// name and the table's `cache_lives_ts` is used, and filled on a miss.
    /// Without a cache the transaction is queried directly.
    pub async fn lv(&self, ctx: &Context, opt: &Options) -> Result<Arc<[LiveStatement]>, Error> {
        let ns = opt.ns()?;
        let db = opt.db()?;
        let tb = self.tb(ctx, opt).await?;
        let key = cache::ds::Lookup::Lvs(ns, db, &tb.name, tb.cache_lives_ts);
        match ctx.get_cache() {
            Some(cache) => match cache.get(&key) {
                Some(val) => val,
                None => {
                    let val = ctx.tx().all_tb_lives(ns, db, &tb.name).await?;
                    // The freshly loaded handle is not needed afterwards: move it.
                    let val = cache::ds::Entry::Lvs(val);
                    cache.insert(key.into(), val.clone());
                    val
                }
            }
            .try_into_lvs(),
            None => ctx.tx().all_tb_lives(ns, db, &tb.name).await,
        }
    }
}