use crate::ctx::Context;
use crate::dbs::Options;
use crate::dbs::{Force, Statement};
use crate::doc::{CursorDoc, Document};
use crate::err::Error;
use crate::idx::ft::FtIndex;
use crate::idx::trees::mtree::MTreeIndex;
use crate::idx::IndexKeyBase;
use crate::key;
#[cfg(not(target_family = "wasm"))]
use crate::kvs::ConsumeResult;
use crate::kvs::TransactionType;
use crate::sql::array::Array;
use crate::sql::index::{HnswParams, Index, MTreeParams, SearchParams};
use crate::sql::statements::DefineIndexStatement;
use crate::sql::{Part, Thing, Value};
use reblessive::tree::Stk;
impl Document {
    /// Updates every index affected by this document.
    ///
    /// The set of indexes to process is chosen from `opt.force`:
    /// * `Force::Index(..)` whose first entry targets this document's table:
    ///   exactly those indexes;
    /// * `Force::All`, or any other case where the document has changed:
    ///   the indexes returned by `self.ix`;
    /// * otherwise nothing is done.
    ///
    /// Nothing is written when the table definition has `drop` set. For each
    /// selected index the old values (from `self.initial`) and new values
    /// (from `self.current`) are computed, and the index is updated when they
    /// differ or when `opt.force` is `Force::Index(..)`.
    pub(super) async fn store_index_data(
        &self,
        stk: &mut Stk,
        ctx: &Context,
        opt: &Options,
        _stm: &Statement<'_>,
    ) -> Result<(), Error> {
        // Select the indexes to process, or bail out when there is nothing to do.
        let indexes = match &opt.force {
            Force::Index(forced)
                if forced.first().is_some_and(|first| {
                    self.id.as_ref().is_some_and(|id| first.what.0 == id.tb)
                }) =>
            {
                forced.clone()
            }
            Force::All => self.ix(ctx, opt).await?,
            _ if self.changed() => self.ix(ctx, opt).await?,
            _ => return Ok(()),
        };
        // Tables flagged with `drop` never get index entries.
        if self.tb(ctx, opt).await?.drop {
            return Ok(());
        }
        let rid = self.id()?;
        // A forced rebuild of specific indexes rewrites entries even when the
        // computed values are unchanged.
        let force_is_targeted = matches!(opt.force, Force::Index(_));
        for ix in indexes.iter() {
            let old_values = Self::build_opt_values(stk, ctx, opt, ix, &self.initial).await?;
            let new_values = Self::build_opt_values(stk, ctx, opt, ix, &self.current).await?;
            if !force_is_targeted && old_values == new_values {
                continue;
            }
            Self::one_index(stk, ctx, opt, ix, old_values, new_values, &rid).await?;
        }
        Ok(())
    }

    /// Applies the old/new values of one record to a single index.
    ///
    /// On non-wasm targets, when the context exposes an index builder, the
    /// update is first offered to it: if the builder enqueues the update this
    /// function returns immediately, otherwise the values it hands back are
    /// processed here. The update is then dispatched on the index kind.
    async fn one_index(
        stk: &mut Stk,
        ctx: &Context,
        opt: &Options,
        ix: &DefineIndexStatement,
        o: Option<Vec<Value>>,
        n: Option<Vec<Value>>,
        rid: &Thing,
    ) -> Result<(), Error> {
        #[cfg(not(target_family = "wasm"))]
        let (o, n) = match ctx.get_index_builder() {
            Some(builder) => match builder.consume(ctx, ix, o, n, rid).await? {
                ConsumeResult::Enqueued => return Ok(()),
                ConsumeResult::Ignored(old, new) => (old, new),
            },
            None => (o, n),
        };
        let mut operation = IndexOperation::new(opt, ix, o, n, rid);
        match &ix.index {
            Index::Uniq => operation.index_unique(ctx).await,
            Index::Idx => operation.index_non_unique(ctx).await,
            Index::Search(params) => operation.index_full_text(stk, ctx, params).await,
            Index::MTree(params) => operation.index_mtree(stk, ctx, params).await,
            Index::Hnsw(params) => operation.index_hnsw(ctx, params).await,
        }
    }

    /// Computes the value of every indexed column of `ix` against `doc`.
    ///
    /// Returns `Ok(None)` when the document value does not report `is_some()`;
    /// otherwise returns one computed value per column, in column order.
    pub(crate) async fn build_opt_values(
        stk: &mut Stk,
        ctx: &Context,
        opt: &Options,
        ix: &DefineIndexStatement,
        doc: &CursorDoc,
    ) -> Result<Option<Vec<Value>>, Error> {
        // NOTE(review): kept as `!is_some()` on purpose; `is_some` is defined
        // outside this file and may not be the exact complement of `is_none`.
        if !doc.doc.as_ref().is_some() {
            return Ok(None);
        }
        let mut values = Vec::with_capacity(ix.cols.len());
        for column in ix.cols.iter() {
            values.push(column.compute(stk, ctx, opt, Some(doc)).await?);
        }
        Ok(Some(values))
    }
}
/// Column values of one record for one index, each paired with a flag that is
/// `true` when the column's idiom ends with `Part::Flatten`.
struct Indexable(Vec<(Value, bool)>);

impl Indexable {
    /// Pairs each value in `vals` with the flatten flag of the matching column
    /// of `ix`. Pairing stops at the shorter of the two sequences.
    fn new(vals: Vec<Value>, ix: &DefineIndexStatement) -> Self {
        let pairs = vals
            .into_iter()
            .zip(ix.cols.0.iter())
            .map(|(value, idiom)| {
                let flattened = matches!(idiom.0.last(), Some(&Part::Flatten));
                (value, flattened)
            })
            .collect();
        Self(pairs)
    }
}
impl IntoIterator for Indexable {
type Item = Array;
type IntoIter = Combinator;
fn into_iter(self) -> Self::IntoIter {
Combinator::new(self.0)
}
}
/// Iterator state holding one value iterator per index column.
struct Combinator {
    /// One iterator per column, in column order.
    iterators: Vec<Box<dyn ValuesIterator>>,
    /// Whether another combination remains to be produced.
    has_next: bool,
}

impl Combinator {
    /// Builds one iterator per `(value, flatten)` pair.
    ///
    /// An array value whose flatten flag is `false` is iterated element by
    /// element; every other value (including arrays flagged as flattened) is
    /// treated as a single value.
    fn new(source: Vec<(Value, bool)>) -> Self {
        let iterators = source
            .into_iter()
            .map(|(value, flatten)| -> Box<dyn ValuesIterator> {
                match value {
                    Value::Array(array) if !flatten => {
                        Box::new(MultiValuesIterator::new(array.0))
                    }
                    other => Box::new(SingleValueIterator(other)),
                }
            })
            .collect();
        Self {
            iterators,
            has_next: true,
        }
    }
}
impl Iterator for Combinator {
    type Item = Array;

    /// Emits an `Array` made of the current value of every column iterator,
    /// then advances at most one iterator: the first one, in column order,
    /// whose `next()` reports that it moved.
    ///
    /// Iteration ends after the step in which no iterator could advance.
    ///
    /// NOTE(review): an exhausted iterator is never rewound, so with several
    /// multi-value columns this does not enumerate every combination; confirm
    /// that this is intended.
    fn next(&mut self) -> Option<Self::Item> {
        if !self.has_next {
            return None;
        }
        let mut advanced = false;
        let mut combination = Vec::with_capacity(self.iterators.len());
        for iterator in self.iterators.iter_mut() {
            combination.push(iterator.current().clone());
            // Short-circuit: once one iterator has moved, the rest are not touched.
            if !advanced && iterator.next() {
                advanced = true;
            }
        }
        self.has_next = advanced;
        Some(Array::from(combination))
    }
}
/// Cursor over the value(s) contributed by one index column.
///
/// Unlike `std::iter::Iterator`, the current value is read separately from
/// advancing the cursor.
trait ValuesIterator: Send {
/// Tries to move to the following value; returns `true` only if the cursor moved.
fn next(&mut self) -> bool;
/// Returns a reference to the value at the current cursor position.
fn current(&self) -> &Value;
}
/// Cursor over the elements of an array value.
struct MultiValuesIterator {
    /// The elements being walked.
    vals: Vec<Value>,
    /// Set once the cursor can no longer advance.
    done: bool,
    /// Index of the element currently exposed.
    current: usize,
    /// Index of the last element (0 when `vals` is empty).
    end: usize,
}

impl MultiValuesIterator {
    /// Creates a cursor positioned on the first element of `vals`.
    ///
    /// An empty vector yields a cursor that is already `done`.
    fn new(vals: Vec<Value>) -> Self {
        let done = vals.is_empty();
        // `saturating_sub` keeps `end` at 0 for the empty case.
        let end = vals.len().saturating_sub(1);
        Self {
            vals,
            done,
            current: 0,
            end,
        }
    }
}
impl ValuesIterator for MultiValuesIterator {
    /// Moves to the following element when there is one.
    ///
    /// Returns `false`, and marks the cursor as done, when the cursor is
    /// already on the last element or was done beforehand.
    fn next(&mut self) -> bool {
        let can_advance = !self.done && self.current != self.end;
        if can_advance {
            self.current += 1;
        } else {
            self.done = true;
        }
        can_advance
    }

    /// Returns the element under the cursor, or `Value::Null` when the
    /// position holds no element (e.g. the vector is empty).
    fn current(&self) -> &Value {
        match self.vals.get(self.current) {
            Some(value) => value,
            None => &Value::Null,
        }
    }
}
/// Cursor over exactly one value; it can never advance.
struct SingleValueIterator(Value);

impl ValuesIterator for SingleValueIterator {
    /// Always reports that the cursor did not move.
    fn next(&mut self) -> bool {
        false
    }

    /// Returns the wrapped value.
    fn current(&self) -> &Value {
        let Self(value) = self;
        value
    }
}
/// State for applying one record's change to one index.
struct IndexOperation<'a> {
// Options; used to resolve the namespace and database and passed to index backends.
opt: &'a Options,
// Definition of the index being updated.
ix: &'a DefineIndexStatement,
// Old column values to remove from the index, if any.
o: Option<Vec<Value>>,
// New column values to add to the index, if any.
n: Option<Vec<Value>>,
// Id of the record being indexed.
rid: &'a Thing,
}
impl<'a> IndexOperation<'a> {
fn new(
opt: &'a Options,
ix: &'a DefineIndexStatement,
o: Option<Vec<Value>>,
n: Option<Vec<Value>>,
rid: &'a Thing,
) -> Self {
Self {
opt,
ix,
o,
n,
rid,
}
}
fn get_unique_index_key(&self, v: &'a Array) -> Result<key::index::Index, Error> {
Ok(crate::key::index::Index::new(
self.opt.ns()?,
self.opt.db()?,
&self.ix.what,
&self.ix.name,
v,
None,
))
}
fn get_non_unique_index_key(&self, v: &'a Array) -> Result<key::index::Index, Error> {
Ok(crate::key::index::Index::new(
self.opt.ns()?,
self.opt.db()?,
&self.ix.what,
&self.ix.name,
v,
Some(&self.rid.id),
))
}
async fn index_unique(&mut self, ctx: &Context) -> Result<(), Error> {
let txn = ctx.tx();
let mut txn = txn.lock().await;
if let Some(o) = self.o.take() {
let i = Indexable::new(o, self.ix);
for o in i {
let key = self.get_unique_index_key(&o)?;
match txn.delc(key, Some(self.rid)).await {
Err(Error::TxConditionNotMet) => Ok(()),
Err(e) => Err(e),
Ok(v) => Ok(v),
}?
}
}
if let Some(n) = self.n.take() {
let i = Indexable::new(n, self.ix);
for n in i {
if !n.is_all_none_or_null() {
let key = self.get_unique_index_key(&n)?;
if txn.putc(key, self.rid, None).await.is_err() {
let key = self.get_unique_index_key(&n)?;
let val = txn.get(key, None).await?.unwrap();
let rid: Thing = val.into();
return self.err_index_exists(rid, n);
}
}
}
}
Ok(())
}
async fn index_non_unique(&mut self, ctx: &Context) -> Result<(), Error> {
let txn = ctx.tx();
let mut txn = txn.lock().await;
if let Some(o) = self.o.take() {
let i = Indexable::new(o, self.ix);
for o in i {
let key = self.get_non_unique_index_key(&o)?;
match txn.delc(key, Some(self.rid)).await {
Err(Error::TxConditionNotMet) => Ok(()),
Err(e) => Err(e),
Ok(v) => Ok(v),
}?
}
}
if let Some(n) = self.n.take() {
let i = Indexable::new(n, self.ix);
for n in i {
let key = self.get_non_unique_index_key(&n)?;
if txn.putc(key, self.rid, None).await.is_err() {
let key = self.get_non_unique_index_key(&n)?;
let val = txn.get(key, None).await?.unwrap();
let rid: Thing = val.into();
return self.err_index_exists(rid, n);
}
}
}
Ok(())
}
fn err_index_exists(&self, rid: Thing, n: Array) -> Result<(), Error> {
Err(Error::IndexExists {
thing: rid,
index: self.ix.name.to_string(),
value: match n.len() {
1 => n.first().unwrap().to_string(),
_ => n.to_string(),
},
})
}
async fn index_full_text(
&mut self,
stk: &mut Stk,
ctx: &Context,
p: &SearchParams,
) -> Result<(), Error> {
let ikb = IndexKeyBase::new(self.opt.ns()?, self.opt.db()?, self.ix)?;
let mut ft = FtIndex::new(ctx, self.opt, &p.az, ikb, p, TransactionType::Write).await?;
if let Some(n) = self.n.take() {
ft.index_document(stk, ctx, self.opt, self.rid, n).await?;
} else {
ft.remove_document(ctx, self.rid).await?;
}
ft.finish(ctx).await
}
async fn index_mtree(
&mut self,
stk: &mut Stk,
ctx: &Context,
p: &MTreeParams,
) -> Result<(), Error> {
let txn = ctx.tx();
let ikb = IndexKeyBase::new(self.opt.ns()?, self.opt.db()?, self.ix)?;
let mut mt = MTreeIndex::new(&txn, ikb, p, TransactionType::Write).await?;
if let Some(o) = self.o.take() {
mt.remove_document(stk, &txn, self.rid, &o).await?;
}
if let Some(n) = self.n.take() {
mt.index_document(stk, &txn, self.rid, &n).await?;
}
mt.finish(&txn).await
}
async fn index_hnsw(&mut self, ctx: &Context, p: &HnswParams) -> Result<(), Error> {
let hnsw = ctx.get_index_stores().get_index_hnsw(ctx, self.opt, self.ix, p).await?;
let mut hnsw = hnsw.write().await;
if let Some(o) = self.o.take() {
hnsw.remove_document(&ctx.tx(), self.rid.id.clone(), &o).await?;
}
if let Some(n) = self.n.take() {
hnsw.index_document(&ctx.tx(), self.rid.id.clone(), &n).await?;
}
Ok(())
}
}