1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
use crate::ctx::Context;
use crate::dbs::response::Response;
use crate::dbs::Notification;
use crate::dbs::Options;
use crate::dbs::QueryType;
use crate::dbs::Transaction;
use crate::err::Error;
use crate::iam::Action;
use crate::iam::ResourceKind;
use crate::kvs::TransactionType;
use crate::kvs::{Datastore, LockType::*, TransactionType::*};
use crate::sql::paths::DB;
use crate::sql::paths::NS;
use crate::sql::query::Query;
use crate::sql::statement::Statement;
use crate::sql::value::Value;
use crate::sql::Base;
use channel::Receiver;
use futures::lock::Mutex;
use futures::StreamExt;
use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use tokio::spawn;
use tracing::instrument;
use trice::Instant;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures::spawn_local as spawn;

pub(crate) struct Executor<'a> {
	err: bool,
	kvs: &'a Datastore,
	txn: Option<Transaction>,
}

impl<'a> Executor<'a> {
	pub fn new(kvs: &'a Datastore) -> Executor<'a> {
		Executor {
			kvs,
			txn: None,
			err: false,
		}
	}

	fn txn(&self) -> Transaction {
		self.txn.clone().expect("unreachable: txn was None after successful begin")
	}

	/// # Return
	/// - true if a new transaction has begun
	/// - false if
	///   - couldn't create transaction (sets err flag)
	///   - a transaction has already begun
	async fn begin(&mut self, write: TransactionType) -> bool {
		match self.txn.as_ref() {
			Some(_) => false,
			None => match self.kvs.transaction(write, Optimistic).await {
				Ok(v) => {
					self.txn = Some(Arc::new(Mutex::new(v)));
					true
				}
				Err(_) => {
					self.err = true;
					false
				}
			},
		}
	}

	/// Commits the transaction if it is local.
	///
	/// # Return
	///
	/// An `Err` if the transaction could not be committed;
	/// otherwise returns `Ok`.
	async fn commit(&mut self, local: bool) -> Result<(), Error> {
		if local {
			// Extract the transaction
			if let Some(txn) = self.txn.take() {
				let mut txn = txn.lock().await;
				if self.err {
					// Cancel and ignore any error because the error flag was
					// already set
					let _ = txn.cancel().await;
				} else {
					let r = match txn.complete_changes(false).await {
						Ok(_) => txn.commit().await,
						r => r,
					};
					if let Err(e) = r {
						// Transaction failed to commit
						//
						// TODO: Not all commit errors definitively mean
						// the transaction didn't commit. Detect that and tell
						// the user.
						self.err = true;
						return Err(e);
					}
				}
			}
		}
		Ok(())
	}

	async fn cancel(&mut self, local: bool) {
		if local {
			// Extract the transaction
			if let Some(txn) = self.txn.take() {
				let mut txn = txn.lock().await;
				if txn.cancel().await.is_err() {
					self.err = true;
				}
			}
		}
	}

	fn buf_cancel(&self, v: Response) -> Response {
		Response {
			time: v.time,
			result: Err(Error::QueryCancelled),
			query_type: QueryType::Other,
		}
	}

	fn buf_commit(&self, v: Response, commit_error: &Option<Error>) -> Response {
		match &self.err {
			true => Response {
				time: v.time,
				result: match v.result {
					Ok(_) => Err(commit_error
						.as_ref()
						.map(|e| Error::QueryNotExecutedDetail {
							message: e.to_string(),
						})
						.unwrap_or(Error::QueryNotExecuted)),
					Err(e) => Err(e),
				},
				query_type: QueryType::Other,
			},
			_ => v,
		}
	}

	/// Consume the live query notifications
	async fn clear(&self, _: &Context<'_>, mut rcv: Receiver<Notification>) {
		spawn(async move {
			while rcv.next().await.is_some() {
				// Ignore notification
			}
		});
	}

	/// Flush notifications from a buffer channel (live queries) to the committed notification channel.
	/// This is because we don't want to broadcast notifications to the user for failed transactions.
	async fn flush(&self, ctx: &Context<'_>, mut rcv: Receiver<Notification>) {
		let sender = ctx.notifications();
		spawn(async move {
			while let Some(notification) = rcv.next().await {
				if let Some(chn) = &sender {
					if chn.send(notification).await.is_err() {
						break;
					}
				}
			}
		});
	}

	async fn set_ns(&self, ctx: &mut Context<'_>, opt: &mut Options, ns: &str) {
		let mut session = ctx.value("session").unwrap_or(&Value::None).clone();
		session.put(NS.as_ref(), ns.to_owned().into());
		ctx.add_value("session", session);
		opt.set_ns(Some(ns.into()));
	}

	async fn set_db(&self, ctx: &mut Context<'_>, opt: &mut Options, db: &str) {
		let mut session = ctx.value("session").unwrap_or(&Value::None).clone();
		session.put(DB.as_ref(), db.to_owned().into());
		ctx.add_value("session", session);
		opt.set_db(Some(db.into()));
	}

	#[instrument(level = "debug", name = "executor", skip_all)]
	pub async fn execute(
		&mut self,
		mut ctx: Context<'_>,
		opt: Options,
		qry: Query,
	) -> Result<Vec<Response>, Error> {
		// Create a notification channel
		let (send, recv) = channel::unbounded();
		// Set the notification channel
		let mut opt = opt.new_with_sender(send);
		// Initialise buffer of responses
		let mut buf: Vec<Response> = vec![];
		// Initialise array of responses
		let mut out: Vec<Response> = vec![];
		// Process all statements in query
		for stm in qry.into_iter() {
			// Log the statement
			debug!("Executing: {}", stm);
			// Reset errors
			if self.txn.is_none() {
				self.err = false;
			}
			// Get the statement start time
			let now = Instant::now();
			// Check if this is a LIVE statement
			let is_stm_live = matches!(stm, Statement::Live(_));
			// Check if this is a KILL statement
			let is_stm_kill = matches!(stm, Statement::Kill(_));
			// Check if this is a RETURN statement
			let is_stm_output = matches!(stm, Statement::Output(_));
			// Process a single statement
			let res = match stm {
				// Specify runtime options
				Statement::Option(mut stm) => {
					// Allowed to run?
					opt.is_allowed(Action::Edit, ResourceKind::Option, &Base::Db)?;
					// Convert to uppercase
					stm.name.0.make_ascii_uppercase();
					// Process the option
					opt = match stm.name.0.as_str() {
						"FIELDS" => opt.with_fields(stm.what),
						"EVENTS" => opt.with_events(stm.what),
						"TABLES" => opt.with_tables(stm.what),
						"IMPORT" => opt.with_import(stm.what),
						"FORCE" => opt.with_force(stm.what),
						_ => break,
					};
					// Continue
					continue;
				}
				// Begin a new transaction
				Statement::Begin(_) => {
					self.begin(Write).await;
					continue;
				}
				// Cancel a running transaction
				Statement::Cancel(_) => {
					self.cancel(true).await;
					self.clear(&ctx, recv.clone()).await;
					buf = buf.into_iter().map(|v| self.buf_cancel(v)).collect();
					out.append(&mut buf);
					debug_assert!(self.txn.is_none(), "cancel(true) should have unset txn");
					self.txn = None;
					continue;
				}
				// Commit a running transaction
				Statement::Commit(_) => {
					let commit_error = self.commit(true).await.err();
					buf = buf.into_iter().map(|v| self.buf_commit(v, &commit_error)).collect();
					self.flush(&ctx, recv.clone()).await;
					out.append(&mut buf);
					debug_assert!(self.txn.is_none(), "commit(true) should have unset txn");
					self.txn = None;
					continue;
				}
				// Switch to a different NS or DB
				Statement::Use(stm) => {
					if let Some(ref ns) = stm.ns {
						self.set_ns(&mut ctx, &mut opt, ns).await;
					}
					if let Some(ref db) = stm.db {
						self.set_db(&mut ctx, &mut opt, db).await;
					}
					Ok(Value::None)
				}
				// Process param definition statements
				Statement::Set(stm) => {
					// Create a transaction
					let loc = self.begin(stm.writeable().into()).await;
					// Check the transaction
					match self.err {
						// We failed to create a transaction
						true => Err(Error::TxFailure),
						// The transaction began successfully
						false => {
							// Check the statement
							match stm.compute(&ctx, &opt, &self.txn(), None).await {
								Ok(val) => {
									// Check if writeable
									let writeable = stm.writeable();
									// Set the parameter
									ctx.add_value(stm.name, val);
									// Finalise transaction, returning nothing unless it couldn't commit
									if writeable {
										match self.commit(loc).await {
											Err(e) => {
												// Clear live query notifications
												self.clear(&ctx, recv.clone()).await;
												Err(Error::QueryNotExecutedDetail {
													message: e.to_string(),
												})
											}
											Ok(_) => {
												// Flush live query notifications
												self.flush(&ctx, recv.clone()).await;
												Ok(Value::None)
											}
										}
									} else {
										self.cancel(loc).await;
										self.clear(&ctx, recv.clone()).await;
										Ok(Value::None)
									}
								}
								Err(err) => {
									// Cancel transaction
									self.cancel(loc).await;
									// Return error
									Err(err)
								}
							}
						}
					}
				}
				// Process all other normal statements
				_ => match self.err {
					// This transaction has failed
					true => Err(Error::QueryNotExecuted),
					// Compute the statement normally
					false => {
						// Create a transaction
						let loc = self.begin(stm.writeable().into()).await;
						// Check the transaction
						match self.err {
							// We failed to create a transaction
							true => Err(Error::TxFailure),
							// The transaction began successfully
							false => {
								let mut ctx = Context::new(&ctx);
								// Process the statement
								let res = match stm.timeout() {
									// There is a timeout clause
									Some(timeout) => {
										// Set statement timeout
										ctx.add_timeout(timeout);
										// Process the statement
										let res = stm.compute(&ctx, &opt, &self.txn(), None).await;
										// Catch statement timeout
										match ctx.is_timedout() {
											true => Err(Error::QueryTimedout),
											false => res,
										}
									}
									// There is no timeout clause
									None => stm.compute(&ctx, &opt, &self.txn(), None).await,
								};
								// Catch global timeout
								let res = match ctx.is_timedout() {
									true => Err(Error::QueryTimedout),
									false => res,
								};
								// Finalise transaction and return the result.
								if res.is_ok() && stm.writeable() {
									if let Err(e) = self.commit(loc).await {
										// Clear live query notification details
										self.clear(&ctx, recv.clone()).await;
										// The commit failed
										Err(Error::QueryNotExecutedDetail {
											message: e.to_string(),
										})
									} else {
										// Flush the live query change notifications
										self.flush(&ctx, recv.clone()).await;
										// Successful, committed result
										res
									}
								} else {
									self.cancel(loc).await;
									// Clear live query notification details
									self.clear(&ctx, recv.clone()).await;
									// Return an error
									res
								}
							}
						}
					}
				},
			};
			// Produce the response
			let res = Response {
				// Get the statement end time
				time: now.elapsed(),
				// TODO: Replace with `inspect_err` once stable.
				result: res.map_err(|e| {
					// Mark the error.
					self.err = true;
					e
				}),
				query_type: match (is_stm_live, is_stm_kill) {
					(true, _) => QueryType::Live,
					(_, true) => QueryType::Kill,
					_ => QueryType::Other,
				},
			};
			// Output the response
			if self.txn.is_some() {
				if is_stm_output {
					buf.clear();
				}
				buf.push(res);
			} else {
				out.push(res)
			}
		}
		// Return responses
		Ok(out)
	}
}

#[cfg(test)]
mod tests {
	use crate::{dbs::Session, iam::Role, kvs::Datastore};

	#[tokio::test]
	async fn check_execute_option_permissions() {
		let tests = vec![
			// Root level
			(Session::for_level(().into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at root level should be able to set options"),
			(Session::for_level(().into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at root level should be able to set options"),
			(Session::for_level(().into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at root level should not be able to set options"),

			// Namespace level
			(Session::for_level(("NS",).into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at namespace level should be able to set options on its namespace"),
			(Session::for_level(("NS",).into(), Role::Owner).with_ns("OTHER_NS").with_db("DB"), false, "owner at namespace level should not be able to set options on another namespace"),
			(Session::for_level(("NS",).into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at namespace level should be able to set options on its namespace"),
			(Session::for_level(("NS",).into(), Role::Editor).with_ns("OTHER_NS").with_db("DB"), false, "editor at namespace level should not be able to set options on another namespace"),
			(Session::for_level(("NS",).into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at namespace level should not be able to set options on its namespace"),

			// Database level
			(Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("NS").with_db("DB"), true, "owner at database level should be able to set options on its database"),
			(Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("NS").with_db("OTHER_DB"), false, "owner at database level should not be able to set options on another database"),
			(Session::for_level(("NS", "DB").into(), Role::Owner).with_ns("OTHER_NS").with_db("DB"), false, "owner at database level should not be able to set options on another namespace even if the database name matches"),
			(Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("NS").with_db("DB"), true, "editor at database level should be able to set options on its database"),
			(Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("NS").with_db("OTHER_DB"), false, "editor at database level should not be able to set options on another database"),
			(Session::for_level(("NS", "DB").into(), Role::Editor).with_ns("OTHER_NS").with_db("DB"), false, "editor at database level should not be able to set options on another namespace even if the database name matches"),
			(Session::for_level(("NS", "DB").into(), Role::Viewer).with_ns("NS").with_db("DB"), false, "viewer at database level should not be able to set options on its database"),
		];
		let statement = "OPTION FIELDS = false";

		for test in tests.iter() {
			let (session, should_succeed, msg) = test;

			{
				let ds = Datastore::new("memory").await.unwrap().with_auth_enabled(true);

				let res = ds.execute(statement, session, None).await;

				if *should_succeed {
					assert!(res.is_ok(), "{}: {:?}", msg, res);
				} else {
					let err = res.unwrap_err().to_string();
					assert!(
						err.contains("Not enough permissions to perform this action"),
						"{}: {}",
						msg,
						err
					)
				}
			}
		}

		// Anonymous with auth enabled
		{
			let ds = Datastore::new("memory").await.unwrap().with_auth_enabled(true);

			let res =
				ds.execute(statement, &Session::default().with_ns("NS").with_db("DB"), None).await;

			let err = res.unwrap_err().to_string();
			assert!(
				err.contains("Not enough permissions to perform this action"),
				"anonymous user should not be able to set options: {}",
				err
			)
		}

		// Anonymous with auth disabled
		{
			let ds = Datastore::new("memory").await.unwrap().with_auth_enabled(false);

			let res =
				ds.execute(statement, &Session::default().with_ns("NS").with_db("DB"), None).await;

			assert!(
				res.is_ok(),
				"anonymous user should be able to set options when auth is disabled: {:?}",
				res
			)
		}
	}
}