use crate::ctx::Context;
use crate::ctx::{Canceller, MutableContext};
use crate::dbs::distinct::SyncDistinct;
use crate::dbs::plan::{Explanation, Plan};
use crate::dbs::result::Results;
use crate::dbs::Options;
use crate::dbs::Statement;
use crate::doc::Document;
use crate::err::Error;
use crate::idx::planner::iterators::{IteratorRecord, IteratorRef};
use crate::idx::planner::{IterationStage, RecordStrategy};
use crate::sql::array::Array;
use crate::sql::edges::Edges;
use crate::sql::mock::Mock;
use crate::sql::object::Object;
use crate::sql::table::Table;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use crate::sql::{Fields, Id, IdRange};
use reblessive::tree::Stk;
use std::mem;
use std::sync::Arc;
const TARGET: &str = "surrealdb::core::dbs";
#[derive(Clone)]
pub(crate) enum Iterable {
Value(Value),
Defer(Thing),
Yield(Table),
Thing(Thing),
Edges(Edges),
Table(Table, RecordStrategy),
Range(String, IdRange, RecordStrategy),
Mergeable(Thing, Value),
Relatable(Thing, Thing, Thing, Option<Value>),
Index(Table, IteratorRef, RecordStrategy),
}
#[derive(Debug)]
pub(crate) enum Operable {
Value(Arc<Value>),
Insert(Arc<Value>, Arc<Value>),
Relate(Thing, Arc<Value>, Thing, Option<Arc<Value>>),
Count(usize),
}
#[derive(Debug)]
pub(crate) enum Workable {
Normal,
Insert(Arc<Value>),
Relate(Thing, Thing, Option<Arc<Value>>),
}
#[derive(Debug)]
pub(crate) struct Processed {
pub(crate) rs: RecordStrategy,
pub(crate) generate: Option<Table>,
pub(crate) rid: Option<Arc<Thing>>,
pub(crate) val: Operable,
pub(crate) ir: Option<Arc<IteratorRecord>>,
}
#[derive(Default)]
pub(crate) struct Iterator {
run: Canceller,
count: u64,
limit: Option<u32>,
start: Option<u32>,
start_skip: Option<usize>,
error: Option<Error>,
results: Results,
entries: Vec<Iterable>,
guaranteed: Option<Iterable>,
cancel_on_limit: Option<u32>,
}
impl Clone for Iterator {
fn clone(&self) -> Self {
Self {
run: self.run.clone(),
count: 0,
limit: self.limit,
start: self.start,
start_skip: self.start_skip.map(|_| self.start.unwrap_or(0) as usize),
error: None,
results: Results::default(),
entries: self.entries.clone(),
guaranteed: None,
cancel_on_limit: None,
}
}
}
impl Iterator {
pub(crate) fn new() -> Self {
Self::default()
}
pub(crate) fn ingest(&mut self, val: Iterable) {
self.entries.push(val)
}
pub(crate) fn prepare(&mut self, stm: &Statement<'_>, val: Value) -> Result<(), Error> {
match val {
Value::Mock(v) => self.prepare_mock(stm, v)?,
Value::Table(v) => self.prepare_table(stm, v)?,
Value::Edges(v) => self.prepare_edges(stm, *v)?,
Value::Object(v) => self.prepare_object(stm, v)?,
Value::Array(v) => self.prepare_array(stm, v)?,
Value::Thing(v) => match v.is_range() {
true => self.prepare_range(stm, v, RecordStrategy::KeysAndValues)?,
false => self.prepare_thing(stm, v)?,
},
v => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
})
}
};
Ok(())
}
pub(crate) fn prepare_table(&mut self, stm: &Statement<'_>, v: Table) -> Result<(), Error> {
match stm.is_deferable() {
true => self.ingest(Iterable::Yield(v)),
false => match stm.is_guaranteed() {
false => self.ingest(Iterable::Table(v, RecordStrategy::KeysAndValues)),
true => {
self.guaranteed = Some(Iterable::Yield(v.clone()));
self.ingest(Iterable::Table(v, RecordStrategy::KeysAndValues))
}
},
}
Ok(())
}
pub(crate) fn prepare_thing(&mut self, stm: &Statement<'_>, v: Thing) -> Result<(), Error> {
match stm.is_deferable() {
true => self.ingest(Iterable::Defer(v)),
false => self.ingest(Iterable::Thing(v)),
}
Ok(())
}
pub(crate) fn prepare_mock(&mut self, stm: &Statement<'_>, v: Mock) -> Result<(), Error> {
for v in v {
match stm.is_deferable() {
true => self.ingest(Iterable::Defer(v)),
false => self.ingest(Iterable::Thing(v)),
}
}
Ok(())
}
pub(crate) fn prepare_edges(&mut self, stm: &Statement<'_>, v: Edges) -> Result<(), Error> {
if stm.is_create() {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
});
}
self.ingest(Iterable::Edges(v));
Ok(())
}
pub(crate) fn prepare_range(
&mut self,
stm: &Statement<'_>,
v: Thing,
rs: RecordStrategy,
) -> Result<(), Error> {
if stm.is_create() {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
});
}
if let (tb, Id::Range(v)) = (v.tb, v.id) {
self.ingest(Iterable::Range(tb, *v, rs));
}
Ok(())
}
pub(crate) fn prepare_object(&mut self, stm: &Statement<'_>, v: Object) -> Result<(), Error> {
match v.rid() {
Some(v) => match stm.is_deferable() {
true => self.ingest(Iterable::Defer(v)),
false => self.ingest(Iterable::Thing(v)),
},
None => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
});
}
}
Ok(())
}
pub(crate) fn prepare_array(&mut self, stm: &Statement<'_>, v: Array) -> Result<(), Error> {
for v in v {
match v {
Value::Mock(v) => self.prepare_mock(stm, v)?,
Value::Table(v) => self.prepare_table(stm, v)?,
Value::Edges(v) => self.prepare_edges(stm, *v)?,
Value::Object(v) => self.prepare_object(stm, v)?,
Value::Thing(v) => match v.is_range() {
true => self.prepare_range(stm, v, RecordStrategy::KeysAndValues)?,
false => self.prepare_thing(stm, v)?,
},
_ => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
})
}
}
}
Ok(())
}
pub async fn output(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
rs: RecordStrategy,
) -> Result<Value, Error> {
trace!(target: TARGET, statement = %stm, "Iterating statement");
let mut cancel_ctx = MutableContext::new(ctx);
self.run = cancel_ctx.add_cancel();
let mut cancel_ctx = cancel_ctx.freeze();
self.setup_limit(stk, &cancel_ctx, opt, stm).await?;
self.setup_start(stk, &cancel_ctx, opt, stm).await?;
self.results = self.results.prepare(
#[cfg(storage)]
ctx,
stm,
self.start,
self.limit,
)?;
let mut plan = Plan::new(ctx, stm, &self.entries, &self.results);
if plan.do_iterate {
if let Some(e) = &mut plan.explanation {
e.add_record_strategy(rs);
}
let sp = if let Some(qp) = ctx.get_query_planner() {
let sp = Some(qp.is_any_specific_permission());
while let Some(s) = qp.next_iteration_stage().await {
let is_last = matches!(s, IterationStage::Iterate(_));
let mut c = MutableContext::unfreeze(cancel_ctx)?;
c.set_iteration_stage(s);
cancel_ctx = c.freeze();
if !is_last {
self.clone().iterate(stk, &cancel_ctx, opt, stm, sp, None).await?;
};
}
sp
} else {
None
};
self.iterate(stk, &cancel_ctx, opt, stm, sp, plan.explanation.as_mut()).await?;
if let Some(e) = self.error.take() {
return Err(e);
}
if self.results.is_empty() {
if let Some(guaranteed) = self.guaranteed.take() {
self.ingest(guaranteed);
self.iterate(stk, &cancel_ctx, opt, stm, sp, None).await?;
}
}
self.output_split(stk, ctx, opt, stm, rs).await?;
self.output_group(stk, ctx, opt, stm).await?;
if let Some(orders) = stm.order() {
#[cfg(not(target_family = "wasm"))]
self.results.sort(orders).await?;
#[cfg(target_family = "wasm")]
self.results.sort(orders);
}
self.results.start_limit(self.start_skip, self.start, self.limit).await?;
if let Some(e) = &mut plan.explanation {
e.add_fetch(self.results.len());
} else {
self.output_fetch(stk, ctx, opt, stm).await?;
}
}
let mut results = self.results.take().await?;
if let Some(e) = plan.explanation {
results.clear();
for v in e.output() {
results.push(v)
}
}
Ok(results.into())
}
#[inline]
pub(crate) async fn setup_limit(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
) -> Result<Option<u32>, Error> {
if self.limit.is_none() {
if let Some(v) = stm.limit() {
self.limit = Some(v.process(stk, ctx, opt, None).await?);
}
}
Ok(self.limit)
}
#[inline]
async fn setup_start(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(v) = stm.start() {
self.start = Some(v.process(stk, ctx, opt, None).await?);
}
Ok(())
}
fn check_set_start_limit(&self, ctx: &Context, stm: &Statement<'_>) -> bool {
if stm.group().is_some() {
return false;
}
if stm.order().is_none() {
return true;
}
if self.entries.len() != 1 {
return false;
}
if let Some(Iterable::Index(_, irf, _)) = self.entries.first() {
if let Some(qp) = ctx.get_query_planner() {
if qp.is_order(irf) {
return true;
}
}
}
false
}
fn compute_start_limit(
&mut self,
ctx: &Context,
stm: &Statement<'_>,
is_specific_permission: Option<bool>,
) {
if self.check_set_start_limit(ctx, stm) {
if let Some(l) = self.limit {
if let Some(s) = self.start {
self.cancel_on_limit = Some(l + s);
} else {
self.cancel_on_limit = Some(l);
}
}
if let Some(false) = is_specific_permission {
let s = self.start.unwrap_or(0) as usize;
if s > 0 {
self.start_skip = Some(s);
}
}
}
}
pub(super) fn skippable(&self) -> usize {
self.start_skip.unwrap_or(0)
}
pub(super) fn skipped(&mut self, skipped: usize) {
if let Some(s) = &mut self.start_skip {
*s -= skipped;
}
}
async fn output_split(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
rs: RecordStrategy,
) -> Result<(), Error> {
if let Some(splits) = stm.split() {
for split in splits.iter() {
let res = self.results.take().await?;
for obj in &res {
let val = obj.pick(split);
match val {
Value::Array(v) => {
for val in v {
let mut obj = obj.clone();
obj.set(stk, ctx, opt, split, val).await?;
self.results.push(stk, ctx, opt, stm, rs, obj).await?;
}
}
_ => {
let mut obj = obj.clone();
obj.set(stk, ctx, opt, split, val).await?;
self.results.push(stk, ctx, opt, stm, rs, obj).await?;
}
}
}
}
}
Ok(())
}
async fn output_group(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Results::Groups(g) = &mut self.results {
self.results = Results::Memory(g.output(stk, ctx, opt, stm).await?);
}
Ok(())
}
async fn output_fetch(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(fetchs) = stm.fetch() {
let mut idioms = Vec::with_capacity(fetchs.0.len());
for fetch in fetchs.iter() {
fetch.compute(stk, ctx, opt, &mut idioms).await?;
}
for i in &idioms {
let mut values = self.results.take().await?;
for obj in &mut values {
stk.run(|stk| obj.fetch(stk, ctx, opt, i)).await?;
}
self.results = values.into();
}
}
Ok(())
}
async fn iterate(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
is_specific_permission: Option<bool>,
exp: Option<&mut Explanation>,
) -> Result<(), Error> {
self.compute_start_limit(ctx, stm, is_specific_permission);
if let Some(e) = exp {
if self.start_skip.is_some() || self.cancel_on_limit.is_some() {
e.add_start_limit(self.start_skip, self.cancel_on_limit);
}
}
let opt = opt.dive(4)?;
let mut distinct = SyncDistinct::new(ctx);
for v in mem::take(&mut self.entries) {
v.iterate(stk, ctx, &opt, stm, self, distinct.as_mut()).await?;
}
Ok(())
}
pub async fn process(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
pro: Processed,
) -> Result<(), Error> {
let rs = pro.rs;
let res = Self::extract_value(stk, ctx, opt, stm, pro).await;
self.result(stk, ctx, opt, stm, rs, res).await;
Ok(())
}
async fn extract_value(
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
pro: Processed,
) -> Result<Value, Error> {
let count_all = stm.expr().is_some_and(Fields::is_count_all_only);
if count_all {
if let Operable::Count(count) = pro.val {
return Ok(count.into());
}
if matches!(pro.rs, RecordStrategy::KeysOnly) {
return Ok(map! { "count".to_string() => Value::from(1) }.into());
}
}
stk.run(|stk| Document::process(stk, ctx, opt, stm, pro)).await
}
async fn result(
&mut self,
stk: &mut Stk,
ctx: &Context,
opt: &Options,
stm: &Statement<'_>,
rs: RecordStrategy,
res: Result<Value, Error>,
) {
self.count += 1;
if self.count % 100 == 0 {
yield_now!();
}
match res {
Err(Error::Ignore) => {
return;
}
Err(e) => {
self.error = Some(e);
self.run.cancel();
return;
}
Ok(v) => {
if let Err(e) = self.results.push(stk, ctx, opt, stm, rs, v).await {
self.error = Some(e);
self.run.cancel();
return;
}
}
}
if let Some(l) = self.cancel_on_limit {
if self.results.len() == l as usize {
self.run.cancel()
}
}
}
}