use crate::ctx::Context;
use crate::dbs::Operable;
use crate::dbs::Statement;
use crate::dbs::Workable;
use crate::dbs::{Options, Processed};
use crate::doc::Document;
use crate::err::Error;
use crate::idx::planner::RecordStrategy;
use crate::sql::value::Value;
use reblessive::tree::Stk;
use std::sync::Arc;
impl Document {
#[allow(dead_code)]
pub(crate) async fn process(
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
mut pro: Processed,
) -> Result<Value, Error> {
let mut retry = false;
for _ in 0..2 {
if ctx.is_done() {
return Err(Error::Ignore);
}
let ins = match pro.val {
Operable::Value(v) => (v, Workable::Normal),
Operable::Insert(v, o) => (v, Workable::Insert(o)),
Operable::Relate(f, v, w, o) => (v, Workable::Relate(f, w, o)),
Operable::Count(count) => (Arc::new(count.into()), Workable::Normal),
};
let mut doc = Document::new(pro.rid, pro.ir, pro.generate, ins.0, ins.1, retry, pro.rs);
doc.generate_record_id(stk, ctx, opt, stm).await?;
let is_save_point = if stm.is_retryable() {
ctx.tx().lock().await.new_save_point().await;
true
} else {
false
};
let res = match stm {
Statement::Select(_) => doc.select(stk, ctx, opt, stm).await,
Statement::Create(_) => doc.create(stk, ctx, opt, stm).await,
Statement::Upsert(_) => doc.upsert(stk, ctx, opt, stm).await,
Statement::Update(_) => doc.update(stk, ctx, opt, stm).await,
Statement::Relate(_) => doc.relate(stk, ctx, opt, stm).await,
Statement::Delete(_) => doc.delete(stk, ctx, opt, stm).await,
Statement::Insert(_) => doc.insert(stk, ctx, opt, stm).await,
stm => return Err(fail!("Unexpected statement type: {stm:?}")),
};
let res = match res {
Err(Error::RetryWithId(v)) => {
if is_save_point {
ctx.tx().lock().await.rollback_to_save_point().await?;
}
let val = ctx
.tx()
.get_record(opt.ns()?, opt.db()?, &v.tb, &v.id, opt.version)
.await?;
pro = Processed {
rs: RecordStrategy::KeysAndValues,
generate: None,
rid: Some(Arc::new(v)),
ir: None,
val: match doc.extras {
Workable::Normal => Operable::Value(val),
Workable::Insert(o) => Operable::Insert(val, o),
Workable::Relate(f, w, o) => Operable::Relate(f, val, w, o),
},
};
retry = true;
continue;
}
Err(Error::Ignore) => Err(Error::Ignore),
Err(e) => {
if is_save_point {
ctx.tx().lock().await.rollback_to_save_point().await?;
}
Err(e)
}
Ok(v) => {
if is_save_point {
ctx.tx().lock().await.release_last_save_point().await?;
}
Ok(v)
}
};
return res;
}
Err(fail!("Internal error"))
}
}