use crate::cnf::{INDEXING_BATCH_SIZE, NORMAL_FETCH_SIZE};
use crate::ctx::{Context, MutableContext};
use crate::dbs::Options;
use crate::doc::{CursorDoc, Document};
use crate::err::Error;
use crate::idx::index::IndexOperation;
use crate::key::index::ia::Ia;
use crate::key::index::ip::Ip;
use crate::key::thing;
use crate::kvs::ds::TransactionFactory;
use crate::kvs::LockType::Optimistic;
use crate::kvs::{Key, Transaction, TransactionType, Val};
use crate::sql::statements::DefineIndexStatement;
use crate::sql::{Id, Object, Thing, Value};
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use derive::Store;
use reblessive::TreeStack;
use revision::revisioned;
use serde::{Deserialize, Serialize};
use std::ops::Range;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task;
use tokio::task::JoinHandle;
use super::KeyDecode;
/// Progress of an asynchronous index build.
#[derive(Clone)]
pub(crate) enum BuildingStatus {
    /// The build task has been created but has not indexed anything yet.
    Started,
    /// Scanning the table's existing records; holds the number indexed so far.
    InitialIndexing(usize),
    /// Draining updates queued while the scan ran; holds the number processed.
    UpdatesIndexing(usize),
    /// The build failed; sticky once set (see `Building::set_status`).
    Error(Arc<Error>),
    /// The index is fully built and up to date.
    Built,
}
/// Outcome of offering a document change to an in-progress index build.
pub(crate) enum ConsumeResult {
    /// The change was persisted to the appending queue; the build task will index it.
    Enqueued,
    /// No pending build for this index; the (old values, new values) pair is
    /// handed back so the caller can index the change directly.
    Ignored(Option<Vec<Value>>, Option<Vec<Value>>),
}
impl BuildingStatus {
    /// Returns `true` when the build has terminated with an error.
    fn is_error(&self) -> bool {
        match self {
            Self::Error(_) => true,
            _ => false,
        }
    }

    /// Returns `true` when the build has completed successfully.
    fn is_built(&self) -> bool {
        match self {
            Self::Built => true,
            _ => false,
        }
    }
}
impl From<BuildingStatus> for Value {
fn from(st: BuildingStatus) -> Self {
let mut o = Object::default();
let s = match st {
BuildingStatus::Started => "started",
BuildingStatus::InitialIndexing(count) => {
o.insert("count".to_string(), count.into());
"initial"
}
BuildingStatus::UpdatesIndexing(count) => {
o.insert("count".to_string(), count.into());
"updates"
}
BuildingStatus::Error(error) => {
o.insert("error".to_string(), error.to_string().into());
"error"
}
BuildingStatus::Built => "built",
};
o.insert("status".to_string(), s.into());
o.into()
}
}
/// A running build: the shared build state plus the handle of the spawned task driving it.
type IndexBuilding = (Arc<Building>, JoinHandle<()>);

/// Schedules and tracks background index builds, keyed by index definition.
#[derive(Clone)]
pub(crate) struct IndexBuilder {
    // Factory used to open the transactions the build task needs.
    tf: TransactionFactory,
    // One entry per index for which a build has been started.
    indexes: Arc<DashMap<Arc<DefineIndexStatement>, IndexBuilding>>,
}
impl IndexBuilder {
    /// Creates a builder with no builds in progress.
    pub(super) fn new(tf: TransactionFactory) -> Self {
        Self {
            tf,
            indexes: Default::default(),
        }
    }

    /// Starts a background build task for `ix` unless one is already running.
    ///
    /// Returns `Error::IndexAlreadyBuilding` while a previous task for the same
    /// index is still alive. NOTE(review): if a previous task has finished, the
    /// stale entry is kept and no new task is spawned — confirm this is the
    /// intended behavior for re-running a build.
    pub(crate) fn build(
        &self,
        ctx: &Context,
        opt: Options,
        ix: Arc<DefineIndexStatement>,
    ) -> Result<(), Error> {
        match self.indexes.entry(ix) {
            Entry::Occupied(e) => {
                if !e.get().1.is_finished() {
                    return Err(Error::IndexAlreadyBuilding {
                        index: e.key().name.to_string(),
                    });
                }
            }
            Entry::Vacant(e) => {
                // Spawn the build; a failure is recorded as a sticky `Error`
                // status rather than being propagated to this caller.
                let building = Arc::new(Building::new(ctx, self.tf.clone(), opt, e.key().clone())?);
                let b = building.clone();
                let jh = task::spawn(async move {
                    if let Err(err) = b.compute().await {
                        b.set_status(BuildingStatus::Error(err.into())).await;
                    }
                });
                e.insert((building, jh));
            }
        }
        Ok(())
    }

    /// Offers a document change to the build of `ix`, if one exists.
    ///
    /// Delegates to `Building::maybe_consume`; when no build was ever started
    /// for this index the values are returned as `Ignored` for direct indexing.
    pub(crate) async fn consume(
        &self,
        ctx: &Context,
        ix: &DefineIndexStatement,
        old_values: Option<Vec<Value>>,
        new_values: Option<Vec<Value>>,
        rid: &Thing,
    ) -> Result<ConsumeResult, Error> {
        if let Some(r) = self.indexes.get(ix) {
            let (b, _) = r.value();
            return b.maybe_consume(ctx, old_values, new_values, rid).await;
        }
        Ok(ConsumeResult::Ignored(old_values, new_values))
    }

    /// Returns the current status of the build for `ix`, or `None` if no build
    /// was ever started for that index.
    pub(crate) async fn get_status(&self, ix: &DefineIndexStatement) -> Option<BuildingStatus> {
        if let Some(a) = self.indexes.get(ix) {
            Some(a.value().0.status.lock().await.clone())
        } else {
            None
        }
    }
}
/// A document change captured while the index build is running, persisted
/// under an `Ia` key and replayed in sequence order by the build task.
#[revisioned(revision = 1)]
#[derive(Serialize, Deserialize, Store, Debug)]
#[non_exhaustive]
struct Appending {
    // Values previously indexed for the record, if any.
    old_values: Option<Vec<Value>>,
    // Values to index now, if any (presumably `None` on deletion — confirm with callers).
    new_values: Option<Vec<Value>>,
    // Id of the record the change applies to.
    id: Id,
}
/// Maps a record id (`Ip` key) to the sequence number of its FIRST queued
/// `Appending` entry, letting the initial scan recover the record's
/// pre-build indexed values (see `Building::index_initial_batch`).
#[revisioned(revision = 1)]
#[derive(Serialize, Deserialize, Store, Debug)]
#[non_exhaustive]
struct PrimaryAppending(u32);
/// Cursor pair over the persisted queue of `Appending` entries.
///
/// `to_index..next` is the window of queued updates not yet indexed:
/// `next` is the sequence number handed to the next enqueued update, and
/// `to_index` is the first sequence number still awaiting indexing.
#[derive(Default)]
struct QueueSequences {
    // First queued update not yet indexed.
    to_index: u32,
    // Sequence number the next enqueued update will receive.
    next: u32,
}
impl QueueSequences {
    /// Returns `true` when every enqueued update has been indexed.
    fn is_empty(&self) -> bool {
        self.to_index == self.next
    }
    /// Reserves and returns the sequence number for a newly enqueued update.
    fn add_update(&mut self) -> u32 {
        let i = self.next;
        self.next += 1;
        i
    }
    /// Resets both cursors once the queue has been fully drained.
    fn clear(&mut self) {
        self.to_index = 0;
        self.next = 0;
    }
    /// Marks every sequence number below `i` as indexed.
    fn set_to_index(&mut self, i: u32) {
        self.to_index = i;
    }
    /// Returns the next window of at most `page` pending sequence numbers.
    ///
    /// Uses a saturating add so a cursor close to `u32::MAX` cannot overflow
    /// (the original `s + page` would panic in debug builds and wrap in release).
    fn next_indexing_batch(&self, page: u32) -> Range<u32> {
        let s = self.to_index;
        let e = s.saturating_add(page).min(self.next);
        s..e
    }
}
/// State shared between the spawned build task and writers enqueueing updates.
struct Building {
    // Detached (concurrent) context the build task runs under.
    ctx: Context,
    // Options captured when the build was requested.
    opt: Options,
    // Factory for the read/write transactions the build uses.
    tf: TransactionFactory,
    // Definition of the index being built.
    ix: Arc<DefineIndexStatement>,
    // Name of the table the index is defined on (from `ix.what`).
    tb: String,
    // Current build status; `Error` is sticky (see `set_status`).
    status: Arc<Mutex<BuildingStatus>>,
    // Cursors over the persisted queue of concurrent updates.
    queue: Arc<Mutex<QueueSequences>>,
}
impl Building {
    /// Captures everything the background task will need; performs no indexing.
    fn new(
        ctx: &Context,
        tf: TransactionFactory,
        opt: Options,
        ix: Arc<DefineIndexStatement>,
    ) -> Result<Self, Error> {
        Ok(Self {
            // Detach from the caller's context so the build outlives the request.
            ctx: MutableContext::new_concurrent(ctx).freeze(),
            opt,
            tf,
            tb: ix.what.to_string(),
            ix,
            status: Arc::new(Mutex::new(BuildingStatus::Started)),
            queue: Default::default(),
        })
    }

    /// Updates the build status. An `Error` status is sticky: once set it is
    /// never overwritten by later progress updates.
    async fn set_status(&self, status: BuildingStatus) {
        let mut s = self.status.lock().await;
        if !s.is_error() {
            *s = status;
        }
    }

    /// Called on the write path for every document change touching this index.
    ///
    /// While the build is still running (or queued updates remain), the change
    /// is persisted to the appending queue inside the CALLER's transaction and
    /// `Enqueued` is returned; once the build is `Built` and the queue is empty
    /// the values are handed back as `Ignored` for direct indexing.
    async fn maybe_consume(
        &self,
        ctx: &Context,
        old_values: Option<Vec<Value>>,
        new_values: Option<Vec<Value>>,
        rid: &Thing,
    ) -> Result<ConsumeResult, Error> {
        // Hold the queue lock for the whole enqueue so sequence numbers are
        // allocated and written in order relative to the build task's drain loop.
        let mut queue = self.queue.lock().await;
        if queue.is_empty() {
            if self.status.lock().await.is_built() {
                return Ok(ConsumeResult::Ignored(old_values, new_values));
            }
        }
        let tx = ctx.tx();
        let a = Appending {
            old_values,
            new_values,
            id: rid.id.clone(),
        };
        let idx = queue.add_update();
        let ia = self.new_ia_key(idx)?;
        tx.set(ia, a, None).await?;
        // Record only the FIRST queued entry per record id, so the initial
        // scan can recover the record's pre-build indexed values.
        let ip = self.new_ip_key(rid.id.clone())?;
        if tx.get(ip.clone(), None).await?.is_none() {
            tx.set(ip, PrimaryAppending(idx), None).await?;
        }
        Ok(ConsumeResult::Enqueued)
    }

    /// Builds the `Ia` key addressing the queued update with sequence number `i`.
    fn new_ia_key(&self, i: u32) -> Result<Ia, Error> {
        let ns = self.opt.ns()?;
        let db = self.opt.db()?;
        Ok(Ia::new(ns, db, &self.ix.what, &self.ix.name, i))
    }

    /// Builds the `Ip` key addressing the primary-appending marker for `id`.
    fn new_ip_key(&self, id: Id) -> Result<Ip, Error> {
        let ns = self.opt.ns()?;
        let db = self.opt.db()?;
        Ok(Ip::new(ns, db, &self.ix.what, &self.ix.name, id))
    }

    /// Opens a fresh optimistic read transaction.
    async fn new_read_tx(&self) -> Result<Transaction, Error> {
        self.tf.transaction(TransactionType::Read, Optimistic).await
    }

    /// Opens a fresh optimistic write transaction wrapped in a child context.
    async fn new_write_tx_ctx(&self) -> Result<Context, Error> {
        let tx = self.tf.transaction(TransactionType::Write, Optimistic).await?.into();
        let mut ctx = MutableContext::new(&self.ctx);
        ctx.set_transaction(tx);
        Ok(ctx.freeze())
    }

    /// Body of the background build task.
    ///
    /// Phase 1 scans the table's existing records in batches, each batch read
    /// with a short-lived read transaction and indexed in its own write
    /// transaction. Phase 2 drains the appending queue filled by concurrent
    /// writers; the `Built` transition happens while the queue lock is held so
    /// a writer cannot enqueue between the emptiness check and the flip.
    async fn compute(&self) -> Result<(), Error> {
        self.set_status(BuildingStatus::InitialIndexing(0)).await;
        let ns = self.opt.ns()?;
        let db = self.opt.db()?;
        // Key range covering every record of the table.
        let beg = thing::prefix(ns, db, &self.tb)?;
        let end = thing::suffix(ns, db, &self.tb)?;
        let mut next = Some(beg..end);
        let mut count = 0;
        while let Some(rng) = next {
            // Read one batch, then release the read transaction immediately.
            let tx = self.new_read_tx().await?;
            let batch = catch!(tx, tx.batch_keys_vals(rng, *INDEXING_BATCH_SIZE, None).await);
            drop(tx);
            next = batch.next;
            if batch.result.is_empty() {
                break;
            }
            // Index the batch inside its own write transaction.
            let ctx = self.new_write_tx_ctx().await?;
            let tx = ctx.tx();
            catch!(tx, self.index_initial_batch(&ctx, &tx, batch.result, &mut count).await);
            tx.commit().await?;
        }
        self.set_status(BuildingStatus::UpdatesIndexing(0)).await;
        loop {
            // Lock the queue for the whole iteration (see method docs).
            let mut queue = self.queue.lock().await;
            if queue.is_empty() {
                self.set_status(BuildingStatus::Built).await;
                queue.clear();
                break;
            }
            let range = queue.next_indexing_batch(*NORMAL_FETCH_SIZE);
            // NOTE(review): with to_index < next this range is never empty, so
            // this `continue` looks unreachable — confirm it is a safety net.
            if range.is_empty() {
                continue;
            }
            let next_to_index = range.end;
            let ctx = self.new_write_tx_ctx().await?;
            let tx = ctx.tx();
            catch!(tx, self.index_appending_range(&ctx, &tx, range, &mut count).await);
            tx.commit().await?;
            // Advance the cursor only after the batch has committed.
            queue.set_to_index(next_to_index);
        }
        Ok(())
    }

    /// Indexes one batch of pre-existing records (phase 1).
    async fn index_initial_batch(
        &self,
        ctx: &Context,
        tx: &Transaction,
        values: Vec<(Key, Val)>,
        count: &mut usize,
    ) -> Result<(), Error> {
        let mut stack = TreeStack::new();
        for (k, v) in values.into_iter() {
            let key = thing::Thing::decode(&k)?;
            let val: Value = (&v).into();
            let rid: Arc<Thing> = Thing::from((key.tb, key.id)).into();
            let opt_values;
            // If a concurrent writer already queued an update for this record,
            // index the OLD values it captured; replaying the queued entry in
            // phase 2 will then bring the index to the record's latest state.
            let ip = self.new_ip_key(rid.id.clone())?;
            if let Some(v) = tx.get(ip, None).await? {
                let pa: PrimaryAppending = v.into();
                let ia = self.new_ia_key(pa.0)?;
                let v = tx
                    .get(ia, None)
                    .await?
                    .ok_or_else(|| Error::CorruptedIndex("Appending record is missing"))?;
                let a: Appending = v.into();
                opt_values = a.old_values;
            } else {
                // Otherwise evaluate the indexed values from the stored document.
                let doc = CursorDoc::new(Some(rid.clone()), None, val);
                opt_values = stack
                    .enter(|stk| Document::build_opt_values(stk, ctx, &self.opt, &self.ix, &doc))
                    .finish()
                    .await?;
            }
            // The record is new to the index, so there are no old values.
            let mut io =
                IndexOperation::new(ctx, &self.opt, &self.ix, None, opt_values.clone(), &rid);
            stack.enter(|stk| io.compute(stk)).finish().await?;
            *count += 1;
            self.set_status(BuildingStatus::InitialIndexing(*count)).await;
        }
        Ok(())
    }

    /// Replays one range of queued `Appending` entries (phase 2), deleting each
    /// entry and its primary-appending marker as it is applied.
    async fn index_appending_range(
        &self,
        ctx: &Context,
        tx: &Transaction,
        range: Range<u32>,
        count: &mut usize,
    ) -> Result<(), Error> {
        let mut stack = TreeStack::new();
        for i in range {
            let ia = self.new_ia_key(i)?;
            if let Some(v) = tx.get(ia.clone(), None).await? {
                tx.del(ia).await?;
                let a: Appending = v.into();
                let rid = Thing::from((self.tb.clone(), a.id));
                let mut io =
                    IndexOperation::new(ctx, &self.opt, &self.ix, a.old_values, a.new_values, &rid);
                stack.enter(|stk| io.compute(stk)).finish().await?;
                // Drop the marker so later scans re-evaluate the record directly.
                let ip = self.new_ip_key(rid.id)?;
                tx.del(ip).await?;
                *count += 1;
                self.set_status(BuildingStatus::UpdatesIndexing(*count)).await;
            }
        }
        Ok(())
    }
}