use crate::ctx::Canceller;
use crate::ctx::Context;
#[cfg(not(target_arch = "wasm32"))]
use crate::dbs::distinct::AsyncDistinct;
use crate::dbs::distinct::SyncDistinct;
use crate::dbs::explanation::Explanation;
use crate::dbs::Statement;
use crate::dbs::{Options, Transaction};
use crate::doc::Document;
use crate::err::Error;
use crate::idx::docids::DocId;
use crate::idx::planner::executor::IteratorRef;
use crate::sql::array::Array;
use crate::sql::edges::Edges;
use crate::sql::field::Field;
use crate::sql::range::Range;
use crate::sql::table::Table;
use crate::sql::thing::Thing;
use crate::sql::value::Value;
use async_recursion::async_recursion;
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::mem;
pub(crate) enum Iterable {
Value(Value),
Table(Table),
Thing(Thing),
Range(Range),
Edges(Edges),
Defer(Thing),
Mergeable(Thing, Value),
Relatable(Thing, Thing, Thing),
Index(Table, IteratorRef),
}
pub(crate) struct Processed {
pub(crate) ir: Option<IteratorRef>,
pub(crate) rid: Option<Thing>,
pub(crate) doc_id: Option<DocId>,
pub(crate) val: Operable,
}
pub(crate) enum Operable {
Value(Value),
Mergeable(Value, Value),
Relatable(Thing, Value, Thing),
}
pub(crate) enum Workable {
Normal,
Insert(Value),
Relate(Thing, Thing),
}
#[derive(Default)]
pub(crate) struct Iterator {
run: Canceller,
limit: Option<usize>,
start: Option<usize>,
error: Option<Error>,
results: Vec<Value>,
entries: Vec<Iterable>,
}
impl Iterator {
pub fn new() -> Self {
Self::default()
}
pub fn ingest(&mut self, val: Iterable) {
self.entries.push(val)
}
pub async fn prepare(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
val: Value,
) -> Result<(), Error> {
match val {
Value::Table(v) => match stm.data() {
Some(data) => match stm {
Statement::Create(_) => {
let id = match data.rid(ctx, opt, txn).await? {
Some(id) => id.generate(&v, false)?,
None => v.generate(),
};
self.ingest(Iterable::Thing(id))
}
_ => {
self.ingest(Iterable::Table(v))
}
},
None => match stm {
Statement::Create(_) => {
self.ingest(Iterable::Thing(v.generate()))
}
_ => {
self.ingest(Iterable::Table(v))
}
},
},
Value::Thing(v) => {
if let Some(data) = stm.data() {
if let Some(id) = data.rid(ctx, opt, txn).await? {
match id {
Value::Thing(id) if id == v => (),
id => {
return Err(Error::IdMismatch {
value: id.to_string(),
});
}
}
}
}
match stm {
Statement::Create(_) => {
self.ingest(Iterable::Defer(v));
}
_ => {
self.ingest(Iterable::Thing(v));
}
};
}
Value::Mock(v) => {
if let Some(data) = stm.data() {
if let Some(id) = data.rid(ctx, opt, txn).await? {
return Err(Error::IdMismatch {
value: id.to_string(),
});
}
}
for v in v {
self.ingest(Iterable::Thing(v))
}
}
Value::Range(v) => {
if let Statement::Create(_) = stm {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
});
}
if let Some(data) = stm.data() {
if let Some(id) = data.rid(ctx, opt, txn).await? {
return Err(Error::IdMismatch {
value: id.to_string(),
});
}
}
self.ingest(Iterable::Range(*v));
}
Value::Edges(v) => {
if let Statement::Create(_) = stm {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
});
}
if let Some(data) = stm.data() {
if let Some(id) = data.rid(ctx, opt, txn).await? {
return Err(Error::IdMismatch {
value: id.to_string(),
});
}
}
self.ingest(Iterable::Edges(*v));
}
Value::Object(v) => {
if let Some(data) = stm.data() {
if let Some(id) = data.rid(ctx, opt, txn).await? {
return Err(Error::IdMismatch {
value: id.to_string(),
});
}
}
match v.rid() {
Some(id) => {
self.ingest(Iterable::Thing(id))
}
None => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
});
}
}
}
Value::Array(v) => {
if let Some(data) = stm.data() {
if let Some(id) = data.rid(ctx, opt, txn).await? {
return Err(Error::IdMismatch {
value: id.to_string(),
});
}
}
for v in v {
match v {
Value::Thing(v) => self.ingest(Iterable::Thing(v)),
Value::Edges(v) => self.ingest(Iterable::Edges(*v)),
Value::Object(v) => match v.rid() {
Some(v) => self.ingest(Iterable::Thing(v)),
None => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
})
}
},
_ => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
})
}
}
}
}
v => {
return Err(Error::InvalidStatementTarget {
value: v.to_string(),
})
}
};
Ok(())
}
pub async fn output(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<Value, Error> {
trace!("Iterating: {}", stm);
let mut cancel_ctx = Context::new(ctx);
self.run = cancel_ctx.add_cancel();
self.setup_limit(&cancel_ctx, opt, txn, stm).await?;
self.setup_start(&cancel_ctx, opt, txn, stm).await?;
let (do_iterate, mut explanation) = Explanation::new(ctx, stm.explain(), &self.entries);
if do_iterate {
self.iterate(&cancel_ctx, opt, txn, stm).await?;
if let Some(e) = self.error.take() {
return Err(e);
}
self.output_split(ctx, opt, txn, stm).await?;
self.output_group(ctx, opt, txn, stm).await?;
self.output_order(ctx, opt, txn, stm).await?;
self.output_start(ctx, opt, txn, stm).await?;
self.output_limit(ctx, opt, txn, stm).await?;
if let Some(e) = &mut explanation {
e.add_fetch(self.results.len());
self.results.clear();
} else {
self.output_fetch(ctx, opt, txn, stm).await?;
}
}
if let Some(e) = explanation {
e.output(&mut self.results);
}
Ok(mem::take(&mut self.results).into())
}
#[inline]
async fn setup_limit(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(v) = stm.limit() {
self.limit = Some(v.process(ctx, opt, txn, None).await?);
}
Ok(())
}
#[inline]
async fn setup_start(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(v) = stm.start() {
self.start = Some(v.process(ctx, opt, txn, None).await?);
}
Ok(())
}
#[inline]
async fn output_split(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(splits) = stm.split() {
for split in splits.iter() {
let res = mem::take(&mut self.results);
for obj in &res {
let val = obj.pick(split);
match val {
Value::Array(v) => {
for val in v {
let mut obj = obj.clone();
obj.set(ctx, opt, txn, split, val).await?;
self.results.push(obj);
}
}
_ => {
let mut obj = obj.clone();
obj.set(ctx, opt, txn, split, val).await?;
self.results.push(obj);
}
}
}
}
}
Ok(())
}
#[inline]
async fn output_group(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(fields) = stm.expr() {
if let Some(groups) = stm.group() {
let mut grp: BTreeMap<Array, Array> = BTreeMap::new();
let res = mem::take(&mut self.results);
for obj in res {
let mut arr = Array::with_capacity(groups.len());
for group in groups.iter() {
let val = obj.pick(group);
arr.push(val);
}
match grp.get_mut(&arr) {
Some(v) => v.push(obj),
None => {
grp.insert(arr, Array::from(obj));
}
}
}
for (_, vals) in grp {
let mut obj = Value::base();
let vals = Value::from(vals);
for field in fields.other() {
if let Field::Single {
expr,
alias,
} = field
{
let idiom = alias
.as_ref()
.map(Cow::Borrowed)
.unwrap_or_else(|| Cow::Owned(expr.to_idiom()));
match expr {
Value::Function(f) if f.is_aggregate() => {
let x =
vals.all().get(ctx, opt, txn, None, idiom.as_ref()).await?;
let x = f.aggregate(x).compute(ctx, opt, txn, None).await?;
obj.set(ctx, opt, txn, idiom.as_ref(), x).await?;
}
_ => {
let x = vals.first();
let x = if let Some(alias) = alias {
let cur = (&x).into();
alias.compute(ctx, opt, txn, Some(&cur)).await?
} else {
let cur = (&x).into();
expr.compute(ctx, opt, txn, Some(&cur)).await?
};
obj.set(ctx, opt, txn, idiom.as_ref(), x).await?;
}
}
}
}
self.results.push(obj);
}
}
}
Ok(())
}
#[inline]
async fn output_order(
&mut self,
_ctx: &Context<'_>,
_opt: &Options,
_txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(orders) = stm.order() {
self.results.sort_by(|a, b| {
for order in orders.iter() {
let o = match order.random {
true => {
let a = rand::random::<f64>();
let b = rand::random::<f64>();
a.partial_cmp(&b)
}
false => match order.direction {
true => a.compare(b, order, order.collate, order.numeric),
false => b.compare(a, order, order.collate, order.numeric),
},
};
match o {
Some(Ordering::Greater) => return Ordering::Greater,
Some(Ordering::Equal) => continue,
Some(Ordering::Less) => return Ordering::Less,
None => continue,
}
}
Ordering::Equal
})
}
Ok(())
}
#[inline]
async fn output_start(
&mut self,
_ctx: &Context<'_>,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(v) = self.start {
self.results = mem::take(&mut self.results).into_iter().skip(v).collect();
}
Ok(())
}
#[inline]
async fn output_limit(
&mut self,
_ctx: &Context<'_>,
_opt: &Options,
_txn: &Transaction,
_stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(v) = self.limit {
self.results = mem::take(&mut self.results).into_iter().take(v).collect();
}
Ok(())
}
#[inline]
async fn output_fetch(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
if let Some(fetchs) = stm.fetch() {
for fetch in fetchs.iter() {
for obj in &mut self.results {
obj.fetch(ctx, opt, txn, fetch).await?;
}
}
}
Ok(())
}
#[cfg(target_arch = "wasm32")]
#[async_recursion(?Send)]
async fn iterate(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
let opt = &opt.dive(4)?;
let mut distinct = SyncDistinct::new(ctx);
for v in mem::take(&mut self.entries) {
let dis = SyncDistinct::requires_distinct(ctx, distinct.as_mut(), &v);
v.iterate(ctx, opt, txn, stm, self, dis).await?;
}
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
#[async_recursion]
async fn iterate(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
) -> Result<(), Error> {
let opt = &opt.dive(4)?;
match stm.parallel() {
false => {
let mut distinct = SyncDistinct::new(ctx);
for v in mem::take(&mut self.entries) {
let dis = SyncDistinct::requires_distinct(ctx, distinct.as_mut(), &v);
v.iterate(ctx, opt, txn, stm, self, dis).await?;
}
Ok(())
}
true => {
let distinct = AsyncDistinct::new(ctx);
let e = executor::Executor::new();
let vals = mem::take(&mut self.entries);
let (end, exit) = channel::bounded::<()>(1);
let (chn, docs) = channel::bounded(crate::cnf::MAX_CONCURRENT_TASKS);
let adocs = async {
for v in vals {
let dis = AsyncDistinct::requires_distinct(ctx, distinct.as_ref(), &v);
e.spawn(v.channel(ctx, opt, txn, stm, chn.clone(), dis))
.detach();
}
drop(chn);
};
let (chn, vals) = channel::bounded(crate::cnf::MAX_CONCURRENT_TASKS);
let avals = async {
while let Ok(pro) = docs.recv().await {
e.spawn(Document::compute(ctx, opt, txn, stm, chn.clone(), pro))
.detach();
}
drop(chn);
};
let aproc = async {
while let Ok(r) = vals.recv().await {
self.result(r, stm);
}
let _ = end.send(()).await;
};
let fut = e.run(exit.recv());
let res = futures::join!(adocs, avals, aproc, fut);
let _ = res.3;
Ok(())
}
}
}
pub async fn process(
&mut self,
ctx: &Context<'_>,
opt: &Options,
txn: &Transaction,
stm: &Statement<'_>,
pro: Processed,
) {
let res = Document::process(ctx, opt, txn, stm, pro).await;
self.result(res, stm);
}
fn result(&mut self, res: Result<Value, Error>, stm: &Statement<'_>) {
match res {
Err(Error::Ignore) => {
return;
}
Err(e) => {
self.error = Some(e);
self.run.cancel();
return;
}
Ok(v) => self.results.push(v),
}
if stm.group().is_none() && stm.order().is_none() {
if let Some(l) = self.limit {
if let Some(s) = self.start {
if self.results.len() == l + s {
self.run.cancel()
}
} else if self.results.len() == l {
self.run.cancel()
}
}
}
}
}