//! surrealdb_core/ctx/context.rs — the per-request execution context.
1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::fmt::{self, Debug};
4#[cfg(storage)]
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::time::Duration;
9
10use anyhow::{Result, bail};
11use async_channel::Sender;
12#[cfg(feature = "surrealism")]
13use surrealism_runtime::controller::Runtime;
14#[cfg(feature = "surrealism")]
15use surrealism_runtime::package::SurrealismPackage;
16#[cfg(feature = "http")]
17use url::Url;
18use web_time::Instant;
19
20use crate::buc::manager::BucketsManager;
21#[cfg(feature = "surrealism")]
22use crate::buc::store::ObjectKey;
23use crate::buc::store::ObjectStore;
24use crate::catalog::providers::{CatalogProvider, DatabaseProvider, NamespaceProvider};
25use crate::catalog::{DatabaseDefinition, DatabaseId, NamespaceId};
26use crate::cnf::PROTECTED_PARAM_NAMES;
27use crate::ctx::canceller::Canceller;
28use crate::ctx::reason::Reason;
29#[cfg(feature = "surrealism")]
30use crate::dbs::capabilities::ExperimentalTarget;
31#[cfg(feature = "http")]
32use crate::dbs::capabilities::NetTarget;
33use crate::dbs::{Capabilities, NewPlannerStrategy, Options, Session, Variables};
34use crate::err::Error;
35use crate::exec::function::FunctionRegistry;
36use crate::idx::planner::executor::QueryExecutor;
37use crate::idx::planner::{IterationStage, QueryPlanner};
38use crate::idx::trees::store::IndexStores;
39use crate::kvs::Transaction;
40use crate::kvs::cache::ds::DatastoreCache;
41use crate::kvs::index::IndexBuilder;
42use crate::kvs::sequences::Sequences;
43use crate::kvs::slowlog::SlowLog;
44use crate::mem::ALLOC;
45use crate::sql::expression::convert_public_value_to_internal;
46#[cfg(feature = "surrealism")]
47use crate::surrealism::cache::{SurrealismCache, SurrealismCacheLookup};
48use crate::types::{PublicNotification, PublicVariables};
49use crate::val::Value;
50
/// A shared, immutable [`Context`] that can serve as the parent of child contexts.
pub type FrozenContext = Arc<Context>;
52
/// The per-request execution context.
///
/// A `Context` carries cancellation state, an optional deadline, read-only
/// values, and handles to shared infrastructure (index stores, caches,
/// transaction, buckets, etc.). Contexts form a chain via `parent`; a context
/// is shared by freezing it into an [`Arc`] (see [`FrozenContext`]).
pub struct Context {
	// An optional parent context.
	parent: Option<FrozenContext>,
	// An optional deadline, stored with the timeout duration that produced it.
	deadline: Option<(Instant, Duration)>,
	// An optional slow log configuration used by the executor to log statements
	// that exceed a given duration threshold. This configuration is propagated
	// from the datastore into the context for the lifetime of a request.
	slow_log: Option<SlowLog>,
	// Whether or not this context is cancelled.
	cancelled: Arc<AtomicBool>,
	// A collection of read only values stored in this context.
	values: HashMap<Cow<'static, str>, Arc<Value>>,
	// Stores the notification channel if available
	notifications: Option<Sender<PublicNotification>>,
	// An optional query planner
	query_planner: Option<Arc<QueryPlanner>>,
	// An optional query executor
	query_executor: Option<QueryExecutor>,
	// An optional iteration stage
	iteration_stage: Option<IterationStage>,
	// An optional datastore cache
	cache: Option<Arc<DatastoreCache>>,
	// The index store
	index_stores: IndexStores,
	// The index concurrent builders
	index_builder: Option<IndexBuilder>,
	// The sequences
	sequences: Option<Sequences>,
	// Capabilities
	capabilities: Arc<Capabilities>,
	#[cfg(storage)]
	// The temporary directory
	temporary_directory: Option<Arc<PathBuf>>,
	// An optional transaction
	transaction: Option<Arc<Transaction>>,
	// Does not read from parent `values` (protected parameter names still fall through).
	isolated: bool,
	// A map of bucket connections
	buckets: Option<BucketsManager>,
	// The surrealism cache
	#[cfg(feature = "surrealism")]
	surrealism_cache: Option<Arc<SurrealismCache>>,
	// Function registry for built-in and custom functions
	function_registry: Arc<FunctionRegistry>,
	// Strategy for the new streaming planner/executor
	new_planner_strategy: NewPlannerStrategy,
	// When true, EXPLAIN ANALYZE omits elapsed durations for deterministic test output
	redact_volatile_explain_attrs: bool,
	// Matches context for index functions (search::highlight, search::score, etc.)
	matches_context: Option<Arc<crate::exec::function::MatchesContext>>,
	// KNN context for index functions (vector::distance::knn)
	knn_context: Option<Arc<crate::exec::function::KnnContext>>,
}
107
108impl Default for Context {
109	fn default() -> Self {
110		Context::background()
111	}
112}
113
impl From<Transaction> for Context {
	/// Builds a fresh background context with the given transaction attached.
	fn from(txn: Transaction) -> Self {
		let mut ctx = Context::background();
		ctx.set_transaction(Arc::new(txn));
		ctx
	}
}
121
122impl Debug for Context {
123	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
124		f.debug_struct("Context")
125			.field("parent", &self.parent)
126			.field("deadline", &self.deadline)
127			.field("cancelled", &self.cancelled)
128			.field("values", &self.values)
129			.finish()
130	}
131}
132
133impl Context {
	/// Creates a new empty background context.
	///
	/// The background context is the root of every context chain: no parent,
	/// no deadline, no transaction, and default capabilities. All other
	/// constructors derive from a parent or from the datastore.
	pub(crate) fn background() -> Self {
		Self {
			values: HashMap::default(),
			parent: None,
			deadline: None,
			slow_log: None,
			cancelled: Arc::new(AtomicBool::new(false)),
			notifications: None,
			query_planner: None,
			query_executor: None,
			iteration_stage: None,
			capabilities: Arc::new(Capabilities::default()),
			index_stores: IndexStores::default(),
			cache: None,
			index_builder: None,
			sequences: None,
			#[cfg(storage)]
			temporary_directory: None,
			transaction: None,
			isolated: false,
			buckets: None,
			#[cfg(feature = "surrealism")]
			surrealism_cache: None,
			function_registry: Arc::new(FunctionRegistry::with_builtins()),
			new_planner_strategy: NewPlannerStrategy::default(),
			redact_volatile_explain_attrs: false,
			matches_context: None,
			knn_context: None,
		}
	}
165
	/// Creates a new context from a frozen parent context.
	///
	/// The child inherits the parent's deadline and infrastructure handles,
	/// but gets its own (empty) value map and its own cancellation flag. The
	/// parent is retained so value lookups and cancellation checks can walk
	/// up the chain.
	pub(crate) fn new(parent: &FrozenContext) -> Self {
		Context {
			values: HashMap::default(),
			deadline: parent.deadline,
			slow_log: parent.slow_log.clone(),
			cancelled: Arc::new(AtomicBool::new(false)),
			notifications: parent.notifications.clone(),
			query_planner: parent.query_planner.clone(),
			query_executor: parent.query_executor.clone(),
			iteration_stage: parent.iteration_stage.clone(),
			capabilities: parent.capabilities.clone(),
			index_stores: parent.index_stores.clone(),
			cache: parent.cache.clone(),
			index_builder: parent.index_builder.clone(),
			sequences: parent.sequences.clone(),
			#[cfg(storage)]
			temporary_directory: parent.temporary_directory.clone(),
			transaction: parent.transaction.clone(),
			isolated: false,
			parent: Some(parent.clone()),
			buckets: parent.buckets.clone(),
			#[cfg(feature = "surrealism")]
			surrealism_cache: parent.surrealism_cache.clone(),
			function_registry: parent.function_registry.clone(),
			new_planner_strategy: parent.new_planner_strategy.clone(),
			redact_volatile_explain_attrs: parent.redact_volatile_explain_attrs,
			matches_context: parent.matches_context.clone(),
			knn_context: parent.knn_context.clone(),
		}
	}
197
198	/// Create a new context from a frozen parent context.
199	/// This context is isolated, and values specified on
200	/// any parent contexts will not be accessible.
201	pub(crate) fn new_isolated(parent: &FrozenContext) -> Self {
202		Self {
203			values: HashMap::default(),
204			deadline: parent.deadline,
205			slow_log: parent.slow_log.clone(),
206			cancelled: Arc::new(AtomicBool::new(false)),
207			notifications: parent.notifications.clone(),
208			query_planner: parent.query_planner.clone(),
209			query_executor: parent.query_executor.clone(),
210			iteration_stage: parent.iteration_stage.clone(),
211			capabilities: parent.capabilities.clone(),
212			index_stores: parent.index_stores.clone(),
213			cache: parent.cache.clone(),
214			index_builder: parent.index_builder.clone(),
215			sequences: parent.sequences.clone(),
216			#[cfg(storage)]
217			temporary_directory: parent.temporary_directory.clone(),
218			transaction: parent.transaction.clone(),
219			isolated: true,
220			parent: Some(parent.clone()),
221			buckets: parent.buckets.clone(),
222			#[cfg(feature = "surrealism")]
223			surrealism_cache: parent.surrealism_cache.clone(),
224			function_registry: parent.function_registry.clone(),
225			new_planner_strategy: parent.new_planner_strategy.clone(),
226			redact_volatile_explain_attrs: parent.redact_volatile_explain_attrs,
227			matches_context: parent.matches_context.clone(),
228			knn_context: parent.knn_context.clone(),
229		}
230	}
231
232	/// Create an independent snapshot of a frozen context.
233	///
234	/// Flattens all values from the parent chain into the snapshot's own
235	/// `values` map and sets `parent: None`, so the returned context does
236	/// **not** hold an `Arc` reference to the original parent.
237	///
238	/// This is used by the streaming executor to give the operator pipeline
239	/// its own `Arc<Context>` that won't interfere with the executor's
240	/// `Arc::get_mut` requirements between statements.
241	pub(crate) fn snapshot(from: &FrozenContext) -> Self {
242		Self {
243			// Flatten all values from the parent chain into this context
244			values: from.collect_values(HashMap::default()),
245			deadline: from.deadline,
246			slow_log: from.slow_log.clone(),
247			cancelled: Arc::new(AtomicBool::new(false)),
248			notifications: from.notifications.clone(),
249			query_planner: from.query_planner.clone(),
250			query_executor: from.query_executor.clone(),
251			iteration_stage: from.iteration_stage.clone(),
252			capabilities: from.capabilities.clone(),
253			index_stores: from.index_stores.clone(),
254			cache: from.cache.clone(),
255			index_builder: from.index_builder.clone(),
256			sequences: from.sequences.clone(),
257			#[cfg(storage)]
258			temporary_directory: from.temporary_directory.clone(),
259			transaction: from.transaction.clone(),
260			isolated: false,
261			parent: None, // No parent reference — fully independent
262			buckets: from.buckets.clone(),
263			#[cfg(feature = "surrealism")]
264			surrealism_cache: from.surrealism_cache.clone(),
265			function_registry: from.function_registry.clone(),
266			new_planner_strategy: from.new_planner_strategy.clone(),
267			redact_volatile_explain_attrs: from.redact_volatile_explain_attrs,
268			matches_context: from.matches_context.clone(),
269			knn_context: from.knn_context.clone(),
270		}
271	}
272
273	/// Create a new context from a frozen parent context.
274	/// This context is not linked to the parent context,
275	/// and won't be cancelled if the parent is cancelled.
276	pub(crate) fn new_concurrent(from: &FrozenContext) -> Self {
277		Self {
278			values: HashMap::default(),
279			deadline: None,
280			slow_log: from.slow_log.clone(),
281			cancelled: Arc::new(AtomicBool::new(false)),
282			notifications: from.notifications.clone(),
283			query_planner: from.query_planner.clone(),
284			query_executor: from.query_executor.clone(),
285			iteration_stage: from.iteration_stage.clone(),
286			capabilities: from.capabilities.clone(),
287			index_stores: from.index_stores.clone(),
288			cache: from.cache.clone(),
289			index_builder: from.index_builder.clone(),
290			sequences: from.sequences.clone(),
291			#[cfg(storage)]
292			temporary_directory: from.temporary_directory.clone(),
293			transaction: None,
294			isolated: false,
295			parent: None,
296			buckets: from.buckets.clone(),
297			#[cfg(feature = "surrealism")]
298			surrealism_cache: from.surrealism_cache.clone(),
299			function_registry: from.function_registry.clone(),
300			new_planner_strategy: from.new_planner_strategy.clone(),
301			redact_volatile_explain_attrs: from.redact_volatile_explain_attrs,
302			matches_context: from.matches_context.clone(),
303			knn_context: from.knn_context.clone(),
304		}
305	}
306
	/// Creates a new context from a configured datastore.
	///
	/// This is the root context for a request: it carries the datastore's
	/// shared infrastructure (caches, index stores/builders, sequences,
	/// buckets) and applies the optional statement timeout.
	///
	/// # Errors
	/// Returns an error if `time_out` cannot be converted into a deadline
	/// (see [`Context::add_timeout`]).
	#[expect(clippy::too_many_arguments)]
	pub(crate) fn from_ds(
		time_out: Option<Duration>,
		slow_log: Option<SlowLog>,
		capabilities: Arc<Capabilities>,
		index_stores: IndexStores,
		index_builder: IndexBuilder,
		sequences: Sequences,
		cache: Arc<DatastoreCache>,
		#[cfg(storage)] temporary_directory: Option<Arc<PathBuf>>,
		buckets: BucketsManager,
		#[cfg(feature = "surrealism")] surrealism_cache: Arc<SurrealismCache>,
	) -> Result<Context> {
		// The capability-level planner strategy seeds the context; a session
		// may later override it (see `attach_session`).
		let planner_strategy = capabilities.planner_strategy().clone();
		let mut ctx = Self {
			values: HashMap::default(),
			parent: None,
			deadline: None,
			slow_log,
			cancelled: Arc::new(AtomicBool::new(false)),
			notifications: None,
			query_planner: None,
			query_executor: None,
			iteration_stage: None,
			capabilities,
			index_stores,
			cache: Some(cache),
			index_builder: Some(index_builder),
			sequences: Some(sequences),
			#[cfg(storage)]
			temporary_directory,
			transaction: None,
			isolated: false,
			buckets: Some(buckets),
			#[cfg(feature = "surrealism")]
			surrealism_cache: Some(surrealism_cache),
			function_registry: Arc::new(FunctionRegistry::with_builtins()),
			new_planner_strategy: planner_strategy,
			redact_volatile_explain_attrs: false,
			matches_context: None,
			knn_context: None,
		};
		// Convert the optional timeout into an absolute deadline.
		if let Some(timeout) = time_out {
			ctx.add_timeout(timeout)?;
		}
		Ok(ctx)
	}
355
	/// Freezes this context, allowing it to be used as a parent context.
	pub(crate) fn freeze(self) -> FrozenContext {
		Arc::new(self)
	}

	/// Unfreezes this context, allowing it to be edited and configured.
	///
	/// Fails if the `Arc` still has other strong references, since the inner
	/// context can only be recovered when it is uniquely owned.
	pub(crate) fn unfreeze(ctx: FrozenContext) -> Result<Context> {
		let Some(x) = Arc::into_inner(ctx) else {
			fail!("Tried to unfreeze a Context with multiple references")
		};
		Ok(x)
	}
368
369	/// Get the namespace id for the current context.
370	/// If the namespace does not exist, it will be try to be created based on
371	/// the `strict` option.
372	pub(crate) async fn get_ns_id(&self, opt: &Options) -> Result<NamespaceId> {
373		let ns = opt.ns()?;
374		let tx = self.tx();
375		let ns_def = tx.get_or_add_ns(Some(self), ns).await?;
376		Ok(ns_def.namespace_id)
377	}
378
379	/// Get the namespace id for the current context.
380	/// If the namespace does not exist, it will return an error.
381	pub(crate) async fn expect_ns_id(&self, opt: &Options) -> Result<NamespaceId> {
382		let ns = opt.ns()?;
383		let Some(ns_def) = self.tx().get_ns_by_name(ns).await? else {
384			return Err(Error::NsNotFound {
385				name: ns.to_string(),
386			}
387			.into());
388		};
389		Ok(ns_def.namespace_id)
390	}
391
392	/// Get the namespace and database ids for the current context.
393	/// If the namespace or database does not exist, it will be try to be
394	/// created based on the `strict` option.
395	pub(crate) async fn get_ns_db_ids(&self, opt: &Options) -> Result<(NamespaceId, DatabaseId)> {
396		let (ns, db) = opt.ns_db()?;
397		let db_def = self.tx().ensure_ns_db(Some(self), ns, db).await?;
398		Ok((db_def.namespace_id, db_def.database_id))
399	}
400
401	/// Get the namespace and database ids for the current context.
402	/// If the namespace or database does not exist, it will be try to be
403	/// created based on the `strict` option.
404	pub(crate) async fn try_ns_db_ids(
405		&self,
406		opt: &Options,
407	) -> Result<Option<(NamespaceId, DatabaseId)>> {
408		let (ns, db) = opt.ns_db()?;
409		let Some(db_def) = self.tx().get_db_by_name(ns, db).await? else {
410			return Ok(None);
411		};
412		Ok(Some((db_def.namespace_id, db_def.database_id)))
413	}
414
415	/// Get the namespace and database ids for the current context.
416	/// If the namespace or database does not exist, it will return an error.
417	pub(crate) async fn expect_ns_db_ids(
418		&self,
419		opt: &Options,
420	) -> Result<(NamespaceId, DatabaseId)> {
421		let (ns, db) = opt.ns_db()?;
422		let Some(db_def) = self.tx().get_db_by_name(ns, db).await? else {
423			return Err(Error::DbNotFound {
424				name: db.to_string(),
425			}
426			.into());
427		};
428		Ok((db_def.namespace_id, db_def.database_id))
429	}
430
431	pub(crate) async fn get_db(&self, opt: &Options) -> Result<Arc<DatabaseDefinition>> {
432		let (ns, db) = opt.ns_db()?;
433		let db_def = self.tx().ensure_ns_db(Some(self), ns, db).await?;
434		Ok(db_def)
435	}
436
	/// Add a value to the context. It overwrites any previously set values
	/// with the same key.
	pub(crate) fn add_value<K>(&mut self, key: K, value: Arc<Value>)
	where
		K: Into<Cow<'static, str>>,
	{
		self.values.insert(key.into(), value);
	}

	/// Add multiple values to the context. Each entry overwrites any
	/// previously set value with the same key.
	pub(crate) fn add_values<T, K, V>(&mut self, iter: T)
	where
		T: IntoIterator<Item = (K, V)>,
		K: Into<Cow<'static, str>>,
		V: Into<Arc<Value>>,
	{
		self.values.extend(iter.into_iter().map(|(k, v)| (k.into(), v.into())))
	}
456
457	/// Add cancellation to the context. The value that is returned will cancel
458	/// the context and it's children once called.
459	pub(crate) fn add_cancel(&mut self) -> Canceller {
460		let cancelled = self.cancelled.clone();
461		Canceller::new(cancelled)
462	}
463
464	/// Add a deadline to the context. If the current deadline is sooner than
465	/// the provided deadline, this method does nothing.
466	pub(crate) fn add_deadline(&mut self, deadline: Instant, duration: Duration) {
467		match self.deadline {
468			Some((current, _)) if current < deadline => (),
469			_ => self.deadline = Some((deadline, duration)),
470		}
471	}
472
473	/// Add a timeout to the context. If the current timeout is sooner than
474	/// the provided timeout, this method does nothing. If the result of the
475	/// addition causes an overflow, this method returns an error.
476	pub(crate) fn add_timeout(&mut self, timeout: Duration) -> Result<(), Error> {
477		match Instant::now().checked_add(timeout) {
478			Some(deadline) => {
479				self.add_deadline(deadline, timeout);
480				Ok(())
481			}
482			None => Err(Error::InvalidTimeout(timeout.as_secs())),
483		}
484	}
485
486	/// Add the LIVE query notification channel to the context, so that we
487	/// can send notifications to any subscribers.
488	pub(crate) fn add_notifications(&mut self, chn: Option<&Sender<PublicNotification>>) {
489		self.notifications = chn.cloned()
490	}
491
	/// Attach the query planner produced for the current query.
	pub(crate) fn set_query_planner(&mut self, qp: QueryPlanner) {
		self.query_planner = Some(Arc::new(qp));
	}

	/// Cache a table-specific QueryExecutor in the Context.
	///
	/// This is set by the collector/processor when iterating over a specific
	/// table or index so that downstream per-record operations can access the
	/// executor without repeatedly looking it up from the QueryPlanner.
	pub(crate) fn set_query_executor(&mut self, qe: QueryExecutor) {
		self.query_executor = Some(qe);
	}

	/// Set the current iteration stage.
	pub(crate) fn set_iteration_stage(&mut self, is: IterationStage) {
		self.iteration_stage = Some(is);
	}

	/// Associate a transaction with this context.
	pub(crate) fn set_transaction(&mut self, txn: Arc<Transaction>) {
		self.transaction = Some(txn);
	}

	/// Returns the transaction associated with this context.
	///
	/// # Panics
	/// Panics (via `unreachable!`) if no transaction was attached; use
	/// [`Context::try_tx`] when a transaction may legitimately be absent.
	pub(crate) fn tx(&self) -> Arc<Transaction> {
		self.transaction
			.clone()
			.unwrap_or_else(|| unreachable!("The context was not associated with a transaction"))
	}
518
	/// Returns the transaction if one is associated with this context.
	pub(crate) fn try_tx(&self) -> Option<&Arc<Transaction>> {
		self.transaction.as_ref()
	}

	/// Get the time remaining before the deadline, if any. This is useful
	/// for checking if a long job should be started or not.
	pub(crate) fn timeout(&self) -> Option<Duration> {
		self.deadline.map(|(v, _)| v.saturating_duration_since(Instant::now()))
	}

	/// Returns the slow log configuration, if any, attached to this context.
	/// The executor consults this to decide whether to emit slow-query log lines.
	pub(crate) fn slow_log(&self) -> Option<&SlowLog> {
		self.slow_log.as_ref()
	}

	/// Returns a clone of the LIVE query notification channel, if configured.
	pub(crate) fn notifications(&self) -> Option<Sender<PublicNotification>> {
		self.notifications.clone()
	}

	/// Whether a LIVE query notification channel is configured.
	pub(crate) fn has_notifications(&self) -> bool {
		self.notifications.is_some()
	}
543
	/// Get the query planner attached to this context, if any.
	pub(crate) fn get_query_planner(&self) -> Option<&QueryPlanner> {
		self.query_planner.as_ref().map(|qp| qp.as_ref())
	}

	/// Get the cached QueryExecutor (if any) attached by the current iteration
	/// context.
	pub(crate) fn get_query_executor(&self) -> Option<&QueryExecutor> {
		self.query_executor.as_ref()
	}

	/// Get the current iteration stage, if any.
	pub(crate) fn get_iteration_stage(&self) -> Option<&IterationStage> {
		self.iteration_stage.as_ref()
	}

	/// Get the index_store for this context/ds
	pub(crate) fn get_index_stores(&self) -> &IndexStores {
		&self.index_stores
	}

	/// Get the index_builder for this context/ds
	pub(crate) fn get_index_builder(&self) -> Option<&IndexBuilder> {
		self.index_builder.as_ref()
	}

	/// Return the sequences manager
	pub(crate) fn get_sequences(&self) -> Option<&Sequences> {
		self.sequences.as_ref()
	}

	/// Return the sequences manager, or an internal error if this context
	/// does not support sequences.
	pub(crate) fn try_get_sequences(&self) -> Result<&Sequences> {
		if let Some(sqs) = self.get_sequences() {
			Ok(sqs)
		} else {
			bail!(Error::Internal("Sequences are not supported in this context.".to_string(),))
		}
	}

	// Get the current datastore cache
	pub(crate) fn get_cache(&self) -> Option<Arc<DatastoreCache>> {
		self.cache.clone()
	}
585
	/// Check if the context is done. If it returns `None` the operation may
	/// proceed, otherwise the operation should be stopped.
	///
	/// # Check Priority Order
	/// The checks are performed in the following order, with earlier checks taking priority:
	/// 1. **Cancellation** (always checked): Fast atomic flag check via `self.cancelled`
	/// 2. **Memory threshold** (only if `deep_check = true`): Expensive check via
	///    `ALLOC.is_beyond_threshold()`
	/// 3. **Deadline** (only if `deep_check = true`): Moderately expensive check via
	///    `Instant::now()`
	///
	/// # Parameters
	/// - `deep_check`: When `true`, performs all checks (cancellation, memory, deadline). When
	///   `false`, only checks the cancellation flag (fast atomic operation).
	///
	/// # Performance Note
	/// - Checking an `AtomicBool` (cancellation): single-digit nanoseconds
	/// - Checking `Instant::now()` (deadline): tens to hundreds of nanoseconds
	/// - Checking `ALLOC.is_beyond_threshold()` (memory): hundreds of nanoseconds (requires lock +
	///   traversal)
	///
	/// Use `deep_check = false` in hot loops to minimize overhead while still allowing
	/// cancellation.
	pub(crate) fn done(&self, deep_check: bool) -> Result<Option<Reason>> {
		// Check cancellation FIRST (fast atomic operation)
		if self.cancelled.load(Ordering::Relaxed) {
			return Ok(Some(Reason::Canceled));
		}
		if deep_check {
			// Memory pressure is checked before the deadline.
			if ALLOC.is_beyond_threshold() {
				bail!(Error::QueryBeyondMemoryThreshold);
			}
			let now = Instant::now();
			if let Some((deadline, timeout)) = self.deadline
				&& deadline <= now
			{
				return Ok(Some(Reason::Timedout(timeout.into())));
			}
		}
		// Walk up the chain so a cancelled or expired ancestor stops this context too.
		if let Some(ctx) = &self.parent {
			return ctx.done(deep_check);
		}
		Ok(None)
	}
630
631	/// Check if there is some reason to stop processing the current query.
632	///
633	/// Returns `true` when the query should be stopped (cancelled, timed out, or exceeded memory
634	/// threshold).
635	///
636	/// # Parameters
637	/// - `count`: Optional iteration count for optimization. Pass:
638	///   - `Some(count)` when called in a loop - enables adaptive checking to balance
639	///     responsiveness with performance. The method will:
640	///     - Yield every 32 iterations to allow other tasks to run
641	///     - Perform deep checks (memory/deadline) at iterations 1, 2, 4, 8, 16, 32, then every 64
642	///   - `None` when called outside a loop (e.g., single operations) - always performs a deep
643	///     check for immediate cancellation/timeout detection
644	///
645	/// # Performance
646	/// The adaptive checking strategy ("jitter-based back-off") minimizes overhead in hot loops
647	/// while maintaining reasonable responsiveness to cancellation and timeout events.
648	pub(crate) async fn is_done(&self, count: Option<usize>) -> Result<bool> {
649		let deep_check = if let Some(count) = count {
650			// We yield every 32 iterations
651			if count % 32 == 0 {
652				yield_now!();
653			}
654			// Adaptive back-off strategy for deep checks based on iteration number:
655			// Check frequently early (powers of 2), then settle into every 64 iterations
656			match count {
657				1 | 2 | 4 | 8 | 16 | 32 => true,
658				_ => count % 64 == 0,
659			}
660		} else {
661			// No count provided - perform a deep check immediately (single operation context)
662			true
663		};
664		Ok(self.done(deep_check)?.is_some())
665	}
666
667	/// Check if the context is not ok to continue, because it timed out.
668	pub(crate) async fn is_timedout(&self) -> Result<Option<Duration>> {
669		yield_now!();
670		if let Some(Reason::Timedout(d)) = self.done(true)? {
671			Ok(Some(d.0))
672		} else {
673			Ok(None)
674		}
675	}
676
677	pub(crate) async fn expect_not_timedout(&self) -> Result<()> {
678		if let Some(d) = self.is_timedout().await? {
679			bail!(Error::QueryTimedout(d.into()))
680		} else {
681			Ok(())
682		}
683	}
684
	#[cfg(storage)]
	/// Return the location of the temporary directory, if one was configured.
	pub(crate) fn temporary_directory(&self) -> Option<&Arc<PathBuf>> {
		self.temporary_directory.as_ref()
	}
690
	/// Get a value from the context. If no value is stored under the
	/// provided key, then this will return None.
	///
	/// Lookups fall through to the parent chain unless this context is
	/// isolated; names in `PROTECTED_PARAM_NAMES` always fall through,
	/// even for isolated contexts.
	pub(crate) fn value(&self, key: &str) -> Option<&Value> {
		match self.values.get(key) {
			Some(v) => Some(v.as_ref()),
			None if PROTECTED_PARAM_NAMES.contains(&key) || !self.isolated => match &self.parent {
				Some(p) => p.value(key),
				_ => None,
			},
			None => None,
		}
	}
703
704	/// Collect context values into the provided map, walking up parent contexts
705	/// unless this context is isolated.
706	pub(crate) fn collect_values(
707		&self,
708		map: HashMap<Cow<'static, str>, Arc<Value>>,
709	) -> HashMap<Cow<'static, str>, Arc<Value>> {
710		let mut map = if !self.isolated
711			&& let Some(p) = &self.parent
712		{
713			p.collect_values(map)
714		} else {
715			map
716		};
717		self.values.iter().for_each(|(k, v)| {
718			map.insert(k.clone(), v.clone());
719		});
720		map
721	}
722
	/// Get a 'static view into the cancellation status.
	///
	/// Collects the deadline plus the cancellation flag of this context and
	/// every ancestor, so cancellation can be observed without borrowing the
	/// context itself.
	#[cfg(feature = "scripting")]
	pub(crate) fn cancellation(&self) -> crate::ctx::cancellation::Cancellation {
		crate::ctx::cancellation::Cancellation::new(
			self.deadline.map(|(deadline, _)| deadline),
			// Walk the parent chain, cloning each context's cancellation flag.
			std::iter::successors(Some(self), |ctx| ctx.parent.as_ref().map(|c| c.as_ref()))
				.map(|ctx| ctx.cancelled.clone())
				.collect(),
		)
	}
733
	/// Attach a session to the context and add any session variables to the
	/// context.
	///
	/// # Errors
	/// Returns an error if any session variable uses a protected parameter
	/// name (see [`Context::attach_variables`]).
	pub(crate) fn attach_session(&mut self, session: &Session) -> Result<(), Error> {
		self.add_values(session.values());
		// Only override the planner strategy if the session explicitly sets a
		// non-default value (e.g. language tests). Otherwise the capability-level
		// strategy (set via from_ds) is preserved.
		if session.new_planner_strategy != NewPlannerStrategy::default() {
			self.new_planner_strategy = session.new_planner_strategy.clone();
		}
		// Propagate duration redaction flag from session.
		if session.redact_volatile_explain_attrs {
			self.redact_volatile_explain_attrs = true;
		}
		if !session.variables.is_empty() {
			self.attach_variables(session.variables.clone().into())?;
		}
		Ok(())
	}
753
754	/// Attach variables to the context.
755	pub(crate) fn attach_variables(&mut self, vars: Variables) -> Result<(), Error> {
756		for (name, val) in vars {
757			if PROTECTED_PARAM_NAMES.contains(&name.as_str()) {
758				return Err(Error::InvalidParam {
759					name,
760				});
761			}
762			self.add_value(name, Arc::new(val));
763		}
764		Ok(())
765	}
766
767	pub(crate) fn attach_public_variables(&mut self, vars: PublicVariables) -> Result<(), Error> {
768		for (name, val) in vars {
769			if PROTECTED_PARAM_NAMES.contains(&name.as_str()) {
770				return Err(Error::InvalidParam {
771					name,
772				});
773			}
774			self.add_value(name, Arc::new(convert_public_value_to_internal(val)));
775		}
776		Ok(())
777	}
778
779	//
780	// Capabilities
781	//
782
	/// Set the capabilities for this context
	pub(crate) fn add_capabilities(&mut self, caps: Arc<Capabilities>) {
		self.capabilities = caps;
	}

	/// Get the capabilities for this context
	pub(crate) fn get_capabilities(&self) -> Arc<Capabilities> {
		self.capabilities.clone()
	}

	/// Get the function registry for this context
	pub(crate) fn function_registry(&self) -> &Arc<FunctionRegistry> {
		&self.function_registry
	}
797
	/// Set the matches context for index functions (search::highlight, etc.)
	pub(crate) fn set_matches_context(&mut self, ctx: crate::exec::function::MatchesContext) {
		self.matches_context = Some(Arc::new(ctx));
	}

	/// Get the matches context for index functions
	pub(crate) fn get_matches_context(
		&self,
	) -> Option<&Arc<crate::exec::function::MatchesContext>> {
		self.matches_context.as_ref()
	}

	/// Set the KNN context for index functions (vector::distance::knn)
	pub(crate) fn set_knn_context(&mut self, ctx: Arc<crate::exec::function::KnnContext>) {
		self.knn_context = Some(ctx);
	}

	/// Get the KNN context for index functions
	pub(crate) fn get_knn_context(&self) -> Option<&Arc<crate::exec::function::KnnContext>> {
		self.knn_context.as_ref()
	}

	/// Get the new planner strategy for this context
	pub(crate) fn new_planner_strategy(&self) -> &NewPlannerStrategy {
		&self.new_planner_strategy
	}

	/// Whether EXPLAIN ANALYZE should redact elapsed durations.
	pub(crate) fn redact_volatile_explain_attrs(&self) -> bool {
		self.redact_volatile_explain_attrs
	}
829
830	/// Check if scripting is allowed
831	#[cfg_attr(not(feature = "scripting"), expect(dead_code))]
832	pub(crate) fn check_allowed_scripting(&self) -> Result<()> {
833		if !self.capabilities.allows_scripting() {
834			warn!("Capabilities denied scripting attempt");
835			bail!(Error::ScriptingNotAllowed);
836		}
837		trace!("Capabilities allowed scripting");
838		Ok(())
839	}
840
841	/// Check if a function is allowed
842	pub(crate) fn check_allowed_function(&self, target: &str) -> Result<()> {
843		if !self.capabilities.allows_function_name(target) {
844			warn!("Capabilities denied function execution attempt, target: '{target}'");
845			bail!(Error::FunctionNotAllowed(target.to_string()));
846		}
847		trace!("Capabilities allowed function execution, target: '{target}'");
848		Ok(())
849	}
850
851	/// Checks if the provided URL's network target is allowed based on current
852	/// capabilities.
853	///
854	/// This function performs a validation to ensure that the outgoing network
855	/// connection specified by the provided `url` is permitted. It checks the
856	/// resolved network targets associated with the URL and ensures that all
857	/// targets adhere to the configured capabilities.
858	///
859	/// # Features
860	/// The function is only available if the `http` feature is enabled.
861	///
862	/// # Parameters
863	/// - `url`: A reference to a [`Url`] object representing the target endpoint to check.
864	///
865	/// # Returns
866	/// This function returns a [`Result<()>`]:
867	/// - On success, it returns `Ok(())` indicating the network target is allowed.
868	/// - On failure, it returns an error wrapped in the [`Error`] type:
869	///   - `NetTargetNotAllowed` if the target is not permitted.
870	///   - `InvalidUrl` if the provided URL is invalid.
871	///
872	/// # Behavior
873	/// 1. Extracts the host and port information from the URL.
874	/// 2. Constructs a [`NetTarget`] object and checks if it is allowed by the current network
875	///    capabilities.
876	/// 3. If the network target resolves to multiple targets (e.g., DNS resolution), each target is
877	///    validated individually.
878	/// 4. Logs a warning and prevents the connection if the target is denied by the capabilities.
879	///
880	/// # Logging
881	/// - Logs a warning message if the network target is denied.
882	/// - Logs a trace message if the network target is permitted.
883	///
884	/// # Errors
885	/// - `NetTargetNotAllowed`: Returned if any of the resolved targets are not allowed.
886	/// - `InvalidUrl`: Returned if the URL does not have a valid host.
887	#[cfg(feature = "http")]
888	pub(crate) async fn check_allowed_net(&self, url: &Url) -> Result<()> {
889		let match_any_deny_net = |t| {
890			if self.capabilities.matches_any_deny_net(t) {
891				warn!("Capabilities denied outgoing network connection attempt, target: '{t}'");
892				bail!(Error::NetTargetNotAllowed(t.to_string()));
893			}
894			Ok(())
895		};
896		match url.host() {
897			Some(host) => {
898				let target = NetTarget::Host(host.to_owned(), url.port_or_known_default());
899				// Check the domain name (if any) matches the allow list
900				let host_allowed = self.capabilities.matches_any_allow_net(&target);
901				if !host_allowed {
902					warn!(
903						"Capabilities denied outgoing network connection attempt, target: '{target}'"
904					);
905					bail!(Error::NetTargetNotAllowed(target.to_string()));
906				}
907				// Check against the deny list
908				match_any_deny_net(&target)?;
909				// Resolve the domain name to a vector of IP addresses
910				#[cfg(not(target_family = "wasm"))]
911				let targets = target.resolve().await?;
912				#[cfg(target_family = "wasm")]
913				let targets = target.resolve()?;
914				for t in &targets {
915					// For each IP address resolved, check it is allowed
916					match_any_deny_net(t)?;
917				}
918				trace!("Capabilities allowed outgoing network connection, target: '{target}'");
919				Ok(())
920			}
921			_ => bail!(Error::InvalidUrl(url.to_string())),
922		}
923	}
924
	/// Get the buckets manager for this context, if one is configured.
	pub(crate) fn get_buckets(&self) -> Option<&BucketsManager> {
		self.buckets.as_ref()
	}
928
929	/// Obtain the connection for a bucket
930	pub(crate) async fn get_bucket_store(
931		&self,
932		ns: NamespaceId,
933		db: DatabaseId,
934		bu: &str,
935	) -> Result<Arc<dyn ObjectStore>> {
936		// Do we have a buckets context?
937		if let Some(buckets) = &self.buckets {
938			buckets.get_bucket_store(&self.tx(), ns, db, bu).await
939		} else {
940			bail!(Error::BucketUnavailable(bu.into()))
941		}
942	}
943
944	#[cfg(feature = "surrealism")]
945	pub(crate) fn get_surrealism_cache(&self) -> Option<Arc<SurrealismCache>> {
946		self.surrealism_cache.as_ref().map(|sc| sc.clone())
947	}
948
	/// Obtain a cached Surrealism [`Runtime`] for the given lookup.
	///
	/// Requires the experimental `surrealism` capability to be enabled and a
	/// Surrealism cache to be present on this context. On a cache miss, the
	/// package file is fetched from the bucket referenced by the lookup,
	/// parsed, and a new runtime is constructed and stored in the cache.
	///
	/// # Errors
	/// Fails when the capability is disabled, the cache is unavailable, the
	/// lookup kind is unsupported (only file lookups are handled), or the
	/// file cannot be fetched or parsed.
	#[cfg(feature = "surrealism")]
	pub(crate) async fn get_surrealism_runtime(
		&self,
		lookup: SurrealismCacheLookup<'_>,
	) -> Result<Arc<Runtime>> {
		// The experimental capability gates all Surrealism usage
		if !self.get_capabilities().allows_experimental(&ExperimentalTarget::Surrealism) {
			bail!(
				"Failed to get surrealism runtime: Experimental capability `surrealism` is not enabled"
			);
		}

		let Some(cache) = self.get_surrealism_cache() else {
			bail!("Surrealism cache is not available");
		};

		// The async closure only runs on a cache miss
		cache
			.get_or_insert_with(&lookup, async || {
				// Only file-based lookups are currently supported
				let SurrealismCacheLookup::File(ns, db, bucket, key) = lookup else {
					bail!("silo lookups are not supported yet");
				};

				// Fetch the package bytes from the object store
				let bucket = self.get_bucket_store(*ns, *db, bucket).await?;
				let key = ObjectKey::new(key);
				let surli = bucket
					.get(&key)
					.await
					.map_err(|e| anyhow::anyhow!("failed to get file: {}", e))?;

				// `get` returns `None` when the key does not exist
				let Some(surli) = surli else {
					bail!("file not found");
				};

				// Parse the package and build a runtime to be cached
				let package = SurrealismPackage::from_reader(std::io::Cursor::new(surli))?;
				let runtime = Arc::new(Runtime::new(package)?);

				Ok(runtime)
			})
			.await
	}
988}
989
/// Unit tests for [`Context`] capability checks, cancellation, deadline
/// detection and memory-threshold behaviour.
#[cfg(test)]
mod tests {
	#[cfg(feature = "http")]
	use std::str::FromStr;
	use std::time::Duration;

	#[cfg(feature = "http")]
	use url::Url;

	use crate::cnf::MEMORY_THRESHOLD;
	use crate::ctx::Context;
	use crate::ctx::reason::Reason;
	#[cfg(feature = "http")]
	use crate::dbs::Capabilities;
	#[cfg(feature = "http")]
	use crate::dbs::capabilities::{NetTarget, Targets};

	/// Denying `127.0.0.1` must block requests to `localhost`, because the
	/// resolved IP addresses are checked against the deny list as well.
	#[cfg(feature = "http")]
	#[tokio::test]
	async fn test_context_check_allowed_net() {
		let cap = Capabilities::all().without_network_targets(Targets::Some(
			[NetTarget::from_str("127.0.0.1").unwrap()].into(),
		));
		let mut ctx = Context::background();
		ctx.capabilities = cap.into();
		let ctx = ctx.freeze();
		let r = ctx.check_allowed_net(&Url::parse("http://localhost").unwrap()).await;
		// The error message names the denied resolved target, not the hostname
		assert_eq!(
			r.err().unwrap().to_string(),
			"Access to network target '127.0.0.1/32' is not allowed"
		);
	}

	#[tokio::test]
	async fn test_context_cancellation_priority() {
		// Test that cancellation is detected even when a deadline is set and exceeded
		let mut ctx = Context::background();

		// Set a deadline in the past (already exceeded)
		ctx.add_timeout(Duration::from_nanos(1)).unwrap();
		// Give time for the deadline to pass
		tokio::time::sleep(Duration::from_millis(10)).await;

		// Cancel the context
		let canceller = ctx.add_cancel();
		canceller.cancel();

		let ctx = ctx.freeze();

		// Cancellation should be detected first, not timeout
		let result = ctx.done(true);
		assert!(result.is_ok());
		assert_eq!(result.unwrap(), Some(Reason::Canceled));
	}

	#[tokio::test]
	async fn test_context_deadline_detection() {
		// Test that deadline timeout is detected when context is not cancelled
		let mut ctx = Context::background();

		// Set a very short timeout
		ctx.add_timeout(Duration::from_nanos(1)).unwrap();
		// Give time for the deadline to pass
		tokio::time::sleep(Duration::from_millis(10)).await;

		let ctx = ctx.freeze();

		// Should detect timeout
		let result = ctx.done(true);
		assert!(result.is_ok());
		assert!(matches!(result.unwrap(), Some(Reason::Timedout(_))));
	}

	#[tokio::test]
	async fn test_context_no_deadline() {
		// Test that a context without deadline or cancellation returns None
		let ctx = Context::background();
		let ctx = ctx.freeze();

		// Should return None (ok to continue)
		let result = ctx.done(true);
		assert!(result.is_ok());
		assert_eq!(result.unwrap(), None);
	}

	#[tokio::test]
	async fn test_context_is_done_adaptive_backoff() {
		// Test the adaptive back-off strategy in is_done()
		let ctx = Context::background();
		let ctx = ctx.freeze();

		// Test that early iterations trigger deep checks (1, 2, 4, 8, 16, 32)
		// NOTE(review): the counts exercised here mirror is_done()'s back-off
		// schedule — keep them in sync if that schedule changes.
		for count in [1, 2, 4, 8, 16, 32] {
			let result = ctx.is_done(Some(count)).await;
			assert!(result.is_ok());
			assert_eq!(result.unwrap(), false, "Count {} should not be done", count);
		}

		// Test that later iterations only check every 64
		// Count 33-63 should not trigger deep checks (except at 64)
		for count in 33..64 {
			let result = ctx.is_done(Some(count)).await;
			assert!(result.is_ok());
			assert_eq!(result.unwrap(), false, "Count {} should not be done", count);
		}

		// Count 64 should trigger a deep check (64 % 64 == 0)
		let result = ctx.is_done(Some(64)).await;
		assert!(result.is_ok());
		assert_eq!(result.unwrap(), false);
	}

	#[tokio::test]
	async fn test_context_is_done_with_none() {
		// Test that is_done(None) always performs a deep check
		let ctx = Context::background();
		let ctx = ctx.freeze();

		// Should perform deep check and return Ok(false) since no cancellation/timeout
		let result = ctx.is_done(None).await;
		assert!(result.is_ok());
		assert_eq!(result.unwrap(), false);
	}

	#[tokio::test]
	async fn test_context_is_done_detects_cancellation() {
		// Test that is_done detects cancellation
		let mut ctx = Context::background();
		let canceller = ctx.add_cancel();
		canceller.cancel();
		let ctx = ctx.freeze();

		// Should detect cancellation with a deep check
		let result = ctx.is_done(None).await;
		assert!(result.is_ok());
		assert_eq!(result.unwrap(), true);

		// Should also detect with count
		let result = ctx.is_done(Some(1)).await;
		assert!(result.is_ok());
		assert_eq!(result.unwrap(), true);
	}

	/// Test documenting the expected behavior when memory threshold is exceeded.
	///
	/// Note: This test documents the expected behavior but cannot easily test actual
	/// memory threshold violations without:
	/// 1. Setting MEMORY_THRESHOLD configuration (via cnf::MEMORY_THRESHOLD)
	/// 2. Actually allocating enough memory to exceed it
	/// 3. Having the "allocation-tracking" feature enabled
	///
	/// The key behavior tested elsewhere is that when is_beyond_threshold() returns true,
	/// it takes priority over deadline timeout, which prevents OOM crashes from being
	/// masked by timeout errors.
	#[tokio::test]
	async fn test_context_memory_threshold_priority_documentation() {
		// This test documents that the priority order in done() is:
		// 1. Cancellation (always checked, fast atomic operation)
		// 2. Memory threshold (checked when deep_check=true, if beyond threshold returns Error)
		// 3. Deadline (checked when deep_check=true, returns Reason::Timedout)

		// When ALLOC.is_beyond_threshold() returns true, done() will bail with
		// Error::QueryBeyondMemoryThreshold before checking the deadline.
		// This ensures memory violations are always detected before timeout errors.

		let ctx = Context::background();
		let ctx = ctx.freeze();

		// With no memory pressure, deadline not set, and no cancellation:
		let result = ctx.done(true);
		assert!(result.is_ok());
		assert_eq!(result.unwrap(), None);
	}

	/// Integration test that actually tests memory threshold detection.
	///
	/// This test requires:
	/// 1. The "allocation-tracking" feature to be enabled
	/// 2. The "allocator" feature to be enabled (for tracking to work)
	/// 3. Running with #[serial] to avoid interference from other tests
	///
	/// The test sets SURREAL_MEMORY_THRESHOLD environment variable, allocates memory
	/// to exceed the threshold, and verifies that context.done(true) detects the violation.
	#[tokio::test]
	#[cfg(all(feature = "allocation-tracking", feature = "allocator"))]
	#[serial_test::serial]
	async fn test_context_memory_threshold_integration() {
		use crate::err::Error;
		use crate::str::ParseBytes;

		// Set a low memory threshold (1MB) before MEMORY_THRESHOLD is accessed
		// This must happen before any code accesses cnf::MEMORY_THRESHOLD
		// Safety: This test runs with #[serial] ensuring no other tests run concurrently,
		// so there's no risk of data races when modifying the environment variable.
		unsafe {
			std::env::set_var(
				"SURREAL_MEMORY_THRESHOLD",
				"1MB".parse_bytes::<u64>().unwrap().to_string(),
			);
		}
		// Assert that SURREAL_MEMORY_THRESHOLD shows up as MEMORY_THRESHOLD with the expected value
		assert_eq!(*MEMORY_THRESHOLD, 1048576);

		// Force reinitialization by dropping and recreating (this won't work with LazyLock)
		// Instead, we rely on this test running in isolation with #[serial]
		// and being run in a fresh process where MEMORY_THRESHOLD hasn't been accessed yet

		// Note: This test may not work reliably if MEMORY_THRESHOLD was already accessed
		// elsewhere in the test suite. The #[serial] attribute ensures tests run one at a time,
		// but doesn't guarantee a fresh process. For reliable testing, this should be run
		// as a separate integration test binary.

		// Allocate a large vector (20MB) to exceed the 1MB threshold
		// Using Vec::with_capacity to ensure the memory is actually allocated
		let _large_allocation: Vec<u8> = Vec::with_capacity(20 * 1024 * 1024);

		// Give the allocator tracking time to register the allocation
		tokio::time::sleep(Duration::from_millis(10)).await;

		let ctx = Context::background();
		let ctx = ctx.freeze();

		// The memory threshold check should detect that we've exceeded the limit
		let result = ctx.done(true);

		// We expect either:
		// 1. An error if memory tracking properly detected the threshold violation
		// 2. Ok(None) if MEMORY_THRESHOLD was already initialized with default (0) before we set
		//    the environment variable
		match result {
			Err(e) => {
				// Verify it's the correct error type
				match e.downcast_ref::<Error>() {
					Some(Error::QueryBeyondMemoryThreshold) => {
						// Success! Memory threshold was properly detected
						println!("✓ Memory threshold violation detected as expected");
					}
					other => {
						panic!("Expected QueryBeyondMemoryThreshold error, got: {:?}", other);
					}
				}
			}
			Ok(None) => {
				// This means MEMORY_THRESHOLD was already initialized before we set the env var
				// This is expected behavior in the test suite - document it
				println!(
					"⚠ Memory threshold not enforced - MEMORY_THRESHOLD was already initialized"
				);
				println!("  This is expected when running as part of the full test suite.");
				println!(
					"  To properly test memory threshold enforcement, run this test in isolation:"
				);
				println!(
					"  cargo test --package surrealdb-core --features allocation-tracking,allocator test_context_memory_threshold_integration"
				);
				panic!("MEMORY_THRESHOLD was already initialized")
			}
			Ok(Some(reason)) => {
				panic!("Unexpected reason returned: {:?}", reason);
			}
		}

		// Clean up the environment variable
		// Safety: Same as above - #[serial] ensures no concurrent access
		unsafe {
			std::env::remove_var("SURREAL_MEMORY_THRESHOLD");
		}
	}
}