1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::fmt::{self, Debug};
4#[cfg(storage)]
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::time::Duration;
9
10use anyhow::{Result, bail};
11use async_channel::Sender;
12#[cfg(feature = "surrealism")]
13use surrealism_runtime::controller::Runtime;
14#[cfg(feature = "surrealism")]
15use surrealism_runtime::package::SurrealismPackage;
16#[cfg(feature = "http")]
17use url::Url;
18use web_time::Instant;
19
20use crate::buc::manager::BucketsManager;
21#[cfg(feature = "surrealism")]
22use crate::buc::store::ObjectKey;
23use crate::buc::store::ObjectStore;
24use crate::catalog::providers::{CatalogProvider, DatabaseProvider, NamespaceProvider};
25use crate::catalog::{DatabaseDefinition, DatabaseId, NamespaceId};
26use crate::cnf::PROTECTED_PARAM_NAMES;
27use crate::ctx::canceller::Canceller;
28use crate::ctx::reason::Reason;
29#[cfg(feature = "surrealism")]
30use crate::dbs::capabilities::ExperimentalTarget;
31#[cfg(feature = "http")]
32use crate::dbs::capabilities::NetTarget;
33use crate::dbs::{Capabilities, NewPlannerStrategy, Options, Session, Variables};
34use crate::err::Error;
35use crate::exec::function::FunctionRegistry;
36use crate::idx::planner::executor::QueryExecutor;
37use crate::idx::planner::{IterationStage, QueryPlanner};
38use crate::idx::trees::store::IndexStores;
39use crate::kvs::Transaction;
40use crate::kvs::cache::ds::DatastoreCache;
41use crate::kvs::index::IndexBuilder;
42use crate::kvs::sequences::Sequences;
43use crate::kvs::slowlog::SlowLog;
44use crate::mem::ALLOC;
45use crate::sql::expression::convert_public_value_to_internal;
46#[cfg(feature = "surrealism")]
47use crate::surrealism::cache::{SurrealismCache, SurrealismCacheLookup};
48use crate::types::{PublicNotification, PublicVariables};
49use crate::val::Value;
50
/// An immutable, reference-counted [`Context`] that can be shared freely.
pub type FrozenContext = Arc<Context>;
52
/// Execution context threaded through query processing.
///
/// Contexts form a chain via `parent`: value lookups and cancellation /
/// deadline checks walk up the chain. A frozen (`Arc`-wrapped) context is
/// shared immutably; mutation happens only before freezing.
pub struct Context {
	// Optional parent context; lookups and `done` checks recurse into it.
	parent: Option<FrozenContext>,
	// Absolute deadline plus the timeout duration that produced it.
	deadline: Option<(Instant, Duration)>,
	// Optional slow-query logger inherited from the datastore.
	slow_log: Option<SlowLog>,
	// Cooperative cancellation flag for this context level.
	cancelled: Arc<AtomicBool>,
	// Values (parameters) defined at this context level.
	values: HashMap<Cow<'static, str>, Arc<Value>>,
	// Channel for LIVE query notifications, when enabled.
	notifications: Option<Sender<PublicNotification>>,
	// Query planner for this statement, if one was attached.
	query_planner: Option<Arc<QueryPlanner>>,
	// Per-table query executor, if one was attached.
	query_executor: Option<QueryExecutor>,
	// Current stage of the iteration pipeline, if any.
	iteration_stage: Option<IterationStage>,
	// Shared datastore cache handle.
	cache: Option<Arc<DatastoreCache>>,
	// Shared in-memory index stores.
	index_stores: IndexStores,
	// Concurrent index builder, when available.
	index_builder: Option<IndexBuilder>,
	// Sequence generators, when available.
	sequences: Option<Sequences>,
	// Capabilities in force for this execution.
	capabilities: Arc<Capabilities>,
	// Directory for spilling temporary data (storage builds only).
	#[cfg(storage)]
	temporary_directory: Option<Arc<PathBuf>>,
	// The transaction this context operates within, if any.
	transaction: Option<Arc<Transaction>>,
	// When true, non-protected value lookups do NOT fall through to parent.
	isolated: bool,
	// Bucket (object store) manager, when available.
	buckets: Option<BucketsManager>,
	// Cache of loaded Surrealism modules (feature-gated).
	#[cfg(feature = "surrealism")]
	surrealism_cache: Option<Arc<SurrealismCache>>,
	// Registry of built-in and user functions.
	function_registry: Arc<FunctionRegistry>,
	// Strategy selector for the new query planner.
	new_planner_strategy: NewPlannerStrategy,
	// When true, volatile attributes are redacted from EXPLAIN output.
	redact_volatile_explain_attrs: bool,
	// Shared state for full-text `matches` scoring functions.
	matches_context: Option<Arc<crate::exec::function::MatchesContext>>,
	// Shared state for KNN distance functions.
	knn_context: Option<Arc<crate::exec::function::KnnContext>>,
}
107
108impl Default for Context {
109 fn default() -> Self {
110 Context::background()
111 }
112}
113
114impl From<Transaction> for Context {
115 fn from(txn: Transaction) -> Self {
116 let mut ctx = Context::background();
117 ctx.set_transaction(Arc::new(txn));
118 ctx
119 }
120}
121
impl Debug for Context {
	// Only the chain-relevant fields are shown; the shared facilities
	// (caches, planners, registries) are omitted to keep output readable.
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		f.debug_struct("Context")
			.field("parent", &self.parent)
			.field("deadline", &self.deadline)
			.field("cancelled", &self.cancelled)
			.field("values", &self.values)
			.finish()
	}
}
132
133impl Context {
134 pub(crate) fn background() -> Self {
136 Self {
137 values: HashMap::default(),
138 parent: None,
139 deadline: None,
140 slow_log: None,
141 cancelled: Arc::new(AtomicBool::new(false)),
142 notifications: None,
143 query_planner: None,
144 query_executor: None,
145 iteration_stage: None,
146 capabilities: Arc::new(Capabilities::default()),
147 index_stores: IndexStores::default(),
148 cache: None,
149 index_builder: None,
150 sequences: None,
151 #[cfg(storage)]
152 temporary_directory: None,
153 transaction: None,
154 isolated: false,
155 buckets: None,
156 #[cfg(feature = "surrealism")]
157 surrealism_cache: None,
158 function_registry: Arc::new(FunctionRegistry::with_builtins()),
159 new_planner_strategy: NewPlannerStrategy::default(),
160 redact_volatile_explain_attrs: false,
161 matches_context: None,
162 knn_context: None,
163 }
164 }
165
	/// Creates a child context inheriting everything from `parent` except
	/// its values map and cancellation flag, which start fresh. Value
	/// lookups still fall through to the parent (the child is not isolated).
	pub(crate) fn new(parent: &FrozenContext) -> Self {
		Context {
			// Values start empty; misses fall through to the parent chain.
			values: HashMap::default(),
			deadline: parent.deadline,
			slow_log: parent.slow_log.clone(),
			// Each level gets its own cancellation flag; `done` checks the chain.
			cancelled: Arc::new(AtomicBool::new(false)),
			notifications: parent.notifications.clone(),
			query_planner: parent.query_planner.clone(),
			query_executor: parent.query_executor.clone(),
			iteration_stage: parent.iteration_stage.clone(),
			capabilities: parent.capabilities.clone(),
			index_stores: parent.index_stores.clone(),
			cache: parent.cache.clone(),
			index_builder: parent.index_builder.clone(),
			sequences: parent.sequences.clone(),
			#[cfg(storage)]
			temporary_directory: parent.temporary_directory.clone(),
			transaction: parent.transaction.clone(),
			isolated: false,
			parent: Some(parent.clone()),
			buckets: parent.buckets.clone(),
			#[cfg(feature = "surrealism")]
			surrealism_cache: parent.surrealism_cache.clone(),
			function_registry: parent.function_registry.clone(),
			new_planner_strategy: parent.new_planner_strategy.clone(),
			redact_volatile_explain_attrs: parent.redact_volatile_explain_attrs,
			matches_context: parent.matches_context.clone(),
			knn_context: parent.knn_context.clone(),
		}
	}
197
198 pub(crate) fn new_isolated(parent: &FrozenContext) -> Self {
202 Self {
203 values: HashMap::default(),
204 deadline: parent.deadline,
205 slow_log: parent.slow_log.clone(),
206 cancelled: Arc::new(AtomicBool::new(false)),
207 notifications: parent.notifications.clone(),
208 query_planner: parent.query_planner.clone(),
209 query_executor: parent.query_executor.clone(),
210 iteration_stage: parent.iteration_stage.clone(),
211 capabilities: parent.capabilities.clone(),
212 index_stores: parent.index_stores.clone(),
213 cache: parent.cache.clone(),
214 index_builder: parent.index_builder.clone(),
215 sequences: parent.sequences.clone(),
216 #[cfg(storage)]
217 temporary_directory: parent.temporary_directory.clone(),
218 transaction: parent.transaction.clone(),
219 isolated: true,
220 parent: Some(parent.clone()),
221 buckets: parent.buckets.clone(),
222 #[cfg(feature = "surrealism")]
223 surrealism_cache: parent.surrealism_cache.clone(),
224 function_registry: parent.function_registry.clone(),
225 new_planner_strategy: parent.new_planner_strategy.clone(),
226 redact_volatile_explain_attrs: parent.redact_volatile_explain_attrs,
227 matches_context: parent.matches_context.clone(),
228 knn_context: parent.knn_context.clone(),
229 }
230 }
231
232 pub(crate) fn snapshot(from: &FrozenContext) -> Self {
242 Self {
243 values: from.collect_values(HashMap::default()),
245 deadline: from.deadline,
246 slow_log: from.slow_log.clone(),
247 cancelled: Arc::new(AtomicBool::new(false)),
248 notifications: from.notifications.clone(),
249 query_planner: from.query_planner.clone(),
250 query_executor: from.query_executor.clone(),
251 iteration_stage: from.iteration_stage.clone(),
252 capabilities: from.capabilities.clone(),
253 index_stores: from.index_stores.clone(),
254 cache: from.cache.clone(),
255 index_builder: from.index_builder.clone(),
256 sequences: from.sequences.clone(),
257 #[cfg(storage)]
258 temporary_directory: from.temporary_directory.clone(),
259 transaction: from.transaction.clone(),
260 isolated: false,
261 parent: None, buckets: from.buckets.clone(),
263 #[cfg(feature = "surrealism")]
264 surrealism_cache: from.surrealism_cache.clone(),
265 function_registry: from.function_registry.clone(),
266 new_planner_strategy: from.new_planner_strategy.clone(),
267 redact_volatile_explain_attrs: from.redact_volatile_explain_attrs,
268 matches_context: from.matches_context.clone(),
269 knn_context: from.knn_context.clone(),
270 }
271 }
272
	/// Creates a context for a concurrently-executing task: shared
	/// facilities are inherited, but the deadline, transaction, values and
	/// parent link are deliberately detached so the task's lifetime is
	/// independent of the originating statement.
	pub(crate) fn new_concurrent(from: &FrozenContext) -> Self {
		Self {
			values: HashMap::default(),
			// No inherited deadline: the concurrent task outlives the statement.
			deadline: None,
			slow_log: from.slow_log.clone(),
			cancelled: Arc::new(AtomicBool::new(false)),
			notifications: from.notifications.clone(),
			query_planner: from.query_planner.clone(),
			query_executor: from.query_executor.clone(),
			iteration_stage: from.iteration_stage.clone(),
			capabilities: from.capabilities.clone(),
			index_stores: from.index_stores.clone(),
			cache: from.cache.clone(),
			index_builder: from.index_builder.clone(),
			sequences: from.sequences.clone(),
			#[cfg(storage)]
			temporary_directory: from.temporary_directory.clone(),
			// No inherited transaction: the task must not reuse the caller's txn.
			transaction: None,
			isolated: false,
			parent: None,
			buckets: from.buckets.clone(),
			#[cfg(feature = "surrealism")]
			surrealism_cache: from.surrealism_cache.clone(),
			function_registry: from.function_registry.clone(),
			new_planner_strategy: from.new_planner_strategy.clone(),
			redact_volatile_explain_attrs: from.redact_volatile_explain_attrs,
			matches_context: from.matches_context.clone(),
			knn_context: from.knn_context.clone(),
		}
	}
306
	/// Builds the root context for a datastore, wiring in the shared
	/// facilities (caches, index machinery, sequences, buckets) and an
	/// optional statement timeout.
	///
	/// # Errors
	/// Fails if `time_out` is so large that computing the deadline would
	/// overflow (see [`Context::add_timeout`]).
	#[expect(clippy::too_many_arguments)]
	pub(crate) fn from_ds(
		time_out: Option<Duration>,
		slow_log: Option<SlowLog>,
		capabilities: Arc<Capabilities>,
		index_stores: IndexStores,
		index_builder: IndexBuilder,
		sequences: Sequences,
		cache: Arc<DatastoreCache>,
		#[cfg(storage)] temporary_directory: Option<Arc<PathBuf>>,
		buckets: BucketsManager,
		#[cfg(feature = "surrealism")] surrealism_cache: Arc<SurrealismCache>,
	) -> Result<Context> {
		// Read the strategy before `capabilities` is moved into the struct.
		let planner_strategy = capabilities.planner_strategy().clone();
		let mut ctx = Self {
			values: HashMap::default(),
			parent: None,
			deadline: None,
			slow_log,
			cancelled: Arc::new(AtomicBool::new(false)),
			notifications: None,
			query_planner: None,
			query_executor: None,
			iteration_stage: None,
			capabilities,
			index_stores,
			cache: Some(cache),
			index_builder: Some(index_builder),
			sequences: Some(sequences),
			#[cfg(storage)]
			temporary_directory,
			transaction: None,
			isolated: false,
			buckets: Some(buckets),
			#[cfg(feature = "surrealism")]
			surrealism_cache: Some(surrealism_cache),
			function_registry: Arc::new(FunctionRegistry::with_builtins()),
			new_planner_strategy: planner_strategy,
			redact_volatile_explain_attrs: false,
			matches_context: None,
			knn_context: None,
		};
		// Apply the configured statement timeout, if any.
		if let Some(timeout) = time_out {
			ctx.add_timeout(timeout)?;
		}
		Ok(ctx)
	}
355
356 pub(crate) fn freeze(self) -> FrozenContext {
358 Arc::new(self)
359 }
360
361 pub(crate) fn unfreeze(ctx: FrozenContext) -> Result<Context> {
363 let Some(x) = Arc::into_inner(ctx) else {
364 fail!("Tried to unfreeze a Context with multiple references")
365 };
366 Ok(x)
367 }
368
369 pub(crate) async fn get_ns_id(&self, opt: &Options) -> Result<NamespaceId> {
373 let ns = opt.ns()?;
374 let tx = self.tx();
375 let ns_def = tx.get_or_add_ns(Some(self), ns).await?;
376 Ok(ns_def.namespace_id)
377 }
378
379 pub(crate) async fn expect_ns_id(&self, opt: &Options) -> Result<NamespaceId> {
382 let ns = opt.ns()?;
383 let Some(ns_def) = self.tx().get_ns_by_name(ns).await? else {
384 return Err(Error::NsNotFound {
385 name: ns.to_string(),
386 }
387 .into());
388 };
389 Ok(ns_def.namespace_id)
390 }
391
392 pub(crate) async fn get_ns_db_ids(&self, opt: &Options) -> Result<(NamespaceId, DatabaseId)> {
396 let (ns, db) = opt.ns_db()?;
397 let db_def = self.tx().ensure_ns_db(Some(self), ns, db).await?;
398 Ok((db_def.namespace_id, db_def.database_id))
399 }
400
401 pub(crate) async fn try_ns_db_ids(
405 &self,
406 opt: &Options,
407 ) -> Result<Option<(NamespaceId, DatabaseId)>> {
408 let (ns, db) = opt.ns_db()?;
409 let Some(db_def) = self.tx().get_db_by_name(ns, db).await? else {
410 return Ok(None);
411 };
412 Ok(Some((db_def.namespace_id, db_def.database_id)))
413 }
414
415 pub(crate) async fn expect_ns_db_ids(
418 &self,
419 opt: &Options,
420 ) -> Result<(NamespaceId, DatabaseId)> {
421 let (ns, db) = opt.ns_db()?;
422 let Some(db_def) = self.tx().get_db_by_name(ns, db).await? else {
423 return Err(Error::DbNotFound {
424 name: db.to_string(),
425 }
426 .into());
427 };
428 Ok((db_def.namespace_id, db_def.database_id))
429 }
430
431 pub(crate) async fn get_db(&self, opt: &Options) -> Result<Arc<DatabaseDefinition>> {
432 let (ns, db) = opt.ns_db()?;
433 let db_def = self.tx().ensure_ns_db(Some(self), ns, db).await?;
434 Ok(db_def)
435 }
436
	/// Defines a named value at this context level, shadowing any value of
	/// the same name inherited from a parent.
	pub(crate) fn add_value<K>(&mut self, key: K, value: Arc<Value>)
	where
		K: Into<Cow<'static, str>>,
	{
		self.values.insert(key.into(), value);
	}
445
446 pub(crate) fn add_values<T, K, V>(&mut self, iter: T)
449 where
450 T: IntoIterator<Item = (K, V)>,
451 K: Into<Cow<'static, str>>,
452 V: Into<Arc<Value>>,
453 {
454 self.values.extend(iter.into_iter().map(|(k, v)| (k.into(), v.into())))
455 }
456
457 pub(crate) fn add_cancel(&mut self) -> Canceller {
460 let cancelled = self.cancelled.clone();
461 Canceller::new(cancelled)
462 }
463
464 pub(crate) fn add_deadline(&mut self, deadline: Instant, duration: Duration) {
467 match self.deadline {
468 Some((current, _)) if current < deadline => (),
469 _ => self.deadline = Some((deadline, duration)),
470 }
471 }
472
473 pub(crate) fn add_timeout(&mut self, timeout: Duration) -> Result<(), Error> {
477 match Instant::now().checked_add(timeout) {
478 Some(deadline) => {
479 self.add_deadline(deadline, timeout);
480 Ok(())
481 }
482 None => Err(Error::InvalidTimeout(timeout.as_secs())),
483 }
484 }
485
	/// Attaches (or clears, when `None`) the LIVE notification channel.
	pub(crate) fn add_notifications(&mut self, chn: Option<&Sender<PublicNotification>>) {
		self.notifications = chn.cloned()
	}
491
	/// Attaches a query planner to this context.
	pub(crate) fn set_query_planner(&mut self, qp: QueryPlanner) {
		self.query_planner = Some(Arc::new(qp));
	}
495
	/// Attaches a per-table query executor to this context.
	pub(crate) fn set_query_executor(&mut self, qe: QueryExecutor) {
		self.query_executor = Some(qe);
	}
504
	/// Records the current stage of the iteration pipeline.
	pub(crate) fn set_iteration_stage(&mut self, is: IterationStage) {
		self.iteration_stage = Some(is);
	}
508
	/// Associates a transaction with this context.
	pub(crate) fn set_transaction(&mut self, txn: Arc<Transaction>) {
		self.transaction = Some(txn);
	}
512
513 pub(crate) fn tx(&self) -> Arc<Transaction> {
514 self.transaction
515 .clone()
516 .unwrap_or_else(|| unreachable!("The context was not associated with a transaction"))
517 }
518
	/// Returns the transaction associated with this context, if any.
	pub(crate) fn try_tx(&self) -> Option<&Arc<Transaction>> {
		self.transaction.as_ref()
	}
523
	/// Returns the time remaining until the deadline (saturating to zero
	/// once the deadline has passed), or `None` if no deadline is set.
	pub(crate) fn timeout(&self) -> Option<Duration> {
		self.deadline.map(|(v, _)| v.saturating_duration_since(Instant::now()))
	}
529
	/// Returns the slow-query logger, if one is configured.
	pub(crate) fn slow_log(&self) -> Option<&SlowLog> {
		self.slow_log.as_ref()
	}
535
	/// Returns a clone of the LIVE notification channel, if enabled.
	pub(crate) fn notifications(&self) -> Option<Sender<PublicNotification>> {
		self.notifications.clone()
	}
539
	/// Whether a LIVE notification channel is attached.
	pub(crate) fn has_notifications(&self) -> bool {
		self.notifications.is_some()
	}
543
544 pub(crate) fn get_query_planner(&self) -> Option<&QueryPlanner> {
545 self.query_planner.as_ref().map(|qp| qp.as_ref())
546 }
547
	/// Returns the query executor attached to this context, if any.
	pub(crate) fn get_query_executor(&self) -> Option<&QueryExecutor> {
		self.query_executor.as_ref()
	}
553
	/// Returns the current iteration pipeline stage, if set.
	pub(crate) fn get_iteration_stage(&self) -> Option<&IterationStage> {
		self.iteration_stage.as_ref()
	}
557
	/// Returns the shared in-memory index stores.
	pub(crate) fn get_index_stores(&self) -> &IndexStores {
		&self.index_stores
	}
562
	/// Returns the concurrent index builder, if available.
	pub(crate) fn get_index_builder(&self) -> Option<&IndexBuilder> {
		self.index_builder.as_ref()
	}
567
	/// Returns the sequence generators, if available on this context.
	pub(crate) fn get_sequences(&self) -> Option<&Sequences> {
		self.sequences.as_ref()
	}
572
573 pub(crate) fn try_get_sequences(&self) -> Result<&Sequences> {
574 if let Some(sqs) = self.get_sequences() {
575 Ok(sqs)
576 } else {
577 bail!(Error::Internal("Sequences are not supported in this context.".to_string(),))
578 }
579 }
580
	/// Returns a handle to the shared datastore cache, if available.
	pub(crate) fn get_cache(&self) -> Option<Arc<DatastoreCache>> {
		self.cache.clone()
	}
585
	/// Checks whether this context (or any ancestor) has finished.
	///
	/// Returns `Some(Reason)` when processing must stop, `None` otherwise.
	/// Cancellation always takes priority. The memory-threshold and
	/// deadline checks only run when `deep_check` is set, as they are
	/// comparatively expensive.
	pub(crate) fn done(&self, deep_check: bool) -> Result<Option<Reason>> {
		// Explicit cancellation wins over every other reason.
		if self.cancelled.load(Ordering::Relaxed) {
			return Ok(Some(Reason::Canceled));
		}
		if deep_check {
			// Abort queries that push the allocator beyond its threshold.
			if ALLOC.is_beyond_threshold() {
				bail!(Error::QueryBeyondMemoryThreshold);
			}
			let now = Instant::now();
			if let Some((deadline, timeout)) = self.deadline
				&& deadline <= now
			{
				return Ok(Some(Reason::Timedout(timeout.into())));
			}
		}
		// Walk the chain: a finished ancestor finishes this context too.
		if let Some(ctx) = &self.parent {
			return ctx.done(deep_check);
		}
		Ok(None)
	}
630
	/// Periodic liveness check intended for tight iteration loops.
	///
	/// `count` is the caller's iteration counter: the task yields to the
	/// scheduler on every 32nd iteration, and a deep check (memory +
	/// deadline) runs at counts 1, 2, 4, 8, 16, 32 and then on every 64th
	/// iteration — an adaptive back-off so early iterations are checked
	/// eagerly and steady-state loops cheaply. With no counter the check
	/// is always deep.
	pub(crate) async fn is_done(&self, count: Option<usize>) -> Result<bool> {
		let deep_check = if let Some(count) = count {
			if count % 32 == 0 {
				yield_now!();
			}
			match count {
				1 | 2 | 4 | 8 | 16 | 32 => true,
				_ => count % 64 == 0,
			}
		} else {
			true
		};
		Ok(self.done(deep_check)?.is_some())
	}
666
667 pub(crate) async fn is_timedout(&self) -> Result<Option<Duration>> {
669 yield_now!();
670 if let Some(Reason::Timedout(d)) = self.done(true)? {
671 Ok(Some(d.0))
672 } else {
673 Ok(None)
674 }
675 }
676
677 pub(crate) async fn expect_not_timedout(&self) -> Result<()> {
678 if let Some(d) = self.is_timedout().await? {
679 bail!(Error::QueryTimedout(d.into()))
680 } else {
681 Ok(())
682 }
683 }
684
	/// Directory for spilling temporary data to disk.
	/// Only available when the `storage` cfg flag is enabled.
	#[cfg(storage)]
	pub(crate) fn temporary_directory(&self) -> Option<&Arc<PathBuf>> {
		self.temporary_directory.as_ref()
	}
690
	/// Looks up a context value by name.
	///
	/// The local map is checked first; on a miss the lookup falls through
	/// to the parent chain — always for protected parameter names, and for
	/// everything else only when this context is not isolated.
	pub(crate) fn value(&self, key: &str) -> Option<&Value> {
		match self.values.get(key) {
			Some(v) => Some(v.as_ref()),
			None if PROTECTED_PARAM_NAMES.contains(&key) || !self.isolated => match &self.parent {
				Some(p) => p.value(key),
				_ => None,
			},
			None => None,
		}
	}
703
	/// Flattens this context's values — and, unless isolated, every
	/// inherited ancestor value — into `map`.
	///
	/// Ancestors are merged first so more-local definitions override
	/// inherited ones.
	pub(crate) fn collect_values(
		&self,
		map: HashMap<Cow<'static, str>, Arc<Value>>,
	) -> HashMap<Cow<'static, str>, Arc<Value>> {
		let mut map = if !self.isolated
			&& let Some(p) = &self.parent
		{
			p.collect_values(map)
		} else {
			map
		};
		self.values.iter().for_each(|(k, v)| {
			map.insert(k.clone(), v.clone());
		});
		map
	}
722
	/// Builds a scripting `Cancellation` token that observes the deadline
	/// of this context and the cancellation flag of every level in the
	/// parent chain.
	#[cfg(feature = "scripting")]
	pub(crate) fn cancellation(&self) -> crate::ctx::cancellation::Cancellation {
		crate::ctx::cancellation::Cancellation::new(
			self.deadline.map(|(deadline, _)| deadline),
			// Walk self → parent → … collecting each level's cancel flag.
			std::iter::successors(Some(self), |ctx| ctx.parent.as_ref().map(|c| c.as_ref()))
				.map(|ctx| ctx.cancelled.clone())
				.collect(),
		)
	}
733
	/// Applies session state to this context: session values, planner
	/// strategy and explain-redaction overrides, and session variables.
	///
	/// # Errors
	/// Fails if a session variable uses a protected parameter name.
	pub(crate) fn attach_session(&mut self, session: &Session) -> Result<(), Error> {
		self.add_values(session.values());
		// Only override the planner strategy when the session deviates
		// from the default, so a datastore-level setting is preserved.
		if session.new_planner_strategy != NewPlannerStrategy::default() {
			self.new_planner_strategy = session.new_planner_strategy.clone();
		}
		if session.redact_volatile_explain_attrs {
			self.redact_volatile_explain_attrs = true;
		}
		if !session.variables.is_empty() {
			self.attach_variables(session.variables.clone().into())?;
		}
		Ok(())
	}
753
	/// Defines the given variables on this context.
	///
	/// # Errors
	/// Returns [`Error::InvalidParam`] on the first protected parameter
	/// name encountered; variables processed before it remain attached.
	pub(crate) fn attach_variables(&mut self, vars: Variables) -> Result<(), Error> {
		for (name, val) in vars {
			if PROTECTED_PARAM_NAMES.contains(&name.as_str()) {
				return Err(Error::InvalidParam {
					name,
				});
			}
			self.add_value(name, Arc::new(val));
		}
		Ok(())
	}
766
	/// Defines the given public (API-level) variables on this context,
	/// converting each value to the internal representation.
	///
	/// # Errors
	/// Returns [`Error::InvalidParam`] on the first protected parameter
	/// name encountered; variables processed before it remain attached.
	pub(crate) fn attach_public_variables(&mut self, vars: PublicVariables) -> Result<(), Error> {
		for (name, val) in vars {
			if PROTECTED_PARAM_NAMES.contains(&name.as_str()) {
				return Err(Error::InvalidParam {
					name,
				});
			}
			self.add_value(name, Arc::new(convert_public_value_to_internal(val)));
		}
		Ok(())
	}
778
	/// Replaces the capabilities in force for this context.
	pub(crate) fn add_capabilities(&mut self, caps: Arc<Capabilities>) {
		self.capabilities = caps;
	}
787
	/// Returns a handle to the capabilities in force for this context.
	pub(crate) fn get_capabilities(&self) -> Arc<Capabilities> {
		self.capabilities.clone()
	}
792
	/// Returns the function registry shared by this context.
	pub(crate) fn function_registry(&self) -> &Arc<FunctionRegistry> {
		&self.function_registry
	}
797
	/// Attaches shared state for full-text `matches` scoring functions.
	pub(crate) fn set_matches_context(&mut self, ctx: crate::exec::function::MatchesContext) {
		self.matches_context = Some(Arc::new(ctx));
	}
802
	/// Returns the full-text `matches` shared state, if attached.
	pub(crate) fn get_matches_context(
		&self,
	) -> Option<&Arc<crate::exec::function::MatchesContext>> {
		self.matches_context.as_ref()
	}
809
	/// Attaches shared state for KNN distance functions.
	pub(crate) fn set_knn_context(&mut self, ctx: Arc<crate::exec::function::KnnContext>) {
		self.knn_context = Some(ctx);
	}
814
	/// Returns the KNN shared state, if attached.
	pub(crate) fn get_knn_context(&self) -> Option<&Arc<crate::exec::function::KnnContext>> {
		self.knn_context.as_ref()
	}
819
	/// Returns the strategy selector for the new query planner.
	pub(crate) fn new_planner_strategy(&self) -> &NewPlannerStrategy {
		&self.new_planner_strategy
	}
824
	/// Whether volatile attributes should be redacted from EXPLAIN output.
	pub(crate) fn redact_volatile_explain_attrs(&self) -> bool {
		self.redact_volatile_explain_attrs
	}
829
830 #[cfg_attr(not(feature = "scripting"), expect(dead_code))]
832 pub(crate) fn check_allowed_scripting(&self) -> Result<()> {
833 if !self.capabilities.allows_scripting() {
834 warn!("Capabilities denied scripting attempt");
835 bail!(Error::ScriptingNotAllowed);
836 }
837 trace!("Capabilities allowed scripting");
838 Ok(())
839 }
840
841 pub(crate) fn check_allowed_function(&self, target: &str) -> Result<()> {
843 if !self.capabilities.allows_function_name(target) {
844 warn!("Capabilities denied function execution attempt, target: '{target}'");
845 bail!(Error::FunctionNotAllowed(target.to_string()));
846 }
847 trace!("Capabilities allowed function execution, target: '{target}'");
848 Ok(())
849 }
850
	/// Ensures the configured capabilities permit an outgoing network
	/// connection to `url`.
	///
	/// The hostname target must match an allow rule and no deny rule; the
	/// host is then resolved and every resolved address is checked against
	/// the deny rules as well, so a denied IP cannot be reached through a
	/// permitted hostname.
	#[cfg(feature = "http")]
	pub(crate) async fn check_allowed_net(&self, url: &Url) -> Result<()> {
		// Shared deny-list check used for both the hostname and each
		// resolved address.
		let match_any_deny_net = |t| {
			if self.capabilities.matches_any_deny_net(t) {
				warn!("Capabilities denied outgoing network connection attempt, target: '{t}'");
				bail!(Error::NetTargetNotAllowed(t.to_string()));
			}
			Ok(())
		};
		match url.host() {
			Some(host) => {
				let target = NetTarget::Host(host.to_owned(), url.port_or_known_default());
				let host_allowed = self.capabilities.matches_any_allow_net(&target);
				if !host_allowed {
					warn!(
						"Capabilities denied outgoing network connection attempt, target: '{target}'"
					);
					bail!(Error::NetTargetNotAllowed(target.to_string()));
				}
				match_any_deny_net(&target)?;
				// DNS resolution is async on native targets only.
				#[cfg(not(target_family = "wasm"))]
				let targets = target.resolve().await?;
				#[cfg(target_family = "wasm")]
				let targets = target.resolve()?;
				for t in &targets {
					match_any_deny_net(t)?;
				}
				trace!("Capabilities allowed outgoing network connection, target: '{target}'");
				Ok(())
			}
			// URLs without a host (e.g. `mailto:`) are rejected outright.
			_ => bail!(Error::InvalidUrl(url.to_string())),
		}
	}
924
	/// Returns the bucket (object store) manager, if available.
	pub(crate) fn get_buckets(&self) -> Option<&BucketsManager> {
		self.buckets.as_ref()
	}
928
929 pub(crate) async fn get_bucket_store(
931 &self,
932 ns: NamespaceId,
933 db: DatabaseId,
934 bu: &str,
935 ) -> Result<Arc<dyn ObjectStore>> {
936 if let Some(buckets) = &self.buckets {
938 buckets.get_bucket_store(&self.tx(), ns, db, bu).await
939 } else {
940 bail!(Error::BucketUnavailable(bu.into()))
941 }
942 }
943
944 #[cfg(feature = "surrealism")]
945 pub(crate) fn get_surrealism_cache(&self) -> Option<Arc<SurrealismCache>> {
946 self.surrealism_cache.as_ref().map(|sc| sc.clone())
947 }
948
	/// Fetches (or loads and caches) the Surrealism runtime for the module
	/// identified by `lookup`.
	///
	/// # Errors
	/// Fails when the `surrealism` experimental capability is disabled, the
	/// cache is unavailable, the lookup kind is unsupported, or the module
	/// file cannot be fetched or parsed.
	#[cfg(feature = "surrealism")]
	pub(crate) async fn get_surrealism_runtime(
		&self,
		lookup: SurrealismCacheLookup<'_>,
	) -> Result<Arc<Runtime>> {
		// The feature is gated behind an experimental capability flag.
		if !self.get_capabilities().allows_experimental(&ExperimentalTarget::Surrealism) {
			bail!(
				"Failed to get surrealism runtime: Experimental capability `surrealism` is not enabled"
			);
		}

		let Some(cache) = self.get_surrealism_cache() else {
			bail!("Surrealism cache is not available");
		};

		// On a cache miss, load the package from the bucket store.
		cache
			.get_or_insert_with(&lookup, async || {
				let SurrealismCacheLookup::File(ns, db, bucket, key) = lookup else {
					bail!("silo lookups are not supported yet");
				};

				let bucket = self.get_bucket_store(*ns, *db, bucket).await?;
				let key = ObjectKey::new(key);
				let surli = bucket
					.get(&key)
					.await
					.map_err(|e| anyhow::anyhow!("failed to get file: {}", e))?;

				let Some(surli) = surli else {
					bail!("file not found");
				};

				// Parse the fetched bytes into a package and spin up a runtime.
				let package = SurrealismPackage::from_reader(std::io::Cursor::new(surli))?;
				let runtime = Arc::new(Runtime::new(package)?);

				Ok(runtime)
			})
			.await
	}
988}
989
990#[cfg(test)]
991mod tests {
992 #[cfg(feature = "http")]
993 use std::str::FromStr;
994 use std::time::Duration;
995
996 #[cfg(feature = "http")]
997 use url::Url;
998
999 use crate::cnf::MEMORY_THRESHOLD;
1000 use crate::ctx::Context;
1001 use crate::ctx::reason::Reason;
1002 #[cfg(feature = "http")]
1003 use crate::dbs::Capabilities;
1004 #[cfg(feature = "http")]
1005 use crate::dbs::capabilities::{NetTarget, Targets};
1006
	// A deny rule for 127.0.0.1 must also block `localhost`, since resolved
	// addresses are checked against the deny list after the hostname check.
	#[cfg(feature = "http")]
	#[tokio::test]
	async fn test_context_check_allowed_net() {
		let cap = Capabilities::all().without_network_targets(Targets::Some(
			[NetTarget::from_str("127.0.0.1").unwrap()].into(),
		));
		let mut ctx = Context::background();
		ctx.capabilities = cap.into();
		let ctx = ctx.freeze();
		let r = ctx.check_allowed_net(&Url::parse("http://localhost").unwrap()).await;
		assert_eq!(
			r.err().unwrap().to_string(),
			"Access to network target '127.0.0.1/32' is not allowed"
		);
	}
1022
	// When a context is both expired and cancelled, cancellation must win,
	// as `done` checks the cancel flag before the deadline.
	#[tokio::test]
	async fn test_context_cancellation_priority() {
		let mut ctx = Context::background();

		// Install a deadline and let it expire.
		ctx.add_timeout(Duration::from_nanos(1)).unwrap();
		tokio::time::sleep(Duration::from_millis(10)).await;

		// Then cancel the context as well.
		let canceller = ctx.add_cancel();
		canceller.cancel();

		let ctx = ctx.freeze();

		let result = ctx.done(true);
		assert!(result.is_ok());
		assert_eq!(result.unwrap(), Some(Reason::Canceled));
	}
1044
	// An expired deadline must be reported as `Reason::Timedout` by a deep
	// check.
	#[tokio::test]
	async fn test_context_deadline_detection() {
		let mut ctx = Context::background();

		// Install a deadline and let it expire.
		ctx.add_timeout(Duration::from_nanos(1)).unwrap();
		tokio::time::sleep(Duration::from_millis(10)).await;

		let ctx = ctx.freeze();

		let result = ctx.done(true);
		assert!(result.is_ok());
		assert!(matches!(result.unwrap(), Some(Reason::Timedout(_))));
	}
1062
1063 #[tokio::test]
1064 async fn test_context_no_deadline() {
1065 let ctx = Context::background();
1067 let ctx = ctx.freeze();
1068
1069 let result = ctx.done(true);
1071 assert!(result.is_ok());
1072 assert_eq!(result.unwrap(), None);
1073 }
1074
1075 #[tokio::test]
1076 async fn test_context_is_done_adaptive_backoff() {
1077 let ctx = Context::background();
1079 let ctx = ctx.freeze();
1080
1081 for count in [1, 2, 4, 8, 16, 32] {
1083 let result = ctx.is_done(Some(count)).await;
1084 assert!(result.is_ok());
1085 assert_eq!(result.unwrap(), false, "Count {} should not be done", count);
1086 }
1087
1088 for count in 33..64 {
1091 let result = ctx.is_done(Some(count)).await;
1092 assert!(result.is_ok());
1093 assert_eq!(result.unwrap(), false, "Count {} should not be done", count);
1094 }
1095
1096 let result = ctx.is_done(Some(64)).await;
1098 assert!(result.is_ok());
1099 assert_eq!(result.unwrap(), false);
1100 }
1101
1102 #[tokio::test]
1103 async fn test_context_is_done_with_none() {
1104 let ctx = Context::background();
1106 let ctx = ctx.freeze();
1107
1108 let result = ctx.is_done(None).await;
1110 assert!(result.is_ok());
1111 assert_eq!(result.unwrap(), false);
1112 }
1113
1114 #[tokio::test]
1115 async fn test_context_is_done_detects_cancellation() {
1116 let mut ctx = Context::background();
1118 let canceller = ctx.add_cancel();
1119 canceller.cancel();
1120 let ctx = ctx.freeze();
1121
1122 let result = ctx.is_done(None).await;
1124 assert!(result.is_ok());
1125 assert_eq!(result.unwrap(), true);
1126
1127 let result = ctx.is_done(Some(1)).await;
1129 assert!(result.is_ok());
1130 assert_eq!(result.unwrap(), true);
1131 }
1132
	// Documents the check ordering in `done`: with the default (unset)
	// memory threshold, a deep check on an idle context reports nothing —
	// the memory check only fires when ALLOC is beyond its threshold.
	#[tokio::test]
	async fn test_context_memory_threshold_priority_documentation() {
		let ctx = Context::background();
		let ctx = ctx.freeze();

		let result = ctx.done(true);
		assert!(result.is_ok());
		assert_eq!(result.unwrap(), None);
	}
1163
	// End-to-end check that a deep `done` aborts once the allocator exceeds
	// the configured memory threshold. Runs serially because it mutates a
	// process-wide environment variable; it can only observe the threshold
	// when this test initializes MEMORY_THRESHOLD first, hence the explicit
	// diagnostics for the already-initialized case.
	#[tokio::test]
	#[cfg(all(feature = "allocation-tracking", feature = "allocator"))]
	#[serial_test::serial]
	async fn test_context_memory_threshold_integration() {
		use crate::err::Error;
		use crate::str::ParseBytes;

		// SAFETY-relevant: env mutation is process-global; guarded by #[serial].
		unsafe {
			std::env::set_var(
				"SURREAL_MEMORY_THRESHOLD",
				"1MB".parse_bytes::<u64>().unwrap().to_string(),
			);
		}
		assert_eq!(*MEMORY_THRESHOLD, 1048576);

		// Allocate well past the 1MB threshold so the tracker trips.
		let _large_allocation: Vec<u8> = Vec::with_capacity(20 * 1024 * 1024);

		tokio::time::sleep(Duration::from_millis(10)).await;

		let ctx = Context::background();
		let ctx = ctx.freeze();

		let result = ctx.done(true);

		match result {
			Err(e) => {
				match e.downcast_ref::<Error>() {
					Some(Error::QueryBeyondMemoryThreshold) => {
						println!("✓ Memory threshold violation detected as expected");
					}
					other => {
						panic!("Expected QueryBeyondMemoryThreshold error, got: {:?}", other);
					}
				}
			}
			Ok(None) => {
				println!(
					"⚠ Memory threshold not enforced - MEMORY_THRESHOLD was already initialized"
				);
				println!("  This is expected when running as part of the full test suite.");
				println!(
					"  To properly test memory threshold enforcement, run this test in isolation:"
				);
				println!(
					"  cargo test --package surrealdb-core --features allocation-tracking,allocator test_context_memory_threshold_integration"
				);
				panic!("MEMORY_THRESHOLD was already initialized")
			}
			Ok(Some(reason)) => {
				panic!("Unexpected reason returned: {:?}", reason);
			}
		}

		// Restore the environment for subsequent tests.
		unsafe {
			std::env::remove_var("SURREAL_MEMORY_THRESHOLD");
		}
	}
1258}