Skip to main content

surrealdb/api/method/
select.rs

1use std::borrow::Cow;
2use std::future::IntoFuture;
3use std::marker::PhantomData;
4
5use serde::de::DeserializeOwned;
6use uuid::Uuid;
7
8use super::transaction::WithTransaction;
9use crate::api::conn::Command;
10use crate::api::method::{BoxFuture, OnceLockExt};
11use crate::api::opt::Resource;
12use crate::api::{Connection, Result};
13use crate::method::Live;
14use crate::opt::KeyRange;
15use crate::{Surreal, Value};
16
17/// A select future
18#[derive(Debug)]
19#[must_use = "futures do nothing unless you `.await` or poll them"]
20pub struct Select<'r, C: Connection, R, T = ()> {
21	pub(super) txn: Option<Uuid>,
22	pub(super) client: Cow<'r, Surreal<C>>,
23	pub(super) resource: Result<Resource>,
24	pub(super) response_type: PhantomData<R>,
25	pub(super) query_type: PhantomData<T>,
26}
27
28impl<C, R, T> WithTransaction for Select<'_, C, R, T>
29where
30	C: Connection,
31{
32	fn with_transaction(mut self, id: Uuid) -> Self {
33		self.txn = Some(id);
34		self
35	}
36}
37
38impl<C, R, T> Select<'_, C, R, T>
39where
40	C: Connection,
41{
42	/// Converts to an owned type which can easily be moved to a different
43	/// thread
44	pub fn into_owned(self) -> Select<'static, C, R, T> {
45		Select {
46			client: Cow::Owned(self.client.into_owned()),
47			..self
48		}
49	}
50}
51
// Expands to the `into_future` method of an `IntoFuture` impl for `Select`.
// `$method` names the router method that is handed the `Command::Select`
// built from the select's transaction and resource.
macro_rules! into_future {
	($method:ident) => {
		fn into_future(self) -> Self::IntoFuture {
			let Select {
				txn,
				client,
				resource,
				..
			} = self;
			Box::pin(async move {
				// The router is extracted first, so its error takes precedence
				// over an invalid resource.
				let router = client.inner.router.extract()?;
				let what = resource?;
				let command = Command::Select {
					txn,
					what,
				};
				router.$method(command).await
			})
		}
	};
}
73
74impl<'r, Client> IntoFuture for Select<'r, Client, Value>
75where
76	Client: Connection,
77{
78	type Output = Result<Value>;
79	type IntoFuture = BoxFuture<'r, Self::Output>;
80
81	into_future! {execute_value}
82}
83
84impl<'r, Client, R> IntoFuture for Select<'r, Client, Option<R>>
85where
86	Client: Connection,
87	R: DeserializeOwned,
88{
89	type Output = Result<Option<R>>;
90	type IntoFuture = BoxFuture<'r, Self::Output>;
91
92	into_future! {execute_opt}
93}
94
95impl<'r, Client, R> IntoFuture for Select<'r, Client, Vec<R>>
96where
97	Client: Connection,
98	R: DeserializeOwned,
99{
100	type Output = Result<Vec<R>>;
101	type IntoFuture = BoxFuture<'r, Self::Output>;
102
103	into_future! {execute_vec}
104}
105
106impl<C> Select<'_, C, Value>
107where
108	C: Connection,
109{
110	/// Restricts the records selected to those in the specified range
111	pub fn range(mut self, range: impl Into<KeyRange>) -> Self {
112		self.resource = self.resource.and_then(|x| x.with_range(range.into()));
113		self
114	}
115}
116
117impl<C, R> Select<'_, C, Vec<R>>
118where
119	C: Connection,
120{
121	/// Restricts the records selected to those in the specified range
122	pub fn range(mut self, range: impl Into<KeyRange>) -> Self {
123		self.resource = self.resource.and_then(|x| x.with_range(range.into()));
124		self
125	}
126}
127
128impl<'r, C, R> Select<'r, C, R>
129where
130	C: Connection,
131	R: DeserializeOwned,
132{
133	/// Turns a normal select query into a live query
134	///
135	/// # Examples
136	///
137	/// ```no_run
138	/// # use futures::StreamExt;
139	/// # use surrealdb::opt::Resource;
140	/// # use surrealdb::Result;
141	/// # use surrealdb::Notification;
142	/// # #[derive(Debug, serde::Deserialize)]
143	/// # struct Person;
144	/// #
145	/// # #[tokio::main]
146	/// # async fn main() -> surrealdb::Result<()> {
147	/// # let db = surrealdb::engine::any::connect("mem://").await?;
148	/// #
149	/// // Select the namespace/database to use
150	/// db.use_ns("namespace").use_db("database").await?;
151	///
152	/// // Listen to all updates on a table
153	/// let mut stream = db.select("person").live().await?;
154	/// # let _: Option<Result<Notification<Person>>> = stream.next().await;
155	///
156	/// // Listen to updates on a range of records
157	/// let mut stream = db.select("person").range("jane".."john").live().await?;
158	/// # let _: Option<Result<Notification<Person>>> = stream.next().await;
159	///
160	/// // Listen to updates on a specific record
161	/// let mut stream = db.select(("person", "h5wxrf2ewk8xjxosxtyc")).live().await?;
162	///
163	/// // The returned stream implements `futures::Stream` so we can
164	/// // use it with `futures::StreamExt`, for example.
165	/// while let Some(result) = stream.next().await {
166	///     handle(result);
167	/// }
168	///
169	/// // Handle the result of the live query notification
170	/// fn handle(result: Result<Notification<Person>>) {
171	///     match result {
172	///         Ok(notification) => println!("{notification:?}"),
173	///         Err(error) => eprintln!("{error}"),
174	///     }
175	/// }
176	/// #
177	/// # Ok(())
178	/// # }
179	/// ```
180	pub fn live(self) -> Select<'r, C, R, Live> {
181		Select {
182			txn: self.txn,
183			client: self.client,
184			resource: self.resource,
185			response_type: self.response_type,
186			query_type: PhantomData,
187		}
188	}
189}