use crate::ctx::Context;
use crate::dbs::response::Response;
use crate::dbs::Notification;
use crate::dbs::Options;
use crate::dbs::QueryType;
use crate::dbs::Transaction;
use crate::err::Error;
use crate::iam::Action;
use crate::iam::ResourceKind;
use crate::kvs::TransactionType;
use crate::kvs::{Datastore, LockType::*, TransactionType::*};
use crate::sql::paths::DB;
use crate::sql::paths::NS;
use crate::sql::query::Query;
use crate::sql::statement::Statement;
use crate::sql::value::Value;
use crate::sql::Base;
use channel::Receiver;
use futures::lock::Mutex;
use futures::StreamExt;
use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use tokio::spawn;
use tracing::instrument;
use trice::Instant;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures::spawn_local as spawn;
/// Runs the statements of a query against a datastore, tracking the
/// transaction that is currently open (if any) and whether it has failed.
pub(crate) struct Executor<'a> {
/// Set once a statement or transaction operation has failed; reset by
/// `execute` before a statement whenever no transaction is open.
err: bool,
/// The datastore from which transactions are created.
kvs: &'a Datastore,
/// The currently open transaction, or `None` when none is held.
txn: Option<Transaction>,
}
impl<'a> Executor<'a> {
pub fn new(kvs: &'a Datastore) -> Executor<'a> {
Executor {
kvs,
txn: None,
err: false,
}
}
fn txn(&self) -> Transaction {
self.txn.clone().expect("unreachable: txn was None after successful begin")
}
/// Opens a new optimistic transaction of kind `write` unless one is
/// already held.
///
/// Returns `true` only when this call created the transaction. Returns
/// `false` when a transaction already existed, or when creating one
/// failed, in which case the error flag is also set.
async fn begin(&mut self, write: TransactionType) -> bool {
    // An existing transaction is reused; this call does not own it.
    if self.txn.is_some() {
        return false;
    }
    match self.kvs.transaction(write, Optimistic).await {
        Ok(inner) => {
            self.txn = Some(Arc::new(Mutex::new(inner)));
            true
        }
        Err(_) => {
            self.err = true;
            false
        }
    }
}
/// Finishes the open transaction when `local` is `true`.
///
/// Does nothing (and returns `Ok`) when `local` is `false` or no
/// transaction is held. If the error flag is set, the transaction is
/// cancelled instead and any cancel error is ignored. Otherwise the
/// pending changes are completed and the transaction committed.
///
/// # Errors
///
/// Returns the error from completing changes or committing; the error
/// flag is set in that case.
async fn commit(&mut self, local: bool) -> Result<(), Error> {
    if !local {
        return Ok(());
    }
    // Taking the handle leaves `self.txn` empty whatever happens next.
    let handle = match self.txn.take() {
        Some(handle) => handle,
        None => return Ok(()),
    };
    let mut guard = handle.lock().await;
    if self.err {
        // The failure has already been recorded, so a cancel error adds nothing.
        let _ = guard.cancel().await;
        return Ok(());
    }
    let outcome = match guard.complete_changes(false).await {
        Ok(_) => guard.commit().await,
        failed => failed,
    };
    match outcome {
        Ok(_) => Ok(()),
        Err(e) => {
            self.err = true;
            Err(e)
        }
    }
}
/// Cancels the open transaction when `local` is `true`.
///
/// Does nothing when `local` is `false` or no transaction is held. A
/// failing cancel sets the error flag.
async fn cancel(&mut self, local: bool) {
    if !local {
        return;
    }
    let handle = match self.txn.take() {
        Some(handle) => handle,
        None => return,
    };
    let mut guard = handle.lock().await;
    // Only ever raises the flag; an already-set flag is left as it is.
    self.err |= guard.cancel().await.is_err();
}
/// Rewrites a buffered response for a cancelled transaction: the timing is
/// kept, the result becomes `Error::QueryCancelled` and the query type is
/// reset to `QueryType::Other`.
fn buf_cancel(&self, v: Response) -> Response {
    let Response {
        time,
        ..
    } = v;
    Response {
        time,
        result: Err(Error::QueryCancelled),
        query_type: QueryType::Other,
    }
}
fn buf_commit(&self, v: Response, commit_error: &Option<Error>) -> Response {
match &self.err {
true => Response {
time: v.time,
result: match v.result {
Ok(_) => Err(commit_error
.as_ref()
.map(|e| Error::QueryNotExecutedDetail {
message: e.to_string(),
})
.unwrap_or(Error::QueryNotExecuted)),
Err(e) => Err(e),
},
query_type: QueryType::Other,
},
_ => v,
}
}
/// Spawns a background task that reads notifications from `rcv` and
/// discards them until the stream ends. The context argument is unused.
async fn clear(&self, _: &Context<'_>, mut rcv: Receiver<Notification>) {
    spawn(async move {
        loop {
            // Each received notification is dropped immediately.
            if rcv.next().await.is_none() {
                break;
            }
        }
    });
}
/// Spawns a background task that forwards notifications from `rcv` to the
/// channel returned by `ctx.notifications()`.
///
/// The task ends when `rcv` is exhausted or a send fails. When the context
/// has no notification channel, received notifications are dropped.
async fn flush(&self, ctx: &Context<'_>, mut rcv: Receiver<Notification>) {
    let sender = ctx.notifications();
    spawn(async move {
        while let Some(notification) = rcv.next().await {
            let delivered = match &sender {
                Some(chn) => chn.send(notification).await.is_ok(),
                // Nobody is listening: keep draining.
                None => true,
            };
            if !delivered {
                break;
            }
        }
    });
}
/// Selects namespace `ns`: writes it into the context's `"session"` value
/// at the `NS` path (starting from `Value::None` when the context has no
/// session value) and records it on the options.
async fn set_ns(&self, ctx: &mut Context<'_>, opt: &mut Options, ns: &str) {
    let mut session = match ctx.value("session") {
        Some(current) => current.clone(),
        None => Value::None,
    };
    session.put(NS.as_ref(), ns.to_owned().into());
    ctx.add_value("session", session);
    opt.set_ns(Some(ns.into()));
}
/// Selects database `db`: writes it into the context's `"session"` value
/// at the `DB` path (starting from `Value::None` when the context has no
/// session value) and records it on the options.
async fn set_db(&self, ctx: &mut Context<'_>, opt: &mut Options, db: &str) {
    let mut session = match ctx.value("session") {
        Some(current) => current.clone(),
        None => Value::None,
    };
    session.put(DB.as_ref(), db.to_owned().into());
    ctx.add_value("session", session);
    opt.set_db(Some(db.into()));
}
/// Executes every statement of `qry` in order and returns the responses
/// released to the caller.
///
/// Responses produced while a transaction is open are held in a buffer and
/// only moved to the output by a `COMMIT` or `CANCEL` statement, which
/// rewrites them via `buf_commit` / `buf_cancel`. Responses produced with no
/// open transaction go straight to the output. Responses still buffered when
/// the statements run out are not part of the returned vector.
///
/// `OPTION`, `BEGIN`, `CANCEL` and `COMMIT` produce no response of their own.
///
/// # Errors
///
/// Returns `Err` only when the permission check for an `OPTION` statement
/// fails. Failures of individual statements are reported inside the
/// corresponding `Response` instead.
#[instrument(level = "debug", name = "executor", skip_all)]
pub async fn execute(
&mut self,
mut ctx: Context<'_>,
opt: Options,
qry: Query,
) -> Result<Vec<Response>, Error> {
// Channel whose receiving end is drained by `clear` or `flush` below;
// the sending end is attached to the options used by every statement.
let (send, recv) = channel::unbounded();
let mut opt = opt.new_with_sender(send);
// Responses held back while a transaction is open.
let mut buf: Vec<Response> = vec![];
// Responses released to the caller.
let mut out: Vec<Response> = vec![];
for stm in qry.into_iter() {
debug!("Executing: {}", stm);
// Outside a transaction each statement starts with a clean error flag;
// inside one, an earlier failure keeps affecting later statements.
if self.txn.is_none() {
self.err = false;
}
// Start of the timing reported in the response.
let now = Instant::now();
// Classify the statement now, because the `match` below consumes it.
let is_stm_live = matches!(stm, Statement::Live(_));
let is_stm_kill = matches!(stm, Statement::Kill(_));
let is_stm_output = matches!(stm, Statement::Output(_));
let res = match stm {
// OPTION: update the runtime options; produces no response.
Statement::Option(mut stm) => {
// The only place where this function itself returns `Err`.
opt.is_allowed(Action::Edit, ResourceKind::Option, &Base::Db)?;
// Option names are matched case-insensitively.
stm.name.0.make_ascii_uppercase();
opt = match stm.name.0.as_str() {
"FIELDS" => opt.with_fields(stm.what),
"EVENTS" => opt.with_events(stm.what),
"TABLES" => opt.with_tables(stm.what),
"IMPORT" => opt.with_import(stm.what),
"FORCE" => opt.with_force(stm.what),
// An unrecognised option name stops processing of all remaining
// statements; the responses gathered so far are returned.
_ => break,
};
continue;
}
// BEGIN: open a write transaction; the result of `begin` is ignored
// here (a failure is recorded in `self.err` by `begin`).
Statement::Begin(_) => {
self.begin(Write).await;
continue;
}
// CANCEL: cancel the transaction, discard pending notifications and
// release the buffered responses marked as cancelled.
Statement::Cancel(_) => {
self.cancel(true).await;
self.clear(&ctx, recv.clone()).await;
buf = buf.into_iter().map(|v| self.buf_cancel(v)).collect();
out.append(&mut buf);
debug_assert!(self.txn.is_none(), "cancel(true) should have unset txn");
self.txn = None;
continue;
}
// COMMIT: commit the transaction, rewrite the buffered responses
// according to the outcome, forward pending notifications and release
// the buffered responses.
Statement::Commit(_) => {
let commit_error = self.commit(true).await.err();
buf = buf.into_iter().map(|v| self.buf_commit(v, &commit_error)).collect();
// NOTE(review): notifications are flushed even when the commit
// returned an error — confirm this is intended.
self.flush(&ctx, recv.clone()).await;
out.append(&mut buf);
debug_assert!(self.txn.is_none(), "commit(true) should have unset txn");
self.txn = None;
continue;
}
// USE: switch namespace and/or database for the following statements.
Statement::Use(stm) => {
if let Some(ref ns) = stm.ns {
self.set_ns(&mut ctx, &mut opt, ns).await;
}
if let Some(ref db) = stm.db {
self.set_db(&mut ctx, &mut opt, db).await;
}
Ok(Value::None)
}
// SET: compute a value and store it in the context under the
// statement's name.
Statement::Set(stm) => {
// `loc` is true only if this statement opened the transaction itself,
// in which case it must also commit or cancel it.
let loc = self.begin(stm.writeable().into()).await;
// NOTE(review): unlike the generic branch below, the error flag is not
// checked before `begin`, so inside an already failed transaction this
// reports `TxFailure` rather than `QueryNotExecuted` — confirm.
match self.err {
true => Err(Error::TxFailure),
false => {
match stm.compute(&ctx, &opt, &self.txn(), None).await {
Ok(val) => {
// Read this before `stm.name` is moved out of the statement.
let writeable = stm.writeable();
ctx.add_value(stm.name, val);
if writeable {
match self.commit(loc).await {
Err(e) => {
// Commit failed: discard pending notifications.
self.clear(&ctx, recv.clone()).await;
Err(Error::QueryNotExecutedDetail {
message: e.to_string(),
})
}
Ok(_) => {
// Commit succeeded: forward pending notifications.
self.flush(&ctx, recv.clone()).await;
Ok(Value::None)
}
}
} else {
// Nothing to persist: drop the transaction and notifications.
self.cancel(loc).await;
self.clear(&ctx, recv.clone()).await;
Ok(Value::None)
}
}
Err(err) => {
// NOTE(review): this path cancels without calling `clear`,
// unlike the other failure paths — confirm this is intended.
self.cancel(loc).await;
Err(err)
}
}
}
}
}
// Every other statement.
_ => match self.err {
// An earlier statement in this transaction failed: do not run this one.
true => Err(Error::QueryNotExecuted),
false => {
// `loc` is true only if this statement opened the transaction itself.
let loc = self.begin(stm.writeable().into()).await;
match self.err {
// `begin` failed to create a transaction.
true => Err(Error::TxFailure),
false => {
// Child context so a per-statement timeout does not leak into `ctx`.
let mut ctx = Context::new(&ctx);
let res = match stm.timeout() {
Some(timeout) => {
ctx.add_timeout(timeout);
let res = stm.compute(&ctx, &opt, &self.txn(), None).await;
// A timed-out context overrides whatever the statement returned.
match ctx.is_timedout() {
true => Err(Error::QueryTimedout),
false => res,
}
}
None => stm.compute(&ctx, &opt, &self.txn(), None).await,
};
// Second timeout check; this is the only one that applies when the
// statement has no timeout of its own.
let res = match ctx.is_timedout() {
true => Err(Error::QueryTimedout),
false => res,
};
if res.is_ok() && stm.writeable() {
if let Err(e) = self.commit(loc).await {
// Commit failed: discard notifications and report why.
self.clear(&ctx, recv.clone()).await;
Err(Error::QueryNotExecutedDetail {
message: e.to_string(),
})
} else {
// Commit succeeded (or was not ours to perform): forward notifications.
self.flush(&ctx, recv.clone()).await;
res
}
} else {
// Failed or read-only statement: cancel if the transaction is ours
// and discard notifications.
self.cancel(loc).await;
self.clear(&ctx, recv.clone()).await;
res
}
}
}
}
},
};
let res = Response {
time: now.elapsed(),
// Record any failure in the error flag while passing the error through.
result: res.map_err(|e| {
self.err = true;
e
}),
query_type: match (is_stm_live, is_stm_kill) {
(true, _) => QueryType::Live,
(_, true) => QueryType::Kill,
_ => QueryType::Other,
},
};
// A transaction is still open: hold the response back until COMMIT/CANCEL.
if self.txn.is_some() {
// An output statement discards everything buffered before it, so it
// becomes the first buffered response.
if is_stm_output {
buf.clear();
}
buf.push(res);
} else {
out.push(res)
}
}
Ok(out)
}
}
#[cfg(test)]
mod tests {
    use crate::{dbs::Session, iam::Role, kvs::Datastore};

    /// Verifies which sessions are allowed to run an `OPTION` statement:
    /// a table of role/level combinations with auth enabled, then an
    /// anonymous session with auth enabled and with auth disabled.
    #[tokio::test]
    async fn check_execute_option_permissions() {
        // Statement under test and the text expected in a refusal.
        let statement = "OPTION FIELDS = false";
        let denied = "Not enough permissions to perform this action";

        // (session, whether the statement should succeed, failure description)
        let tests = vec![
            // Root level
            (
                Session::for_level(().into(), Role::Owner).with_ns("NS").with_db("DB"),
                true,
                "owner at root level should be able to set options",
            ),
            (
                Session::for_level(().into(), Role::Editor).with_ns("NS").with_db("DB"),
                true,
                "editor at root level should be able to set options",
            ),
            (
                Session::for_level(().into(), Role::Viewer).with_ns("NS").with_db("DB"),
                false,
                "viewer at root level should not be able to set options",
            ),
            // Namespace level
            (
                Session::for_level(("NS",).into(), Role::Owner).with_ns("NS").with_db("DB"),
                true,
                "owner at namespace level should be able to set options on its namespace",
            ),
            (
                Session::for_level(("NS",).into(), Role::Owner).with_ns("OTHER_NS").with_db("DB"),
                false,
                "owner at namespace level should not be able to set options on another namespace",
            ),
            (
                Session::for_level(("NS",).into(), Role::Editor).with_ns("NS").with_db("DB"),
                true,
                "editor at namespace level should be able to set options on its namespace",
            ),
            (
                Session::for_level(("NS",).into(), Role::Editor).with_ns("OTHER_NS").with_db("DB"),
                false,
                "editor at namespace level should not be able to set options on another namespace",
            ),
            (
                Session::for_level(("NS",).into(), Role::Viewer).with_ns("NS").with_db("DB"),
                false,
                "viewer at namespace level should not be able to set options on its namespace",
            ),
            // Database level
            (
                Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("NS").with_db("DB"),
                true,
                "owner at database level should be able to set options on its database",
            ),
            (
                Session::for_level(("NS", "DB").into(), Role::Owner)
                    .with_ns("NS")
                    .with_db("OTHER_DB"),
                false,
                "owner at database level should not be able to set options on another database",
            ),
            (
                Session::for_level(("NS", "DB").into(), Role::Owner)
                    .with_ns("OTHER_NS")
                    .with_db("DB"),
                false,
                "owner at database level should not be able to set options on another namespace even if the database name matches",
            ),
            (
                Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("NS").with_db("DB"),
                true,
                "editor at database level should be able to set options on its database",
            ),
            (
                Session::for_level(("NS", "DB").into(), Role::Editor)
                    .with_ns("NS")
                    .with_db("OTHER_DB"),
                false,
                "editor at database level should not be able to set options on another database",
            ),
            (
                Session::for_level(("NS", "DB").into(), Role::Editor)
                    .with_ns("OTHER_NS")
                    .with_db("DB"),
                false,
                "editor at database level should not be able to set options on another namespace even if the database name matches",
            ),
            (
                Session::for_level(("NS", "DB").into(), Role::Viewer).with_ns("NS").with_db("DB"),
                false,
                "viewer at database level should not be able to set options on its database",
            ),
        ];

        // Each case runs against a fresh in-memory datastore with auth enabled.
        for (session, should_succeed, msg) in tests.iter() {
            let ds = Datastore::new("memory").await.unwrap().with_auth_enabled(true);
            let res = ds.execute(statement, session, None).await;
            if !*should_succeed {
                let err = res.unwrap_err().to_string();
                assert!(err.contains(denied), "{}: {}", msg, err);
                continue;
            }
            assert!(res.is_ok(), "{}: {:?}", msg, res);
        }

        // Anonymous session with auth enabled: refused.
        let anonymous = Session::default().with_ns("NS").with_db("DB");
        let ds = Datastore::new("memory").await.unwrap().with_auth_enabled(true);
        let res = ds.execute(statement, &anonymous, None).await;
        let err = res.unwrap_err().to_string();
        assert!(
            err.contains(denied),
            "anonymous user should not be able to set options: {}",
            err
        );

        // Anonymous session with auth disabled: allowed.
        let anonymous = Session::default().with_ns("NS").with_db("DB");
        let ds = Datastore::new("memory").await.unwrap().with_auth_enabled(false);
        let res = ds.execute(statement, &anonymous, None).await;
        assert!(
            res.is_ok(),
            "anonymous user should be able to set options when auth is disabled: {:?}",
            res
        );
    }
}