surrealdb_core/sql/statements/
live.rs

1use crate::ctx::Context;
2use crate::dbs::Options;
3use crate::doc::CursorDoc;
4use crate::err::{Error, LiveQueryCause};
5use crate::fflags::FFLAGS;
6use crate::iam::Auth;
7use crate::kvs::lq_structs::{LqEntry, TrackedResult};
8use crate::sql::statements::info::InfoStructure;
9use crate::sql::{Cond, Fetchs, Fields, Object, Table, Uuid, Value};
10use derive::Store;
11use futures::lock::MutexGuard;
12use reblessive::tree::Stk;
13use revision::revisioned;
14use serde::{Deserialize, Serialize};
15use std::fmt;
16
17#[revisioned(revision = 2)]
18#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
19#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
20#[non_exhaustive]
21pub struct LiveStatement {
22	pub id: Uuid,
23	pub node: Uuid,
24	pub expr: Fields,
25	pub what: Value,
26	pub cond: Option<Cond>,
27	pub fetch: Option<Fetchs>,
28	// When a live query is marked for archiving, this will
29	// be set to the node ID that archived the query. This
30	// is an internal property, set by the database runtime.
31	// This is optional, and is only set when archived.
32	//
33	// This is deprecated from 2.0
34	pub(crate) archived: Option<Uuid>,
35	// When a live query is created, we must also store the
36	// authenticated session of the user who made the query,
37	// so we can check it later when sending notifications.
38	// This is optional as it is only set by the database
39	// runtime when storing the live query to storage.
40	#[revision(start = 2)]
41	pub(crate) session: Option<Value>,
42	// When a live query is created, we must also store the
43	// authenticated session of the user who made the query,
44	// so we can check it later when sending notifications.
45	// This is optional as it is only set by the database
46	// runtime when storing the live query to storage.
47	pub(crate) auth: Option<Auth>,
48}
49
50impl LiveStatement {
51	#[doc(hidden)]
52	pub fn new(expr: Fields) -> Self {
53		LiveStatement {
54			id: Uuid::new_v4(),
55			node: Uuid::new_v4(),
56			expr,
57			..Default::default()
58		}
59	}
60
61	/// Creates a live statement from parts that can be set during a query.
62	pub(crate) fn from_source_parts(
63		expr: Fields,
64		what: Value,
65		cond: Option<Cond>,
66		fetch: Option<Fetchs>,
67	) -> Self {
68		LiveStatement {
69			id: Uuid::new_v4(),
70			node: Uuid::new_v4(),
71			expr,
72			what,
73			cond,
74			fetch,
75			..Default::default()
76		}
77	}
78
79	/// Process this type returning a computed simple Value
80	pub(crate) async fn compute(
81		&self,
82		stk: &mut Stk,
83		ctx: &Context<'_>,
84		opt: &Options,
85		doc: Option<&CursorDoc<'_>>,
86	) -> Result<Value, Error> {
87		// Is realtime enabled?
88		opt.realtime()?;
89		// Valid options?
90		opt.valid_for_db()?;
91		// Get the Node ID
92		let nid = opt.id()?;
93		// Check that auth has been set
94		let mut stm = LiveStatement {
95			// Use the current session authentication
96			// for when we store the LIVE Statement
97			session: ctx.value("session").cloned(),
98			// Use the current session authentication
99			// for when we store the LIVE Statement
100			auth: Some(opt.auth.as_ref().clone()),
101			// Clone the rest of the original fields
102			// from the LIVE statement to the new one
103			..self.clone()
104		};
105		let id = stm.id.0;
106		match FFLAGS.change_feed_live_queries.enabled() {
107			true => {
108				let mut run = ctx.tx_lock().await;
109				match stm.what.compute(stk, ctx, opt, doc).await? {
110					Value::Table(tb) => {
111						// We modify the table as it can be a $PARAM and the compute evaluates that
112						let mut stm = stm;
113						stm.what = Value::Table(tb.clone());
114
115						let ns = opt.ns()?.to_string();
116						let db = opt.db()?.to_string();
117						self.validate_change_feed_valid(&mut run, &ns, &db, &tb).await?;
118						// Send the live query registration hook to the transaction pre-commit channel
119						run.pre_commit_register_async_event(TrackedResult::LiveQuery(LqEntry {
120							live_id: stm.id,
121							ns,
122							db,
123							stm,
124						}))?;
125					}
126					v => {
127						return Err(Error::LiveStatement {
128							value: v.to_string(),
129						});
130					}
131				}
132				Ok(id.into())
133			}
134			false => {
135				// Claim transaction
136				let mut run = ctx.tx_lock().await;
137				// Process the live query table
138				match stm.what.compute(stk, ctx, opt, doc).await? {
139					Value::Table(tb) => {
140						// Store the current Node ID
141						stm.node = nid.into();
142						// Insert the node live query
143						run.putc_ndlq(nid, id, opt.ns()?, opt.db()?, tb.as_str(), None).await?;
144						// Insert the table live query
145						run.putc_tblq(opt.ns()?, opt.db()?, &tb, stm, None).await?;
146					}
147					v => {
148						return Err(Error::LiveStatement {
149							value: v.to_string(),
150						});
151					}
152				};
153				// Return the query id
154				Ok(id.into())
155			}
156		}
157	}
158
159	async fn validate_change_feed_valid(
160		&self,
161		tx: &mut MutexGuard<'_, crate::kvs::Transaction>,
162		ns: &str,
163		db: &str,
164		tb: &Table,
165	) -> Result<(), Error> {
166		// Find the table definition
167		let tb_definition = tx.get_and_cache_tb(ns, db, tb).await.map_err(|e| match e {
168			Error::TbNotFound {
169				value: _tb,
170			} => Error::LiveQueryError(LiveQueryCause::MissingChangeFeed),
171			_ => e,
172		})?;
173		// check it has a change feed
174		let cf = tb_definition
175			.changefeed
176			.ok_or(Error::LiveQueryError(LiveQueryCause::MissingChangeFeed))?;
177		// check the change feed includes the original - required for differentiating between CREATE and UPDATE
178		if !cf.store_diff {
179			return Err(Error::LiveQueryError(LiveQueryCause::ChangeFeedNoOriginal));
180		}
181		Ok(())
182	}
183
184	pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement {
185		self.archived = Some(node_id);
186		self
187	}
188}
189
190impl fmt::Display for LiveStatement {
191	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192		write!(f, "LIVE SELECT {} FROM {}", self.expr, self.what)?;
193		if let Some(ref v) = self.cond {
194			write!(f, " {v}")?
195		}
196		if let Some(ref v) = self.fetch {
197			write!(f, " {v}")?
198		}
199		Ok(())
200	}
201}
202
203impl InfoStructure for LiveStatement {
204	fn structure(self) -> Value {
205		let Self {
206			expr,
207			what,
208			cond,
209			fetch,
210			..
211		} = self;
212
213		let mut acc = Object::default();
214
215		acc.insert("expr".to_string(), expr.structure());
216
217		acc.insert("what".to_string(), what.structure());
218
219		if let Some(cond) = cond {
220			acc.insert("cond".to_string(), cond.structure());
221		}
222
223		if let Some(fetch) = fetch {
224			acc.insert("fetch".to_string(), fetch.structure());
225		}
226		Value::Object(acc)
227	}
228}