simploxide_ws_core/lib.rs
1//! A fully asynchronous raw SimpleX websocket client that provides:
2//!
//! 1. Request batching under heavy load.
4//!
//! 2. Complete asynchronicity: futures created by the same instance of a client are fully
6//! independent from each other. The event queue receives events independently from client
7//! actions.
8//!
9//! 3. Graceful shutdown with strong guarantees:
10//! - All futures scheduled before the `.disconnect` call are guaranteed to receive their
11//! responses. All futures scheduled after the `.disconnect` call are guaranteed to receive the
12//! [`tungstenite::Error::AlreadyClosed`] error.
13//!
14//! - If the web socket connection drops due to an error all buffered responses are guaranteed
15//! to be delivered to corresponding futures. All other pending futures are guaranteed to be
16//! resolved with the web socket error.
17//!
18//! - You will receive events for as long as there are futures awaiting responses. After all
19//! futures are resolved you will receive all buffered events and then the event queue will be
20//! closed.
21//!
22//! See [README on GitHub](https://github.com/a1akris/simploxide/tree/main/simploxide-core) for diagrams
23//! demonstrating how all this works under the hood.
24//!
25//! -----
26//!
27//! _Current implementation heavily depends on `tokio` runtime and won't work with other
28//! executors._
29
30mod dispatcher;
31mod router;
32mod transmission;
33
34pub use dispatcher::{EventQueue, EventReceiver};
35pub use simploxide_core::SimplexVersion;
36pub use tokio_tungstenite::{self, tungstenite};
37
38use futures::StreamExt;
39use serde::Deserialize;
40use simploxide_core::VersionInfo;
41use tokio::sync::{oneshot, watch};
42use tokio_tungstenite::{
43 MaybeTlsStream, WebSocketStream, connect_async,
44 tungstenite::{Message, client::IntoClientRequest as _},
45};
46use tokio_util::sync::CancellationToken;
47
48use {router::ClientRouter, transmission::Transmitter};
49
50use std::sync::{
51 Arc,
52 atomic::{AtomicUsize, Ordering},
53};
54
/// A raw event, represented as a plain [`String`].
pub type Event = String;
/// A raw response to a request; the same underlying type as [`Event`].
pub type Response = Event;
/// The error type of this module's API: a reference-counted [`tungstenite::Error`].
pub type Error = Arc<tungstenite::Error>;
/// A result whose error type is [`Error`]; the success type defaults to `()`.
pub type Result<T = ()> = ::std::result::Result<T, Error>;
/// Alias for [`EventQueue`], the queue type returned by [`connect`].
pub type RawEventQueue = EventQueue;

/// Sink half obtained by splitting the websocket stream; it accepts outgoing [`Message`]s.
type WsOut =
    futures::stream::SplitSink<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>, Message>;
/// Stream half obtained by splitting the websocket stream.
type WsIn = futures::stream::SplitStream<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>>;

/// Sending side of the `bool` watch channel created in [`connect`].
type ShutdownEmitter = watch::Sender<bool>;
/// Receiving side of the same channel; [`RawClient::disconnect`] waits on it for `true`.
type ShutdownSignal = watch::Receiver<bool>;
67
/// Process-wide counter backing [`next_request_id`]; starts at zero.
static REQUEST_ID: AtomicUsize = AtomicUsize::new(0);

#[cfg(feature = "cli")]
pub mod cli;

/// Identifier attached to every outgoing request.
type RequestId = usize;

/// Hands out the next request identifier.
///
/// Returns the counter value observed *before* the increment, so the first call yields `0`.
/// Only the atomicity of the increment matters here, hence the relaxed ordering.
fn next_request_id() -> RequestId {
    AtomicUsize::fetch_add(&REQUEST_ID, 1, Ordering::Relaxed)
}
77
78/// Connect to the running SimpleX daemon by websocket URI.
79///
80/// Returns a [RawClient] for sending commands and a [RawEventQueue] that buffers incoming chat
81/// events independently of client activity.
82///
83/// # Security
84///
85/// - SimpleX CLI does not support TLS URIs("wss://") and will fail at the handshake. The web
86/// socket carries unencrypted unauthenticated traffic. Bind the daemon to
87/// localhost(`ws://127.0.0.1:{port}`) only. Any process or host that can reach the port has full,
88/// unauthenticated control over the daemon, can intercept events and execute arbitrary commands.
89///
90/// # Memory
91///
92/// The [`RawEventQueue`] is backed by an unbounded channel. If events are not consumed they
93/// accumulate indefinitely. Either process events promptly or drop the queue immediately if your
94/// application does not need them
95///
96/// # Example
97///
98/// ```ignore
99/// let (client, events) = simploxide_core::connect("ws://127.0.0.1:5225").await?;
100///
101/// // (Optional) Drop the event queue if you're not planning to handle events
102/// drop(events)
103///
104/// let current_user = client.send("/user".to_owned()).await?;
105/// println!("{}", serde_json::to_string_pretty(¤t_user).unwrap());
106/// ```
107pub async fn connect(simplex_daemon_url: &str) -> tungstenite::Result<(RawClient, RawEventQueue)> {
108 let connection_request = simplex_daemon_url.into_client_request()?;
109 let (sockstream, _) = connect_async(connection_request).await?;
110 let (ws_out, ws_in) = sockstream.split();
111
112 let dispatching_cancellator = CancellationToken::new();
113 let (transmission_interrupter, transmission_interrupted) = oneshot::channel();
114 let (shutdown_tx, shutdown) = watch::channel(false);
115
116 let (client_router, response_router) = router::init(
117 dispatching_cancellator.clone(),
118 transmission_interrupter,
119 shutdown_tx,
120 );
121 let tx = transmission::init(ws_out, transmission_interrupted);
122 let event_queue = dispatcher::init(ws_in, response_router, dispatching_cancellator);
123
124 Ok((
125 RawClient {
126 tx,
127 router: client_router,
128 shutdown,
129 },
130 event_queue,
131 ))
132}
133
/// A lightweight cheaply clonable client capable of sending raw requests(SimpleX commands) and
/// receiving raw responses(JSON objects).
///
/// You can use the client behind a shared reference, or you can clone it, in both cases the
/// created futures will be independent from each other.
#[derive(Clone)]
pub struct RawClient {
    /// Handle used by [`RawClient::send`] to submit outgoing requests.
    tx: Transmitter,
    /// Handle used to book responders for request IDs and to trigger the shutdown.
    router: ClientRouter,
    /// Watch receiver that [`RawClient::disconnect`] waits on until it observes `true`.
    shutdown: ShutdownSignal,
}
145
146impl RawClient {
147 /// Send a raw SimpleX request that is a SimpleX CLI command.
148 ///
149 /// The actual request sending part always resolves immediately so the `send(..).await` call
150 /// directly awaits the response.
151 pub async fn send(&self, command: String) -> Result<Response> {
152 let id = next_request_id();
153 let (responder, response) = oneshot::channel();
154
155 // IMPORTANT: It's crucial to book a request before sending it to the server to avoid the
156 // case when the response comes before the responder registration.
157 self.router.book(id, responder)?;
158 self.tx.make_request(id, command)?;
159
160 response
161 .await
162 .expect("Registered responders always deliver")
163 }
164
165 /// Returns the version of the underlying SimpleX runtime.
166 pub async fn version(&self) -> std::result::Result<SimplexVersion, VersionError> {
167 #[derive(Deserialize)]
168 struct VersionResponse<'a> {
169 #[serde(borrow)]
170 resp: VersionInfo<'a>,
171 }
172
173 let output = self.send("/v".to_owned()).await?;
174
175 let response = serde_json::from_str::<VersionResponse>(&output)
176 .map_err(VersionError::InvalidJson)?
177 .resp
178 .version_info
179 .version;
180
181 let version = response
182 .parse()
183 .map_err(|_| VersionError::ParseError(response.to_owned()))?;
184
185 Ok(version)
186 }
187
188 /// Initiates a graceful shutdown and waits until it is complete. Returns only after the
189 /// connection is fully closed.
190 ///
191 /// All futures that got scheduled before this call will still receive their responses. All
192 /// futures scheduled after this call(from cloned clients) will resolve immediately with
193 /// [`tungstenite::Error::AlreadyClosed`].
194 ///
195 /// If you don't care about waiting for the graceful shutdown to complete you can just drop the
196 /// future, the shutdown will still be triggered
197 ///
198 /// ```ignore
199 /// let _ = client.disconnect();
200 /// ```
201 ///
202 /// or use [`tokio::time::timeout`] to limit the wait time
203 ///
204 /// ```ignore
205 /// tokio::time::timeout(Duration::from_secs(5), client.disconnect())
206 /// .await
207 /// .unwrap_or_default();
208 /// ```
209 ///
210 /// # Racing with [`Self::send`]
211 ///
212 /// If [`Self::send`] and [`Self::disconnect`] are called concurrently from different threads
213 /// the outcome depends on scheduling. If `send` wins the channel lock first, it will receive a
214 /// response as normal. If `disconnect` wins first, the `send` future will receive
215 /// [`tungstenite::Error::AlreadyClosed`].
216 ///
217 /// However, in the second case the request could have already been buffered and delivered to the
218 /// server by another thread while `disconnect` was executing on the current thread, meaning the
219 /// send command ran even though the client received an error. Do not use `AlreadyClosed` as a
220 /// proof that the command was not executed. To guarantee ordering, await all `send` futures to
221 /// completion before calling `disconnect`.
222 pub fn disconnect(mut self) -> impl Future<Output = ()> {
223 self.router.shutdown();
224 async move {
225 let _ = self.shutdown.wait_for(|done| *done).await;
226 }
227 }
228}
229
/// Errors that [`RawClient::version`] can return.
#[derive(Debug)]
pub enum VersionError {
    /// Sending the version request failed with a websocket-level [`Error`].
    Ws(Error),
    /// The response could not be deserialized as the expected version JSON.
    InvalidJson(serde_json::Error),
    /// The version string from the response could not be parsed; holds the offending string.
    ParseError(String),
}
236
237impl From<Error> for VersionError {
238 fn from(value: Error) -> Self {
239 Self::Ws(value)
240 }
241}
242
243impl std::fmt::Display for VersionError {
244 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245 match self {
246 Self::Ws(e) => e.fmt(f),
247 Self::InvalidJson(e) => write!(f, "Cannot parse the version json: {e}"),
248 Self::ParseError(s) => {
249 write!(
250 f,
251 "Cannot parse version, expected format: '<major>.<minor>.<patch>.<hotfix>', got {s:?}"
252 )
253 }
254 }
255 }
256}
257
258impl std::error::Error for VersionError {
259 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
260 match self {
261 Self::Ws(e) => Some(e),
262 Self::InvalidJson(e) => Some(e),
263 Self::ParseError(_) => None,
264 }
265 }
266}