// surrealdb/api/engine/remote/ws/mod.rs

1//! WebSocket engine
2
3#[cfg(not(target_family = "wasm"))]
4pub(crate) mod native;
5#[cfg(target_family = "wasm")]
6pub(crate) mod wasm;
7
8use std::collections::HashMap;
9use std::marker::PhantomData;
10use std::time::Duration;
11
12use async_channel::Sender;
13use indexmap::IndexMap;
14use trice::Instant;
15use uuid::Uuid;
16
17use crate::api::conn::{Command, DbResponse};
18use crate::api::{Connect, Result, Surreal};
19use crate::core::dbs::Notification;
20use crate::core::val::Value as CoreValue;
21use crate::opt::IntoEndpoint;
22
/// Path segment used for the RPC endpoint.
// NOTE(review): presumably appended to the server address when connecting — confirm in the engine modules.
pub(crate) const PATH: &str = "rpc";
/// Interval of five seconds associated with pings.
// NOTE(review): presumably the delay between keep-alive pings — confirm against the router loop.
const PING_INTERVAL: Duration = Duration::from_secs(5);
/// Name of the header carrying the revision.
// NOTE(review): presumably sent during the WebSocket handshake — confirm against the connect code.
const REVISION_HEADER: &str = "revision";
26
27enum RequestEffect {
28	/// Completing this request sets a variable to a give value.
29	Set {
30		key: String,
31		value: CoreValue,
32	},
33	/// Completing this request sets a variable to a give value.
34	Clear {
35		key: String,
36	},
37	/// Insert requests repsonses need to be flattened in an array.
38	Insert,
39	/// No effect
40	None,
41}
42
/// Identifies a method whose command is kept for replay.
///
/// Used as the key of the `replay` map in `RouterState`, so at most one
/// command is stored per method.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
enum ReplayMethod {
	/// The `use` method.
	Use,
	/// The `signup` method.
	Signup,
	/// The `signin` method.
	Signin,
	/// The `invalidate` method.
	Invalidate,
	/// The `authenticate` method.
	Authenticate,
}
51
52struct PendingRequest {
53	// Does resolving this request has some effects.
54	effect: RequestEffect,
55	// The channel to send the result of the request into.
56	response_channel: Sender<Result<DbResponse>>,
57}
58
59struct RouterState<Sink, Stream> {
60	/// Vars currently set by the set method,
61	vars: IndexMap<String, CoreValue>,
62	/// Messages which aught to be replayed on a reconnect.
63	replay: IndexMap<ReplayMethod, Command>,
64	/// Pending live queries
65	live_queries: HashMap<Uuid, async_channel::Sender<Notification>>,
66	/// Send requests which are still awaiting an awnser.
67	pending_requests: HashMap<i64, PendingRequest>,
68	/// The last time a message was recieved from the server.
69	last_activity: Instant,
70	/// The sink into which messages are send to surrealdb
71	sink: Sink,
72	/// The stream from which messages are recieved from surrealdb
73	stream: Stream,
74}
75
76impl<Sink, Stream> RouterState<Sink, Stream> {
77	pub fn new(sink: Sink, stream: Stream) -> Self {
78		RouterState {
79			vars: IndexMap::new(),
80			replay: IndexMap::new(),
81			live_queries: HashMap::new(),
82			pending_requests: HashMap::new(),
83			last_activity: Instant::now(),
84			sink,
85			stream,
86		}
87	}
88}
89
/// Outcome of a handling step, telling the caller whether the socket is
/// still usable.
enum HandleResult {
	/// Socket disconnected, should continue to reconnect.
	Disconnected,
	/// Nothing wrong, continue as normal.
	Ok,
}
96
/// The WS scheme used to connect to `ws://` endpoints
#[derive(Debug)]
pub struct Ws;
100
/// The WSS scheme used to connect to `wss://` endpoints
#[derive(Debug)]
pub struct Wss;
104
/// A WebSocket client for communicating with the server via WebSockets
///
/// The private unit field prevents construction outside of this module.
#[derive(Debug, Clone)]
pub struct Client(());
108
109impl Surreal<Client> {
110	/// Connects to a specific database endpoint, saving the connection on the
111	/// static client
112	///
113	/// # Examples
114	///
115	/// ```no_run
116	/// use std::sync::LazyLock;
117	/// use surrealdb::Surreal;
118	/// use surrealdb::engine::remote::ws::Client;
119	/// use surrealdb::engine::remote::ws::Ws;
120	///
121	/// static DB: LazyLock<Surreal<Client>> = LazyLock::new(Surreal::init);
122	///
123	/// # #[tokio::main]
124	/// # async fn main() -> surrealdb::Result<()> {
125	/// DB.connect::<Ws>("localhost:8000").await?;
126	/// # Ok(())
127	/// # }
128	/// ```
129	pub fn connect<P>(
130		&self,
131		address: impl IntoEndpoint<P, Client = Client>,
132	) -> Connect<Client, ()> {
133		Connect {
134			surreal: self.inner.clone().into(),
135			address: address.into_endpoint(),
136			capacity: 0,
137			response_type: PhantomData,
138		}
139	}
140}