surrealdb_core/ctx/
context.rs

1use crate::ctx::canceller::Canceller;
2use crate::ctx::reason::Reason;
3#[cfg(feature = "http")]
4use crate::dbs::capabilities::NetTarget;
5use crate::dbs::{Capabilities, Notification};
6use crate::err::Error;
7use crate::idx::planner::executor::QueryExecutor;
8use crate::idx::planner::{IterationStage, QueryPlanner};
9use crate::idx::trees::store::IndexStores;
10use crate::kvs::Transaction;
11use crate::sql::value::Value;
12use channel::Sender;
13use std::borrow::Cow;
14use std::collections::HashMap;
15use std::fmt::{self, Debug};
16#[cfg(any(
17	feature = "kv-mem",
18	feature = "kv-surrealkv",
19	feature = "kv-rocksdb",
20	feature = "kv-fdb",
21	feature = "kv-tikv",
22))]
23use std::path::PathBuf;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::Arc;
26use std::time::Duration;
27use trice::Instant;
28#[cfg(feature = "http")]
29use url::Url;
30
31impl<'a> From<Value> for Cow<'a, Value> {
32	fn from(v: Value) -> Cow<'a, Value> {
33		Cow::Owned(v)
34	}
35}
36
37impl<'a> From<&'a Value> for Cow<'a, Value> {
38	fn from(v: &'a Value) -> Cow<'a, Value> {
39		Cow::Borrowed(v)
40	}
41}
/// State carried through an operation: an optional parent link, a deadline,
/// a cancellation flag, named values, and a set of optional or shared handles
/// (notification channel, query planner/executor, iteration stage, index
/// stores, capabilities, transaction).
///
/// A child context borrows its parent for the lifetime `'a`.
#[non_exhaustive]
pub struct Context<'a> {
	// An optional parent context. `done()` falls back to it, and so does
	// `value()` unless this context is `isolated`.
	parent: Option<&'a Context<'a>>,
	// An optional deadline; `done()` reports a timeout once it has passed.
	deadline: Option<Instant>,
	// Whether or not this context is cancelled. The flag is behind an `Arc`
	// so that it can be shared with a `Canceller`.
	cancelled: Arc<AtomicBool>,
	// A collection of read only values stored in this context, keyed by
	// name. Values may be owned or borrowed for `'a`.
	values: HashMap<Cow<'static, str>, Cow<'a, Value>>,
	// Stores the notification channel if available
	notifications: Option<Sender<Notification>>,
	// An optional query planner, borrowed for `'a`.
	query_planner: Option<&'a QueryPlanner<'a>>,
	// An optional query executor
	query_executor: Option<QueryExecutor>,
	// An optional iteration stage
	iteration_stage: Option<IterationStage>,
	// The index store
	index_stores: IndexStores,
	// Capabilities, consulted by the `check_allowed_*` methods.
	capabilities: Arc<Capabilities>,
	#[cfg(any(
		feature = "kv-mem",
		feature = "kv-surrealkv",
		feature = "kv-rocksdb",
		feature = "kv-fdb",
		feature = "kv-tikv",
	))]
	// The temporary directory, if any. This field only exists when at least
	// one of the key-value store features above is enabled.
	temporary_directory: Option<Arc<PathBuf>>,
	// An optional transaction; `tx()` panics when it is absent.
	transaction: Option<Arc<Transaction>>,
	// Does not read from parent `values`.
	isolated: bool,
}
78
79impl<'a> Default for Context<'a> {
80	fn default() -> Self {
81		Context::background()
82	}
83}
84
85impl<'a> From<Transaction> for Context<'a> {
86	fn from(txn: Transaction) -> Self {
87		Context::background().with_transaction(Arc::new(txn))
88	}
89}
90
91impl<'a> Debug for Context<'a> {
92	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
93		f.debug_struct("Context")
94			.field("parent", &self.parent)
95			.field("deadline", &self.deadline)
96			.field("cancelled", &self.cancelled)
97			.field("values", &self.values)
98			.finish()
99	}
100}
101
102impl<'a> Context<'a> {
103	pub(crate) fn from_ds(
104		time_out: Option<Duration>,
105		capabilities: Capabilities,
106		index_stores: IndexStores,
107		#[cfg(any(
108			feature = "kv-mem",
109			feature = "kv-surrealkv",
110			feature = "kv-rocksdb",
111			feature = "kv-fdb",
112			feature = "kv-tikv",
113		))]
114		temporary_directory: Option<Arc<PathBuf>>,
115	) -> Result<Context<'a>, Error> {
116		let mut ctx = Self {
117			values: HashMap::default(),
118			parent: None,
119			deadline: None,
120			cancelled: Arc::new(AtomicBool::new(false)),
121			notifications: None,
122			query_planner: None,
123			query_executor: None,
124			iteration_stage: None,
125			capabilities: Arc::new(capabilities),
126			index_stores,
127			#[cfg(any(
128				feature = "kv-mem",
129				feature = "kv-surrealkv",
130				feature = "kv-rocksdb",
131				feature = "kv-fdb",
132				feature = "kv-tikv",
133			))]
134			temporary_directory,
135			transaction: None,
136			isolated: false,
137		};
138		if let Some(timeout) = time_out {
139			ctx.add_timeout(timeout)?;
140		}
141		Ok(ctx)
142	}
143	/// Create an empty background context.
144	pub fn background() -> Self {
145		Self {
146			values: HashMap::default(),
147			parent: None,
148			deadline: None,
149			cancelled: Arc::new(AtomicBool::new(false)),
150			notifications: None,
151			query_planner: None,
152			query_executor: None,
153			iteration_stage: None,
154			capabilities: Arc::new(Capabilities::default()),
155			index_stores: IndexStores::default(),
156			#[cfg(any(
157				feature = "kv-mem",
158				feature = "kv-surrealkv",
159				feature = "kv-rocksdb",
160				feature = "kv-fdb",
161				feature = "kv-tikv",
162			))]
163			temporary_directory: None,
164			transaction: None,
165			isolated: false,
166		}
167	}
168
169	/// Create a new child from a frozen context.
170	pub fn new(parent: &'a Context) -> Self {
171		Context {
172			values: HashMap::default(),
173			parent: Some(parent),
174			deadline: parent.deadline,
175			cancelled: Arc::new(AtomicBool::new(false)),
176			notifications: parent.notifications.clone(),
177			query_planner: parent.query_planner,
178			query_executor: parent.query_executor.clone(),
179			iteration_stage: parent.iteration_stage.clone(),
180			capabilities: parent.capabilities.clone(),
181			index_stores: parent.index_stores.clone(),
182			#[cfg(any(
183				feature = "kv-mem",
184				feature = "kv-surrealkv",
185				feature = "kv-rocksdb",
186				feature = "kv-fdb",
187				feature = "kv-tikv",
188			))]
189			temporary_directory: parent.temporary_directory.clone(),
190			transaction: parent.transaction.clone(),
191			isolated: false,
192		}
193	}
194
195	/// Create a new child from a frozen context.
196	pub fn new_isolated(parent: &'a Context) -> Self {
197		Context {
198			values: HashMap::default(),
199			parent: Some(parent),
200			deadline: parent.deadline,
201			cancelled: Arc::new(AtomicBool::new(false)),
202			notifications: parent.notifications.clone(),
203			query_planner: parent.query_planner,
204			query_executor: parent.query_executor.clone(),
205			iteration_stage: parent.iteration_stage.clone(),
206			capabilities: parent.capabilities.clone(),
207			index_stores: parent.index_stores.clone(),
208			#[cfg(any(
209				feature = "kv-mem",
210				feature = "kv-surrealkv",
211				feature = "kv-rocksdb",
212				feature = "kv-fdb",
213				feature = "kv-tikv",
214			))]
215			temporary_directory: parent.temporary_directory.clone(),
216			transaction: parent.transaction.clone(),
217			isolated: true,
218		}
219	}
220
221	/// Add a value to the context. It overwrites any previously set values
222	/// with the same key.
223	pub fn add_value<K, V>(&mut self, key: K, value: V)
224	where
225		K: Into<Cow<'static, str>>,
226		V: Into<Cow<'a, Value>>,
227	{
228		self.values.insert(key.into(), value.into());
229	}
230
231	/// Add cancellation to the context. The value that is returned will cancel
232	/// the context and it's children once called.
233	pub fn add_cancel(&mut self) -> Canceller {
234		let cancelled = self.cancelled.clone();
235		Canceller::new(cancelled)
236	}
237
238	/// Add a deadline to the context. If the current deadline is sooner than
239	/// the provided deadline, this method does nothing.
240	pub fn add_deadline(&mut self, deadline: Instant) {
241		match self.deadline {
242			Some(current) if current < deadline => (),
243			_ => self.deadline = Some(deadline),
244		}
245	}
246
247	/// Add a timeout to the context. If the current timeout is sooner than
248	/// the provided timeout, this method does nothing. If the result of the
249	/// addition causes an overflow, this method returns an error.
250	pub fn add_timeout(&mut self, timeout: Duration) -> Result<(), Error> {
251		match Instant::now().checked_add(timeout) {
252			Some(deadline) => {
253				self.add_deadline(deadline);
254				Ok(())
255			}
256			None => Err(Error::InvalidTimeout(timeout.as_secs())),
257		}
258	}
259
260	/// Add the LIVE query notification channel to the context, so that we
261	/// can send notifications to any subscribers.
262	pub fn add_notifications(&mut self, chn: Option<&Sender<Notification>>) {
263		self.notifications = chn.cloned()
264	}
265
	/// Attach a borrowed query planner to this context, replacing any
	/// previously attached one.
	pub(crate) fn set_query_planner(&mut self, qp: &'a QueryPlanner) {
		self.query_planner = Some(qp);
	}
269
	/// Attach a query executor to this context, replacing any previously
	/// attached one.
	pub(crate) fn set_query_executor(&mut self, qe: QueryExecutor) {
		self.query_executor = Some(qe);
	}
273
	/// Record the iteration stage on this context, replacing any previously
	/// recorded one.
	pub(crate) fn set_iteration_stage(&mut self, is: IterationStage) {
		self.iteration_stage = Some(is);
	}
277
	/// Attach a transaction to this context in place, replacing any
	/// previously attached one.
	pub(crate) fn set_transaction(&mut self, txn: Arc<Transaction>) {
		self.transaction = Some(txn);
	}
281
282	pub(crate) fn with_transaction(mut self, txn: Arc<Transaction>) -> Self {
283		self.transaction = Some(txn);
284		self
285	}
286
287	pub(crate) fn tx(&self) -> Arc<Transaction> {
288		self.transaction
289			.clone()
290			.unwrap_or_else(|| unreachable!("The context was not associated with a transaction"))
291	}
292
293	/// Get the timeout for this operation, if any. This is useful for
294	/// checking if a long job should be started or not.
295	pub fn timeout(&self) -> Option<Duration> {
296		self.deadline.map(|v| v.saturating_duration_since(Instant::now()))
297	}
298
	/// Return a clone of the notification channel sender, or `None` if no
	/// channel is set on this context.
	pub fn notifications(&self) -> Option<Sender<Notification>> {
		self.notifications.clone()
	}
302
	/// Return the query planner attached to this context, if any.
	pub(crate) fn get_query_planner(&self) -> Option<&QueryPlanner> {
		self.query_planner
	}
306
	/// Borrow the query executor attached to this context, if any.
	pub(crate) fn get_query_executor(&self) -> Option<&QueryExecutor> {
		self.query_executor.as_ref()
	}
310
	/// Borrow the iteration stage recorded on this context, if any.
	pub(crate) fn get_iteration_stage(&self) -> Option<&IterationStage> {
		self.iteration_stage.as_ref()
	}
314
	/// Borrow the index stores held by this context. Unlike most other
	/// handles, this field is not optional, so a reference is always
	/// returned.
	pub(crate) fn get_index_stores(&self) -> &IndexStores {
		&self.index_stores
	}
319
320	/// Check if the context is done. If it returns `None` the operation may
321	/// proceed, otherwise the operation should be stopped.
322	pub fn done(&self) -> Option<Reason> {
323		match self.deadline {
324			Some(deadline) if deadline <= Instant::now() => Some(Reason::Timedout),
325			_ if self.cancelled.load(Ordering::Relaxed) => Some(Reason::Canceled),
326			_ => match self.parent {
327				Some(ctx) => ctx.done(),
328				_ => None,
329			},
330		}
331	}
332
	/// Check if the context is ok to continue, i.e. [`Context::done`]
	/// reports no reason to stop.
	pub fn is_ok(&self) -> bool {
		self.done().is_none()
	}
337
	/// Check if the context is not ok to continue, i.e. [`Context::done`]
	/// reports a reason to stop (timeout or cancellation).
	pub fn is_done(&self) -> bool {
		self.done().is_some()
	}
342
	/// Check if the context is not ok to continue, because it timed out.
	/// Returns `true` only when [`Context::done`] reports `Reason::Timedout`;
	/// a cancelled context yields `false`.
	pub fn is_timedout(&self) -> bool {
		matches!(self.done(), Some(Reason::Timedout))
	}
347
	#[cfg(any(
		feature = "kv-mem",
		feature = "kv-surrealkv",
		feature = "kv-rocksdb",
		feature = "kv-fdb",
		feature = "kv-tikv",
	))]
	/// Return the location of the temporary directory if any.
	///
	/// This method only exists when at least one of the key-value store
	/// features listed above is enabled.
	pub fn temporary_directory(&self) -> Option<&Arc<PathBuf>> {
		self.temporary_directory.as_ref()
	}
359
360	/// Get a value from the context. If no value is stored under the
361	/// provided key, then this will return None.
362	pub fn value(&self, key: &str) -> Option<&Value> {
363		match self.values.get(key) {
364			Some(v) => match v {
365				Cow::Borrowed(v) => Some(*v),
366				Cow::Owned(v) => Some(v),
367			},
368			None if !self.isolated => match self.parent {
369				Some(p) => p.value(key),
370				_ => None,
371			},
372			None => None,
373		}
374	}
375
376	/// Get a 'static view into the cancellation status.
377	#[cfg(feature = "scripting")]
378	pub fn cancellation(&self) -> crate::ctx::cancellation::Cancellation {
379		crate::ctx::cancellation::Cancellation::new(
380			self.deadline,
381			std::iter::successors(Some(self), |ctx| ctx.parent)
382				.map(|ctx| ctx.cancelled.clone())
383				.collect(),
384		)
385	}
386
387	//
388	// Capabilities
389	//
390
	/// Set the capabilities for this context, replacing the current ones.
	/// The new capabilities are wrapped in a fresh `Arc`.
	pub fn add_capabilities(&mut self, caps: Capabilities) {
		self.capabilities = Arc::new(caps);
	}
395
	/// Get the capabilities for this context. The returned `Arc` points to
	/// the same `Capabilities` instance that this context holds.
	#[allow(dead_code)]
	pub fn get_capabilities(&self) -> Arc<Capabilities> {
		self.capabilities.clone()
	}
401
402	/// Check if scripting is allowed
403	#[allow(dead_code)]
404	pub fn check_allowed_scripting(&self) -> Result<(), Error> {
405		if !self.capabilities.allows_scripting() {
406			return Err(Error::ScriptingNotAllowed);
407		}
408		Ok(())
409	}
410
411	/// Check if a function is allowed
412	pub fn check_allowed_function(&self, target: &str) -> Result<(), Error> {
413		if !self.capabilities.allows_function_name(target) {
414			return Err(Error::FunctionNotAllowed(target.to_string()));
415		}
416		Ok(())
417	}
418
419	/// Check if a network target is allowed
420	#[cfg(feature = "http")]
421	pub fn check_allowed_net(&self, target: &Url) -> Result<(), Error> {
422		match target.host() {
423			Some(host)
424				if self.capabilities.allows_network_target(&NetTarget::Host(
425					host.to_owned(),
426					target.port_or_known_default(),
427				)) =>
428			{
429				Ok(())
430			}
431			_ => Err(Error::NetTargetNotAllowed(target.to_string())),
432		}
433	}
434}