Skip to main content

surrealdb/api/method/
live.rs

1use std::future::IntoFuture;
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use async_channel::Receiver;
7use futures::StreamExt;
8use serde::de::DeserializeOwned;
9#[cfg(not(target_family = "wasm"))]
10use tokio::spawn;
11use uuid::Uuid;
12#[cfg(target_family = "wasm")]
13use wasm_bindgen_futures::spawn_local as spawn;
14
15use crate::api::conn::{Command, Router};
16use crate::api::err::Error;
17use crate::api::method::BoxFuture;
18use crate::api::{self, Connection, ExtraFeatures, Result};
19use crate::core::dbs::{Action as CoreAction, Notification as CoreNotification};
20use crate::core::expr::{
21	BinaryOperator, Cond, Expr, Fields, Ident, Idiom, Literal, LiveStatement, TopLevelExpr,
22};
23use crate::core::val;
24use crate::engine::any::Any;
25use crate::method::{Live, OnceLockExt, Query, Select};
26use crate::opt::Resource;
27use crate::value::Notification;
28use crate::{Action, Surreal, Value};
29
30fn into_future<C, O>(this: Select<C, O, Live>) -> BoxFuture<Result<Stream<O>>>
31where
32	C: Connection,
33{
34	let Select {
35		client,
36		resource,
37		..
38	} = this;
39	Box::pin(async move {
40		let router = client.inner.router.extract()?;
41		if !router.features.contains(&ExtraFeatures::LiveQueries) {
42			return Err(Error::LiveQueriesNotSupported.into());
43		}
44		let mut stmt = LiveStatement::new(Fields::all());
45		match resource? {
46			Resource::Table(table) => {
47				stmt.what = Expr::Table(unsafe { Ident::new_unchecked(table) });
48			}
49			Resource::RecordId(record) => {
50				let record = record.into_inner();
51				stmt.what = Expr::Table(unsafe { Ident::new_unchecked(record.table.clone()) });
52				let ident = Ident::new("id".to_string()).unwrap();
53				let cond = Expr::Binary {
54					left: Box::new(Expr::Idiom(Idiom::field(ident))),
55					op: BinaryOperator::Equal,
56					right: Box::new(Expr::Literal(Literal::RecordId(record.into_literal()))),
57				};
58				stmt.cond = Some(Cond(cond));
59			}
60			Resource::Object(_) => return Err(Error::LiveOnObject.into()),
61			Resource::Array(_) => return Err(Error::LiveOnArray.into()),
62			Resource::Range(range) => {
63				let record = range.into_inner();
64
65				let val::RecordIdKey::Range(range) = record.key else {
66					panic!("invalid resource?");
67				};
68
69				stmt.what = Expr::Table(unsafe { Ident::new_unchecked(record.table.clone()) });
70
71				let id = Expr::Idiom(Idiom::field(Ident::new("id".to_string()).unwrap()));
72
73				let left = match range.start {
74					std::ops::Bound::Included(x) => Some(Expr::Binary {
75						left: Box::new(id.clone()),
76						op: BinaryOperator::MoreThanEqual,
77						right: Box::new(Expr::Literal(Literal::RecordId(
78							crate::core::expr::RecordIdLit {
79								table: record.table.clone(),
80								key: x.into_literal(),
81							},
82						))),
83					}),
84					std::ops::Bound::Excluded(x) => Some(Expr::Binary {
85						left: Box::new(id.clone()),
86						op: BinaryOperator::MoreThan,
87						right: Box::new(Expr::Literal(Literal::RecordId(
88							crate::core::expr::RecordIdLit {
89								table: record.table.clone(),
90								key: x.into_literal(),
91							},
92						))),
93					}),
94					std::ops::Bound::Unbounded => None,
95				};
96				let right = match range.end {
97					std::ops::Bound::Included(x) => Some(Expr::Binary {
98						left: Box::new(id),
99						op: BinaryOperator::LessThanEqual,
100						right: Box::new(Expr::Literal(Literal::RecordId(
101							crate::core::expr::RecordIdLit {
102								table: record.table,
103								key: x.into_literal(),
104							},
105						))),
106					}),
107					std::ops::Bound::Excluded(x) => Some(Expr::Binary {
108						left: Box::new(id),
109						op: BinaryOperator::LessThan,
110						right: Box::new(Expr::Literal(Literal::RecordId(
111							crate::core::expr::RecordIdLit {
112								table: record.table,
113								key: x.into_literal(),
114							},
115						))),
116					}),
117					std::ops::Bound::Unbounded => None,
118				};
119
120				let cond = match (left, right) {
121					(Some(l), Some(r)) => Some(Cond(Expr::Binary {
122						left: Box::new(l),
123						op: BinaryOperator::And,
124						right: Box::new(r),
125					})),
126					(Some(x), None) | (None, Some(x)) => Some(Cond(x)),
127					_ => None,
128				};
129
130				stmt.cond = cond
131			}
132			Resource::Unspecified => return Err(Error::LiveOnUnspecified.into()),
133		}
134		let query = Query::normal(
135			client.clone(),
136			vec![TopLevelExpr::Live(Box::new(stmt))],
137			Default::default(),
138			false,
139		);
140		let val::Value::Uuid(id) = query.await?.take::<Value>(0)?.into_inner() else {
141			return Err(Error::InternalError(
142				"successufull live query didn't return a uuid".to_string(),
143			)
144			.into());
145		};
146		let rx = register(router, *id).await?;
147		Ok(Stream::new(client.inner.clone().into(), *id, Some(rx)))
148	})
149}
150
151pub(crate) async fn register(router: &Router, id: Uuid) -> Result<Receiver<CoreNotification>> {
152	let (tx, rx) = async_channel::unbounded();
153	router
154		.execute_unit(Command::SubscribeLive {
155			uuid: id,
156			notification_sender: tx,
157		})
158		.await?;
159	Ok(rx)
160}
161
162impl<'r, Client> IntoFuture for Select<'r, Client, Value, Live>
163where
164	Client: Connection,
165{
166	type Output = Result<Stream<Value>>;
167	type IntoFuture = BoxFuture<'r, Self::Output>;
168
169	fn into_future(self) -> Self::IntoFuture {
170		into_future(self)
171	}
172}
173
174impl<'r, Client, R> IntoFuture for Select<'r, Client, Option<R>, Live>
175where
176	Client: Connection,
177	R: DeserializeOwned,
178{
179	type Output = Result<Stream<Option<R>>>;
180	type IntoFuture = BoxFuture<'r, Self::Output>;
181
182	fn into_future(self) -> Self::IntoFuture {
183		into_future(self)
184	}
185}
186
187impl<'r, Client, R> IntoFuture for Select<'r, Client, Vec<R>, Live>
188where
189	Client: Connection,
190	R: DeserializeOwned,
191{
192	type Output = Result<Stream<Vec<R>>>;
193	type IntoFuture = BoxFuture<'r, Self::Output>;
194
195	fn into_future(self) -> Self::IntoFuture {
196		into_future(self)
197	}
198}
199
200/// A stream of live query notifications
201#[derive(Debug)]
202#[must_use = "streams do nothing unless you poll them"]
203pub struct Stream<R> {
204	pub(crate) client: Surreal<Any>,
205	// We no longer need the lifetime and the type parameter
206	// Leaving them in for backwards compatibility
207	pub(crate) id: Uuid,
208	pub(crate) rx: Option<Pin<Box<Receiver<CoreNotification>>>>,
209	pub(crate) response_type: PhantomData<R>,
210}
211
212impl<R> Stream<R> {
213	pub(crate) fn new(
214		client: Surreal<Any>,
215		id: Uuid,
216		rx: Option<Receiver<CoreNotification>>,
217	) -> Self {
218		Self {
219			id,
220			rx: rx.map(Box::pin),
221			client,
222			response_type: PhantomData,
223		}
224	}
225}
226
227macro_rules! poll_next {
228	($notification:ident => $body:expr_2021) => {
229		fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
230			let Some(ref mut rx) = self.as_mut().rx else {
231				return Poll::Ready(None);
232			};
233			match rx.poll_next_unpin(cx) {
234				Poll::Ready(Some($notification)) => $body,
235				Poll::Ready(None) => Poll::Ready(None),
236				Poll::Pending => Poll::Pending,
237			}
238		}
239	};
240}
241
242impl futures::Stream for Stream<Value> {
243	type Item = Notification<Value>;
244
245	poll_next! {
246		notification => {
247			match notification.action {
248				CoreAction::Killed => Poll::Ready(None),
249				action => Poll::Ready(Some(Notification {
250					query_id: *notification.id,
251					action: Action::from_core(action),
252					data: Value::from_inner(notification.result),
253				})),
254			}
255		}
256	}
257
258	fn size_hint(&self) -> (usize, Option<usize>) {
259		(0, None)
260	}
261}
262
263macro_rules! poll_next_and_convert {
264	() => {
265		poll_next! {
266			notification => Poll::Ready(deserialize(notification))
267		}
268	};
269}
270
271impl<R> futures::Stream for Stream<Option<R>>
272where
273	R: DeserializeOwned + Unpin,
274{
275	type Item = Result<Notification<R>>;
276
277	poll_next_and_convert! {}
278}
279
280impl<R> futures::Stream for Stream<Vec<R>>
281where
282	R: DeserializeOwned + Unpin,
283{
284	type Item = Result<Notification<R>>;
285
286	poll_next_and_convert! {}
287}
288
289impl<R> futures::Stream for Stream<Notification<R>>
290where
291	R: DeserializeOwned + Unpin,
292{
293	type Item = Result<Notification<R>>;
294
295	poll_next_and_convert! {}
296}
297
298pub(crate) fn kill<Client>(client: &Surreal<Client>, uuid: Uuid)
299where
300	Client: Connection,
301{
302	let client = client.clone();
303	spawn(async move {
304		if let Ok(router) = client.inner.router.extract() {
305			router
306				.execute_unit(Command::Kill {
307					uuid,
308				})
309				.await
310				.ok();
311		}
312	});
313}
314
315impl<R> Drop for Stream<R> {
316	/// Close the live query stream
317	///
318	/// This kills the live query process responsible for this stream.
319	fn drop(&mut self) {
320		if self.rx.is_some() {
321			kill(&self.client, self.id);
322		}
323	}
324}
325
326fn deserialize<R>(notification: CoreNotification) -> Option<Result<crate::Notification<R>>>
327where
328	R: DeserializeOwned,
329{
330	let query_id = *notification.id;
331	let action = notification.action;
332	match action {
333		CoreAction::Killed => None,
334		action => match api::value::from_core_value(notification.result) {
335			Ok(data) => Some(Ok(Notification {
336				query_id,
337				data,
338				action: Action::from_core(action),
339			})),
340			Err(error) => Some(Err(error)),
341		},
342	}
343}