surrealdb_core/sql/statements/
live.rs1use crate::ctx::Context;
2use crate::dbs::Options;
3use crate::doc::CursorDoc;
4use crate::err::{Error, LiveQueryCause};
5use crate::fflags::FFLAGS;
6use crate::iam::Auth;
7use crate::kvs::lq_structs::{LqEntry, TrackedResult};
8use crate::sql::statements::info::InfoStructure;
9use crate::sql::{Cond, Fetchs, Fields, Object, Table, Uuid, Value};
10use derive::Store;
11use futures::lock::MutexGuard;
12use reblessive::tree::Stk;
13use revision::revisioned;
14use serde::{Deserialize, Serialize};
15use std::fmt;
16
17#[revisioned(revision = 2)]
18#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Store, Hash)]
19#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
20#[non_exhaustive]
21pub struct LiveStatement {
22 pub id: Uuid,
23 pub node: Uuid,
24 pub expr: Fields,
25 pub what: Value,
26 pub cond: Option<Cond>,
27 pub fetch: Option<Fetchs>,
28 pub(crate) archived: Option<Uuid>,
35 #[revision(start = 2)]
41 pub(crate) session: Option<Value>,
42 pub(crate) auth: Option<Auth>,
48}
49
50impl LiveStatement {
51 #[doc(hidden)]
52 pub fn new(expr: Fields) -> Self {
53 LiveStatement {
54 id: Uuid::new_v4(),
55 node: Uuid::new_v4(),
56 expr,
57 ..Default::default()
58 }
59 }
60
61 pub(crate) fn from_source_parts(
63 expr: Fields,
64 what: Value,
65 cond: Option<Cond>,
66 fetch: Option<Fetchs>,
67 ) -> Self {
68 LiveStatement {
69 id: Uuid::new_v4(),
70 node: Uuid::new_v4(),
71 expr,
72 what,
73 cond,
74 fetch,
75 ..Default::default()
76 }
77 }
78
79 pub(crate) async fn compute(
81 &self,
82 stk: &mut Stk,
83 ctx: &Context<'_>,
84 opt: &Options,
85 doc: Option<&CursorDoc<'_>>,
86 ) -> Result<Value, Error> {
87 opt.realtime()?;
89 opt.valid_for_db()?;
91 let nid = opt.id()?;
93 let mut stm = LiveStatement {
95 session: ctx.value("session").cloned(),
98 auth: Some(opt.auth.as_ref().clone()),
101 ..self.clone()
104 };
105 let id = stm.id.0;
106 match FFLAGS.change_feed_live_queries.enabled() {
107 true => {
108 let mut run = ctx.tx_lock().await;
109 match stm.what.compute(stk, ctx, opt, doc).await? {
110 Value::Table(tb) => {
111 let mut stm = stm;
113 stm.what = Value::Table(tb.clone());
114
115 let ns = opt.ns()?.to_string();
116 let db = opt.db()?.to_string();
117 self.validate_change_feed_valid(&mut run, &ns, &db, &tb).await?;
118 run.pre_commit_register_async_event(TrackedResult::LiveQuery(LqEntry {
120 live_id: stm.id,
121 ns,
122 db,
123 stm,
124 }))?;
125 }
126 v => {
127 return Err(Error::LiveStatement {
128 value: v.to_string(),
129 });
130 }
131 }
132 Ok(id.into())
133 }
134 false => {
135 let mut run = ctx.tx_lock().await;
137 match stm.what.compute(stk, ctx, opt, doc).await? {
139 Value::Table(tb) => {
140 stm.node = nid.into();
142 run.putc_ndlq(nid, id, opt.ns()?, opt.db()?, tb.as_str(), None).await?;
144 run.putc_tblq(opt.ns()?, opt.db()?, &tb, stm, None).await?;
146 }
147 v => {
148 return Err(Error::LiveStatement {
149 value: v.to_string(),
150 });
151 }
152 };
153 Ok(id.into())
155 }
156 }
157 }
158
159 async fn validate_change_feed_valid(
160 &self,
161 tx: &mut MutexGuard<'_, crate::kvs::Transaction>,
162 ns: &str,
163 db: &str,
164 tb: &Table,
165 ) -> Result<(), Error> {
166 let tb_definition = tx.get_and_cache_tb(ns, db, tb).await.map_err(|e| match e {
168 Error::TbNotFound {
169 value: _tb,
170 } => Error::LiveQueryError(LiveQueryCause::MissingChangeFeed),
171 _ => e,
172 })?;
173 let cf = tb_definition
175 .changefeed
176 .ok_or(Error::LiveQueryError(LiveQueryCause::MissingChangeFeed))?;
177 if !cf.store_diff {
179 return Err(Error::LiveQueryError(LiveQueryCause::ChangeFeedNoOriginal));
180 }
181 Ok(())
182 }
183
184 pub(crate) fn archive(mut self, node_id: Uuid) -> LiveStatement {
185 self.archived = Some(node_id);
186 self
187 }
188}
189
190impl fmt::Display for LiveStatement {
191 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192 write!(f, "LIVE SELECT {} FROM {}", self.expr, self.what)?;
193 if let Some(ref v) = self.cond {
194 write!(f, " {v}")?
195 }
196 if let Some(ref v) = self.fetch {
197 write!(f, " {v}")?
198 }
199 Ok(())
200 }
201}
202
203impl InfoStructure for LiveStatement {
204 fn structure(self) -> Value {
205 let Self {
206 expr,
207 what,
208 cond,
209 fetch,
210 ..
211 } = self;
212
213 let mut acc = Object::default();
214
215 acc.insert("expr".to_string(), expr.structure());
216
217 acc.insert("what".to_string(), what.structure());
218
219 if let Some(cond) = cond {
220 acc.insert("cond".to_string(), cond.structure());
221 }
222
223 if let Some(fetch) = fetch {
224 acc.insert("fetch".to_string(), fetch.structure());
225 }
226 Value::Object(acc)
227 }
228}