surrealdb_sql/statements/
live.rs

1use crate::ctx::Context;
2use crate::dbs::{Options, Transaction};
3use crate::doc::CursorDoc;
4use crate::err::Error;
5use crate::iam::Auth;
6use crate::{Cond, Fetchs, Fields, Uuid, Value};
7use derive::Store;
8use revision::revisioned;
9use serde::{Deserialize, Serialize};
10use std::fmt;
11
/// A `LIVE SELECT` statement, together with the bookkeeping fields that the
/// database runtime fills in when the live query is stored.
#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
#[revisioned(revision = 2)]
pub struct LiveStatement {
	/// The unique ID of this live query; this is the value
	/// returned to the caller when the statement is computed.
	pub id: Uuid,
	/// The ID of the node which holds this live query. It is
	/// overwritten with the current node ID when the statement
	/// is computed and stored.
	pub node: Uuid,
	/// The fields selected by the live query (`LIVE SELECT <expr>`).
	pub expr: Fields,
	/// The target of the live query (`FROM <what>`). It must
	/// compute to a table, otherwise computing the statement fails.
	pub what: Value,
	/// The optional condition clause of the live query.
	pub cond: Option<Cond>,
	/// The optional fetch clause of the live query.
	pub fetch: Option<Fetchs>,
	// When a live query is marked for archiving, this will
	// be set to the node ID that archived the query. This
	// is an internal property, set by the database runtime.
	// This is optional, and is only set when archived.
	#[doc(hidden)]
	pub archived: Option<Uuid>,
	// When a live query is created, we must also store the
	// session value of the user who made the query (taken
	// from the "session" value of the execution context),
	// so we can check it later when sending notifications.
	// This is optional as it is only set by the database
	// runtime when storing the live query to storage.
	#[revision(start = 2)]
	#[doc(hidden)]
	pub session: Option<Value>,
	// When a live query is created, we must also store the
	// authentication (taken from the options in effect when
	// the statement is computed) of the user who made the
	// query, so we can check it later when sending
	// notifications. This is optional as it is only set by
	// the database runtime when storing the live query.
	#[doc(hidden)]
	pub auth: Option<Auth>,
}
43
44impl LiveStatement {
45	/// Creates a live statement from parts that can be set during a query.
46	pub(crate) fn from_source_parts(
47		expr: Fields,
48		what: Value,
49		cond: Option<Cond>,
50		fetch: Option<Fetchs>,
51	) -> Self {
52		LiveStatement {
53			id: Uuid::new_v4(),
54			node: Uuid::new_v4(),
55			expr,
56			what,
57			cond,
58			fetch,
59			..Default::default()
60		}
61	}
62
63	/// Process this type returning a computed simple Value
64	pub(crate) async fn compute(
65		&self,
66		ctx: &Context<'_>,
67		opt: &Options,
68		txn: &Transaction,
69		doc: Option<&CursorDoc<'_>>,
70	) -> Result<Value, Error> {
71		// Is realtime enabled?
72		opt.realtime()?;
73		// Valid options?
74		opt.valid_for_db()?;
75		// Get the Node ID
76		let nid = opt.id()?;
77		// Check that auth has been set
78		let mut stm = LiveStatement {
79			// Use the current session authentication
80			// for when we store the LIVE Statement
81			session: ctx.value("session").cloned(),
82			// Use the current session authentication
83			// for when we store the LIVE Statement
84			auth: Some(opt.auth.as_ref().clone()),
85			// Clone the rest of the original fields
86			// from the LIVE statement to the new one
87			..self.clone()
88		};
89		let id = stm.id.0;
90		// Claim transaction
91		let mut run = txn.lock().await;
92		// Process the live query table
93		match stm.what.compute(ctx, opt, txn, doc).await? {
94			Value::Table(tb) => {
95				// Store the current Node ID
96				stm.node = nid.into();
97				// Insert the node live query
98				run.putc_ndlq(nid, id, opt.ns(), opt.db(), tb.as_str(), None).await?;
99				// Insert the table live query
100				run.putc_tblq(opt.ns(), opt.db(), &tb, stm, None).await?;
101			}
102			v => {
103				return Err(Error::LiveStatement {
104					value: v.to_string(),
105				})
106			}
107		};
108		// Return the query id
109		Ok(id.into())
110	}
111
112	pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement {
113		self.archived = Some(node_id);
114		self
115	}
116}
117
118impl fmt::Display for LiveStatement {
119	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
120		write!(f, "LIVE SELECT {} FROM {}", self.expr, self.what)?;
121		if let Some(ref v) = self.cond {
122			write!(f, " {v}")?
123		}
124		if let Some(ref v) = self.fetch {
125			write!(f, " {v}")?
126		}
127		Ok(())
128	}
129}