1use crate::ctx::canceller::Canceller;
2use crate::ctx::reason::Reason;
3#[cfg(feature = "http")]
4use crate::dbs::capabilities::NetTarget;
5use crate::dbs::{Capabilities, Notification};
6use crate::err::Error;
7use crate::idx::planner::executor::QueryExecutor;
8use crate::idx::planner::{IterationStage, QueryPlanner};
9use crate::idx::trees::store::IndexStores;
10use crate::kvs::Transaction;
11use crate::sql::value::Value;
12use channel::Sender;
13use std::borrow::Cow;
14use std::collections::HashMap;
15use std::fmt::{self, Debug};
16#[cfg(any(
17 feature = "kv-mem",
18 feature = "kv-surrealkv",
19 feature = "kv-rocksdb",
20 feature = "kv-fdb",
21 feature = "kv-tikv",
22))]
23use std::path::PathBuf;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::Arc;
26use std::time::Duration;
27use trice::Instant;
28#[cfg(feature = "http")]
29use url::Url;
30
31impl<'a> From<Value> for Cow<'a, Value> {
32 fn from(v: Value) -> Cow<'a, Value> {
33 Cow::Owned(v)
34 }
35}
36
37impl<'a> From<&'a Value> for Cow<'a, Value> {
38 fn from(v: &'a Value) -> Cow<'a, Value> {
39 Cow::Borrowed(v)
40 }
41}
42#[non_exhaustive]
43pub struct Context<'a> {
44 parent: Option<&'a Context<'a>>,
46 deadline: Option<Instant>,
48 cancelled: Arc<AtomicBool>,
50 values: HashMap<Cow<'static, str>, Cow<'a, Value>>,
52 notifications: Option<Sender<Notification>>,
54 query_planner: Option<&'a QueryPlanner<'a>>,
56 query_executor: Option<QueryExecutor>,
58 iteration_stage: Option<IterationStage>,
60 index_stores: IndexStores,
62 capabilities: Arc<Capabilities>,
64 #[cfg(any(
65 feature = "kv-mem",
66 feature = "kv-surrealkv",
67 feature = "kv-rocksdb",
68 feature = "kv-fdb",
69 feature = "kv-tikv",
70 ))]
71 temporary_directory: Option<Arc<PathBuf>>,
73 transaction: Option<Arc<Transaction>>,
75 isolated: bool,
77}
78
79impl<'a> Default for Context<'a> {
80 fn default() -> Self {
81 Context::background()
82 }
83}
84
85impl<'a> From<Transaction> for Context<'a> {
86 fn from(txn: Transaction) -> Self {
87 Context::background().with_transaction(Arc::new(txn))
88 }
89}
90
91impl<'a> Debug for Context<'a> {
92 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
93 f.debug_struct("Context")
94 .field("parent", &self.parent)
95 .field("deadline", &self.deadline)
96 .field("cancelled", &self.cancelled)
97 .field("values", &self.values)
98 .finish()
99 }
100}
101
102impl<'a> Context<'a> {
103 pub(crate) fn from_ds(
104 time_out: Option<Duration>,
105 capabilities: Capabilities,
106 index_stores: IndexStores,
107 #[cfg(any(
108 feature = "kv-mem",
109 feature = "kv-surrealkv",
110 feature = "kv-rocksdb",
111 feature = "kv-fdb",
112 feature = "kv-tikv",
113 ))]
114 temporary_directory: Option<Arc<PathBuf>>,
115 ) -> Result<Context<'a>, Error> {
116 let mut ctx = Self {
117 values: HashMap::default(),
118 parent: None,
119 deadline: None,
120 cancelled: Arc::new(AtomicBool::new(false)),
121 notifications: None,
122 query_planner: None,
123 query_executor: None,
124 iteration_stage: None,
125 capabilities: Arc::new(capabilities),
126 index_stores,
127 #[cfg(any(
128 feature = "kv-mem",
129 feature = "kv-surrealkv",
130 feature = "kv-rocksdb",
131 feature = "kv-fdb",
132 feature = "kv-tikv",
133 ))]
134 temporary_directory,
135 transaction: None,
136 isolated: false,
137 };
138 if let Some(timeout) = time_out {
139 ctx.add_timeout(timeout)?;
140 }
141 Ok(ctx)
142 }
143 pub fn background() -> Self {
145 Self {
146 values: HashMap::default(),
147 parent: None,
148 deadline: None,
149 cancelled: Arc::new(AtomicBool::new(false)),
150 notifications: None,
151 query_planner: None,
152 query_executor: None,
153 iteration_stage: None,
154 capabilities: Arc::new(Capabilities::default()),
155 index_stores: IndexStores::default(),
156 #[cfg(any(
157 feature = "kv-mem",
158 feature = "kv-surrealkv",
159 feature = "kv-rocksdb",
160 feature = "kv-fdb",
161 feature = "kv-tikv",
162 ))]
163 temporary_directory: None,
164 transaction: None,
165 isolated: false,
166 }
167 }
168
169 pub fn new(parent: &'a Context) -> Self {
171 Context {
172 values: HashMap::default(),
173 parent: Some(parent),
174 deadline: parent.deadline,
175 cancelled: Arc::new(AtomicBool::new(false)),
176 notifications: parent.notifications.clone(),
177 query_planner: parent.query_planner,
178 query_executor: parent.query_executor.clone(),
179 iteration_stage: parent.iteration_stage.clone(),
180 capabilities: parent.capabilities.clone(),
181 index_stores: parent.index_stores.clone(),
182 #[cfg(any(
183 feature = "kv-mem",
184 feature = "kv-surrealkv",
185 feature = "kv-rocksdb",
186 feature = "kv-fdb",
187 feature = "kv-tikv",
188 ))]
189 temporary_directory: parent.temporary_directory.clone(),
190 transaction: parent.transaction.clone(),
191 isolated: false,
192 }
193 }
194
195 pub fn new_isolated(parent: &'a Context) -> Self {
197 Context {
198 values: HashMap::default(),
199 parent: Some(parent),
200 deadline: parent.deadline,
201 cancelled: Arc::new(AtomicBool::new(false)),
202 notifications: parent.notifications.clone(),
203 query_planner: parent.query_planner,
204 query_executor: parent.query_executor.clone(),
205 iteration_stage: parent.iteration_stage.clone(),
206 capabilities: parent.capabilities.clone(),
207 index_stores: parent.index_stores.clone(),
208 #[cfg(any(
209 feature = "kv-mem",
210 feature = "kv-surrealkv",
211 feature = "kv-rocksdb",
212 feature = "kv-fdb",
213 feature = "kv-tikv",
214 ))]
215 temporary_directory: parent.temporary_directory.clone(),
216 transaction: parent.transaction.clone(),
217 isolated: true,
218 }
219 }
220
221 pub fn add_value<K, V>(&mut self, key: K, value: V)
224 where
225 K: Into<Cow<'static, str>>,
226 V: Into<Cow<'a, Value>>,
227 {
228 self.values.insert(key.into(), value.into());
229 }
230
231 pub fn add_cancel(&mut self) -> Canceller {
234 let cancelled = self.cancelled.clone();
235 Canceller::new(cancelled)
236 }
237
238 pub fn add_deadline(&mut self, deadline: Instant) {
241 match self.deadline {
242 Some(current) if current < deadline => (),
243 _ => self.deadline = Some(deadline),
244 }
245 }
246
247 pub fn add_timeout(&mut self, timeout: Duration) -> Result<(), Error> {
251 match Instant::now().checked_add(timeout) {
252 Some(deadline) => {
253 self.add_deadline(deadline);
254 Ok(())
255 }
256 None => Err(Error::InvalidTimeout(timeout.as_secs())),
257 }
258 }
259
260 pub fn add_notifications(&mut self, chn: Option<&Sender<Notification>>) {
263 self.notifications = chn.cloned()
264 }
265
266 pub(crate) fn set_query_planner(&mut self, qp: &'a QueryPlanner) {
267 self.query_planner = Some(qp);
268 }
269
270 pub(crate) fn set_query_executor(&mut self, qe: QueryExecutor) {
271 self.query_executor = Some(qe);
272 }
273
274 pub(crate) fn set_iteration_stage(&mut self, is: IterationStage) {
275 self.iteration_stage = Some(is);
276 }
277
278 pub(crate) fn set_transaction(&mut self, txn: Arc<Transaction>) {
279 self.transaction = Some(txn);
280 }
281
282 pub(crate) fn with_transaction(mut self, txn: Arc<Transaction>) -> Self {
283 self.transaction = Some(txn);
284 self
285 }
286
287 pub(crate) fn tx(&self) -> Arc<Transaction> {
288 self.transaction
289 .clone()
290 .unwrap_or_else(|| unreachable!("The context was not associated with a transaction"))
291 }
292
293 pub fn timeout(&self) -> Option<Duration> {
296 self.deadline.map(|v| v.saturating_duration_since(Instant::now()))
297 }
298
299 pub fn notifications(&self) -> Option<Sender<Notification>> {
300 self.notifications.clone()
301 }
302
303 pub(crate) fn get_query_planner(&self) -> Option<&QueryPlanner> {
304 self.query_planner
305 }
306
307 pub(crate) fn get_query_executor(&self) -> Option<&QueryExecutor> {
308 self.query_executor.as_ref()
309 }
310
311 pub(crate) fn get_iteration_stage(&self) -> Option<&IterationStage> {
312 self.iteration_stage.as_ref()
313 }
314
315 pub(crate) fn get_index_stores(&self) -> &IndexStores {
317 &self.index_stores
318 }
319
320 pub fn done(&self) -> Option<Reason> {
323 match self.deadline {
324 Some(deadline) if deadline <= Instant::now() => Some(Reason::Timedout),
325 _ if self.cancelled.load(Ordering::Relaxed) => Some(Reason::Canceled),
326 _ => match self.parent {
327 Some(ctx) => ctx.done(),
328 _ => None,
329 },
330 }
331 }
332
333 pub fn is_ok(&self) -> bool {
335 self.done().is_none()
336 }
337
338 pub fn is_done(&self) -> bool {
340 self.done().is_some()
341 }
342
343 pub fn is_timedout(&self) -> bool {
345 matches!(self.done(), Some(Reason::Timedout))
346 }
347
348 #[cfg(any(
349 feature = "kv-mem",
350 feature = "kv-surrealkv",
351 feature = "kv-rocksdb",
352 feature = "kv-fdb",
353 feature = "kv-tikv",
354 ))]
355 pub fn temporary_directory(&self) -> Option<&Arc<PathBuf>> {
357 self.temporary_directory.as_ref()
358 }
359
360 pub fn value(&self, key: &str) -> Option<&Value> {
363 match self.values.get(key) {
364 Some(v) => match v {
365 Cow::Borrowed(v) => Some(*v),
366 Cow::Owned(v) => Some(v),
367 },
368 None if !self.isolated => match self.parent {
369 Some(p) => p.value(key),
370 _ => None,
371 },
372 None => None,
373 }
374 }
375
376 #[cfg(feature = "scripting")]
378 pub fn cancellation(&self) -> crate::ctx::cancellation::Cancellation {
379 crate::ctx::cancellation::Cancellation::new(
380 self.deadline,
381 std::iter::successors(Some(self), |ctx| ctx.parent)
382 .map(|ctx| ctx.cancelled.clone())
383 .collect(),
384 )
385 }
386
387 pub fn add_capabilities(&mut self, caps: Capabilities) {
393 self.capabilities = Arc::new(caps);
394 }
395
396 #[allow(dead_code)]
398 pub fn get_capabilities(&self) -> Arc<Capabilities> {
399 self.capabilities.clone()
400 }
401
402 #[allow(dead_code)]
404 pub fn check_allowed_scripting(&self) -> Result<(), Error> {
405 if !self.capabilities.allows_scripting() {
406 return Err(Error::ScriptingNotAllowed);
407 }
408 Ok(())
409 }
410
411 pub fn check_allowed_function(&self, target: &str) -> Result<(), Error> {
413 if !self.capabilities.allows_function_name(target) {
414 return Err(Error::FunctionNotAllowed(target.to_string()));
415 }
416 Ok(())
417 }
418
419 #[cfg(feature = "http")]
421 pub fn check_allowed_net(&self, target: &Url) -> Result<(), Error> {
422 match target.host() {
423 Some(host)
424 if self.capabilities.allows_network_target(&NetTarget::Host(
425 host.to_owned(),
426 target.port_or_known_default(),
427 )) =>
428 {
429 Ok(())
430 }
431 _ => Err(Error::NetTargetNotAllowed(target.to_string())),
432 }
433 }
434}