1use std::future::IntoFuture;
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use async_channel::Receiver;
7use futures::StreamExt;
8use serde::de::DeserializeOwned;
9#[cfg(not(target_family = "wasm"))]
10use tokio::spawn;
11use uuid::Uuid;
12#[cfg(target_family = "wasm")]
13use wasm_bindgen_futures::spawn_local as spawn;
14
15use crate::api::conn::{Command, Router};
16use crate::api::err::Error;
17use crate::api::method::BoxFuture;
18use crate::api::{self, Connection, ExtraFeatures, Result};
19use crate::core::dbs::{Action as CoreAction, Notification as CoreNotification};
20use crate::core::expr::{
21 BinaryOperator, Cond, Expr, Fields, Ident, Idiom, Literal, LiveStatement, TopLevelExpr,
22};
23use crate::core::val;
24use crate::engine::any::Any;
25use crate::method::{Live, OnceLockExt, Query, Select};
26use crate::opt::Resource;
27use crate::value::Notification;
28use crate::{Action, Surreal, Value};
29
30fn into_future<C, O>(this: Select<C, O, Live>) -> BoxFuture<Result<Stream<O>>>
31where
32 C: Connection,
33{
34 let Select {
35 client,
36 resource,
37 ..
38 } = this;
39 Box::pin(async move {
40 let router = client.inner.router.extract()?;
41 if !router.features.contains(&ExtraFeatures::LiveQueries) {
42 return Err(Error::LiveQueriesNotSupported.into());
43 }
44 let mut stmt = LiveStatement::new(Fields::all());
45 match resource? {
46 Resource::Table(table) => {
47 stmt.what = Expr::Table(unsafe { Ident::new_unchecked(table) });
48 }
49 Resource::RecordId(record) => {
50 let record = record.into_inner();
51 stmt.what = Expr::Table(unsafe { Ident::new_unchecked(record.table.clone()) });
52 let ident = Ident::new("id".to_string()).unwrap();
53 let cond = Expr::Binary {
54 left: Box::new(Expr::Idiom(Idiom::field(ident))),
55 op: BinaryOperator::Equal,
56 right: Box::new(Expr::Literal(Literal::RecordId(record.into_literal()))),
57 };
58 stmt.cond = Some(Cond(cond));
59 }
60 Resource::Object(_) => return Err(Error::LiveOnObject.into()),
61 Resource::Array(_) => return Err(Error::LiveOnArray.into()),
62 Resource::Range(range) => {
63 let record = range.into_inner();
64
65 let val::RecordIdKey::Range(range) = record.key else {
66 panic!("invalid resource?");
67 };
68
69 stmt.what = Expr::Table(unsafe { Ident::new_unchecked(record.table.clone()) });
70
71 let id = Expr::Idiom(Idiom::field(Ident::new("id".to_string()).unwrap()));
72
73 let left = match range.start {
74 std::ops::Bound::Included(x) => Some(Expr::Binary {
75 left: Box::new(id.clone()),
76 op: BinaryOperator::MoreThanEqual,
77 right: Box::new(Expr::Literal(Literal::RecordId(
78 crate::core::expr::RecordIdLit {
79 table: record.table.clone(),
80 key: x.into_literal(),
81 },
82 ))),
83 }),
84 std::ops::Bound::Excluded(x) => Some(Expr::Binary {
85 left: Box::new(id.clone()),
86 op: BinaryOperator::MoreThan,
87 right: Box::new(Expr::Literal(Literal::RecordId(
88 crate::core::expr::RecordIdLit {
89 table: record.table.clone(),
90 key: x.into_literal(),
91 },
92 ))),
93 }),
94 std::ops::Bound::Unbounded => None,
95 };
96 let right = match range.end {
97 std::ops::Bound::Included(x) => Some(Expr::Binary {
98 left: Box::new(id),
99 op: BinaryOperator::LessThanEqual,
100 right: Box::new(Expr::Literal(Literal::RecordId(
101 crate::core::expr::RecordIdLit {
102 table: record.table,
103 key: x.into_literal(),
104 },
105 ))),
106 }),
107 std::ops::Bound::Excluded(x) => Some(Expr::Binary {
108 left: Box::new(id),
109 op: BinaryOperator::LessThan,
110 right: Box::new(Expr::Literal(Literal::RecordId(
111 crate::core::expr::RecordIdLit {
112 table: record.table,
113 key: x.into_literal(),
114 },
115 ))),
116 }),
117 std::ops::Bound::Unbounded => None,
118 };
119
120 let cond = match (left, right) {
121 (Some(l), Some(r)) => Some(Cond(Expr::Binary {
122 left: Box::new(l),
123 op: BinaryOperator::And,
124 right: Box::new(r),
125 })),
126 (Some(x), None) | (None, Some(x)) => Some(Cond(x)),
127 _ => None,
128 };
129
130 stmt.cond = cond
131 }
132 Resource::Unspecified => return Err(Error::LiveOnUnspecified.into()),
133 }
134 let query = Query::normal(
135 client.clone(),
136 vec![TopLevelExpr::Live(Box::new(stmt))],
137 Default::default(),
138 false,
139 );
140 let val::Value::Uuid(id) = query.await?.take::<Value>(0)?.into_inner() else {
141 return Err(Error::InternalError(
142 "successufull live query didn't return a uuid".to_string(),
143 )
144 .into());
145 };
146 let rx = register(router, *id).await?;
147 Ok(Stream::new(client.inner.clone().into(), *id, Some(rx)))
148 })
149}
150
151pub(crate) async fn register(router: &Router, id: Uuid) -> Result<Receiver<CoreNotification>> {
152 let (tx, rx) = async_channel::unbounded();
153 router
154 .execute_unit(Command::SubscribeLive {
155 uuid: id,
156 notification_sender: tx,
157 })
158 .await?;
159 Ok(rx)
160}
161
162impl<'r, Client> IntoFuture for Select<'r, Client, Value, Live>
163where
164 Client: Connection,
165{
166 type Output = Result<Stream<Value>>;
167 type IntoFuture = BoxFuture<'r, Self::Output>;
168
169 fn into_future(self) -> Self::IntoFuture {
170 into_future(self)
171 }
172}
173
174impl<'r, Client, R> IntoFuture for Select<'r, Client, Option<R>, Live>
175where
176 Client: Connection,
177 R: DeserializeOwned,
178{
179 type Output = Result<Stream<Option<R>>>;
180 type IntoFuture = BoxFuture<'r, Self::Output>;
181
182 fn into_future(self) -> Self::IntoFuture {
183 into_future(self)
184 }
185}
186
187impl<'r, Client, R> IntoFuture for Select<'r, Client, Vec<R>, Live>
188where
189 Client: Connection,
190 R: DeserializeOwned,
191{
192 type Output = Result<Stream<Vec<R>>>;
193 type IntoFuture = BoxFuture<'r, Self::Output>;
194
195 fn into_future(self) -> Self::IntoFuture {
196 into_future(self)
197 }
198}
199
/// A stream of live query notifications.
///
/// `R` only selects which `futures::Stream` implementation applies; no value
/// of type `R` is stored in the struct.
#[derive(Debug)]
#[must_use = "streams do nothing unless you poll them"]
pub struct Stream<R> {
    /// Client used to send the kill command when the stream is dropped.
    pub(crate) client: Surreal<Any>,
    /// Identifier of the live query; passed to `kill` on drop.
    pub(crate) id: Uuid,
    /// Receiving end of the notification channel. When `None`, polling yields
    /// `Poll::Ready(None)` immediately and dropping the stream sends no kill
    /// command.
    pub(crate) rx: Option<Pin<Box<Receiver<CoreNotification>>>>,
    /// Zero-sized marker tying the stream to its response type `R`.
    pub(crate) response_type: PhantomData<R>,
}
211
212impl<R> Stream<R> {
213 pub(crate) fn new(
214 client: Surreal<Any>,
215 id: Uuid,
216 rx: Option<Receiver<CoreNotification>>,
217 ) -> Self {
218 Self {
219 id,
220 rx: rx.map(Box::pin),
221 client,
222 response_type: PhantomData,
223 }
224 }
225}
226
// Generates the `poll_next` method of a `futures::Stream` implementation.
//
// `$notification` names the item received from the channel and `$body` is the
// expression, evaluated with that name in scope, that produces the method's
// return value for a received item.
//
// The generated method returns `Poll::Ready(None)` when the stream has no
// receiver or the receiver is exhausted, and `Poll::Pending` when the receiver
// has nothing ready yet.
macro_rules! poll_next {
    ($notification:ident => $body:expr_2021) => {
        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // No receiver means there is nothing to wait for.
            let Some(rx) = self.rx.as_mut() else {
                return Poll::Ready(None);
            };
            match rx.poll_next_unpin(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(None) => Poll::Ready(None),
                Poll::Ready(Some($notification)) => $body,
            }
        }
    };
}
241
242impl futures::Stream for Stream<Value> {
243 type Item = Notification<Value>;
244
245 poll_next! {
246 notification => {
247 match notification.action {
248 CoreAction::Killed => Poll::Ready(None),
249 action => Poll::Ready(Some(Notification {
250 query_id: *notification.id,
251 action: Action::from_core(action),
252 data: Value::from_inner(notification.result),
253 })),
254 }
255 }
256 }
257
258 fn size_hint(&self) -> (usize, Option<usize>) {
259 (0, None)
260 }
261}
262
// Generates a `poll_next` method (via `poll_next!`) that passes every received
// notification through `deserialize` and returns the result as ready.
macro_rules! poll_next_and_convert {
    () => {
        poll_next! {
            received => Poll::Ready(deserialize(received))
        }
    };
}
270
271impl<R> futures::Stream for Stream<Option<R>>
272where
273 R: DeserializeOwned + Unpin,
274{
275 type Item = Result<Notification<R>>;
276
277 poll_next_and_convert! {}
278}
279
280impl<R> futures::Stream for Stream<Vec<R>>
281where
282 R: DeserializeOwned + Unpin,
283{
284 type Item = Result<Notification<R>>;
285
286 poll_next_and_convert! {}
287}
288
289impl<R> futures::Stream for Stream<Notification<R>>
290where
291 R: DeserializeOwned + Unpin,
292{
293 type Item = Result<Notification<R>>;
294
295 poll_next_and_convert! {}
296}
297
298pub(crate) fn kill<Client>(client: &Surreal<Client>, uuid: Uuid)
299where
300 Client: Connection,
301{
302 let client = client.clone();
303 spawn(async move {
304 if let Ok(router) = client.inner.router.extract() {
305 router
306 .execute_unit(Command::Kill {
307 uuid,
308 })
309 .await
310 .ok();
311 }
312 });
313}
314
315impl<R> Drop for Stream<R> {
316 fn drop(&mut self) {
320 if self.rx.is_some() {
321 kill(&self.client, self.id);
322 }
323 }
324}
325
326fn deserialize<R>(notification: CoreNotification) -> Option<Result<crate::Notification<R>>>
327where
328 R: DeserializeOwned,
329{
330 let query_id = *notification.id;
331 let action = notification.action;
332 match action {
333 CoreAction::Killed => None,
334 action => match api::value::from_core_value(notification.result) {
335 Ok(data) => Some(Ok(Notification {
336 query_id,
337 data,
338 action: Action::from_core(action),
339 })),
340 Err(error) => Some(Err(error)),
341 },
342 }
343}