surrealdb/api/engine/remote/ws/
mod.rs

1//! WebSocket engine
2
3#[cfg(not(target_arch = "wasm32"))]
4pub(crate) mod native;
5#[cfg(target_arch = "wasm32")]
6pub(crate) mod wasm;
7
8use crate::api;
9use crate::api::conn::DbResponse;
10use crate::api::conn::Method;
11use crate::api::engine::remote::duration_from_str;
12use crate::api::err::Error;
13use crate::api::method::query::QueryResult;
14use crate::api::Connect;
15use crate::api::Result;
16use crate::api::Surreal;
17use crate::dbs::Notification;
18use crate::dbs::QueryMethodResponse;
19use crate::dbs::Status;
20use crate::method::Stats;
21use crate::opt::IntoEndpoint;
22use crate::sql::Value;
23use indexmap::IndexMap;
24use revision::revisioned;
25use revision::Revisioned;
26use serde::de::DeserializeOwned;
27use serde::Deserialize;
28use std::io::Read;
29use std::marker::PhantomData;
30use std::time::Duration;
31
32pub(crate) const PATH: &str = "rpc";
33const PING_INTERVAL: Duration = Duration::from_secs(5);
34const PING_METHOD: &str = "ping";
35const REVISION_HEADER: &str = "revision";
36
37/// The WS scheme used to connect to `ws://` endpoints
38#[derive(Debug)]
39pub struct Ws;
40
41/// The WSS scheme used to connect to `wss://` endpoints
42#[derive(Debug)]
43pub struct Wss;
44
45/// A WebSocket client for communicating with the server via WebSockets
46#[derive(Debug, Clone)]
47pub struct Client {
48	pub(crate) id: i64,
49	method: Method,
50}
51
52impl Surreal<Client> {
53	/// Connects to a specific database endpoint, saving the connection on the static client
54	///
55	/// # Examples
56	///
57	/// ```no_run
58	/// use once_cell::sync::Lazy;
59	/// use surrealdb::Surreal;
60	/// use surrealdb::engine::remote::ws::Client;
61	/// use surrealdb::engine::remote::ws::Ws;
62	///
63	/// static DB: Lazy<Surreal<Client>> = Lazy::new(Surreal::init);
64	///
65	/// # #[tokio::main]
66	/// # async fn main() -> surrealdb::Result<()> {
67	/// DB.connect::<Ws>("localhost:8000").await?;
68	/// # Ok(())
69	/// # }
70	/// ```
71	pub fn connect<P>(
72		&self,
73		address: impl IntoEndpoint<P, Client = Client>,
74	) -> Connect<Client, ()> {
75		Connect {
76			router: self.router.clone(),
77			engine: PhantomData,
78			address: address.into_endpoint(),
79			capacity: 0,
80			waiter: self.waiter.clone(),
81			response_type: PhantomData,
82		}
83	}
84}
85
86#[revisioned(revision = 1)]
87#[derive(Clone, Debug, Deserialize)]
88pub(crate) struct Failure {
89	pub(crate) code: i64,
90	pub(crate) message: String,
91}
92
93#[revisioned(revision = 1)]
94#[derive(Debug, Deserialize)]
95pub(crate) enum Data {
96	Other(Value),
97	Query(Vec<QueryMethodResponse>),
98	Live(Notification),
99}
100
101type ServerResult = std::result::Result<Data, Failure>;
102
103impl From<Failure> for Error {
104	fn from(failure: Failure) -> Self {
105		match failure.code {
106			-32600 => Self::InvalidRequest(failure.message),
107			-32602 => Self::InvalidParams(failure.message),
108			-32603 => Self::InternalError(failure.message),
109			-32700 => Self::ParseError(failure.message),
110			_ => Self::Query(failure.message),
111		}
112	}
113}
114
115impl DbResponse {
116	fn from(result: ServerResult) -> Result<Self> {
117		match result.map_err(Error::from)? {
118			Data::Other(value) => Ok(DbResponse::Other(value)),
119			Data::Query(responses) => {
120				let mut map =
121					IndexMap::<usize, (Stats, QueryResult)>::with_capacity(responses.len());
122
123				for (index, response) in responses.into_iter().enumerate() {
124					let stats = Stats {
125						execution_time: duration_from_str(&response.time),
126					};
127					match response.status {
128						Status::Ok => {
129							map.insert(index, (stats, Ok(response.result)));
130						}
131						Status::Err => {
132							map.insert(
133								index,
134								(stats, Err(Error::Query(response.result.as_raw_string()).into())),
135							);
136						}
137						_ => unreachable!(),
138					}
139				}
140
141				Ok(DbResponse::Query(api::Response {
142					results: map,
143					..api::Response::new()
144				}))
145			}
146			// Live notifications don't call this method
147			Data::Live(..) => unreachable!(),
148		}
149	}
150}
151
152#[revisioned(revision = 1)]
153#[derive(Debug, Deserialize)]
154pub(crate) struct Response {
155	id: Option<Value>,
156	pub(crate) result: ServerResult,
157}
158
159fn serialize(value: &Value, revisioned: bool) -> Result<Vec<u8>> {
160	if revisioned {
161		let mut buf = Vec::new();
162		value.serialize_revisioned(&mut buf).map_err(|error| crate::Error::Db(error.into()))?;
163		return Ok(buf);
164	}
165	crate::sql::serde::serialize(value).map_err(|error| crate::Error::Db(error.into()))
166}
167
168fn deserialize<A, T>(bytes: &mut A, revisioned: bool) -> Result<T>
169where
170	A: Read,
171	T: Revisioned + DeserializeOwned,
172{
173	if revisioned {
174		return T::deserialize_revisioned(bytes).map_err(|x| crate::Error::Db(x.into()));
175	}
176	let mut buf = Vec::new();
177	bytes.read_to_end(&mut buf).map_err(crate::err::Error::Io)?;
178	crate::sql::serde::deserialize(&buf).map_err(|error| crate::Error::Db(error.into()))
179}