simploxide_ws_core/lib.rs
1#![cfg_attr(docsrs, feature(doc_cfg))]
2//! A fully asynchronous raw SimpleX websocket client that provides:
3//!
//! 1. Request batching under heavy load.
5//!
//! 2. Complete asynchronicity: futures created by the same instance of a client are fully
7//! independent from each other. The event queue receives events independently from client
8//! actions.
9//!
10//! 3. Graceful shutdown with strong guarantees:
11//! - All futures scheduled before the `.disconnect` call are guaranteed to receive their
12//! responses. All futures scheduled after the `.disconnect` call are guaranteed to receive the
13//! [`tungstenite::Error::AlreadyClosed`] error.
14//!
15//! - If the web socket connection drops due to an error all buffered responses are guaranteed
16//! to be delivered to corresponding futures. All other pending futures are guaranteed to be
17//! resolved with the web socket error.
18//!
19//! - You will receive events for as long as there are futures awaiting responses. After all
20//! futures are resolved you will receive all buffered events and then the event queue will be
21//! closed.
22//!
23//! See [README on GitHub](https://github.com/a1akris/simploxide/tree/main/simploxide-core) for diagrams
24//! demonstrating how all this works under the hood.
25//!
26//! -----
27//!
28//! _Current implementation heavily depends on `tokio` runtime and won't work with other
29//! executors._
30
31mod dispatcher;
32mod router;
33mod transmission;
34
35pub use dispatcher::{EventQueue, EventReceiver};
36pub use simploxide_core::SimplexVersion;
37pub use tokio_tungstenite::{self, tungstenite};
38
39use futures::StreamExt;
40use serde::Deserialize;
41use simploxide_core::VersionInfo;
42use tokio::sync::{oneshot, watch};
43use tokio_tungstenite::{
44 MaybeTlsStream, WebSocketStream, connect_async,
45 tungstenite::{Message, client::IntoClientRequest as _},
46};
47use tokio_util::sync::CancellationToken;
48
49use {router::ClientRouter, transmission::Transmitter};
50
51use std::sync::{
52 Arc,
53 atomic::{AtomicUsize, Ordering},
54};
55
56pub type Event = String;
57pub type Response = Event;
58pub type Error = Arc<tungstenite::Error>;
59pub type Result<T = ()> = ::std::result::Result<T, Error>;
60pub type RawEventQueue = EventQueue;
61
62type WsOut =
63 futures::stream::SplitSink<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>, Message>;
64type WsIn = futures::stream::SplitStream<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>>;
65
66type ShutdownEmitter = watch::Sender<bool>;
67type ShutdownSignal = watch::Receiver<bool>;
68
#[cfg(feature = "cli")]
pub mod cli;

/// Identifier attached to every outgoing request.
type RequestId = usize;

/// Process-wide counter backing [`next_request_id`]. It starts at zero.
static REQUEST_ID: AtomicUsize = AtomicUsize::new(0);

/// Hands out the next request id.
///
/// Returns the value the counter held *before* the increment, so the first call yields `0`.
/// `fetch_add` wraps around on overflow. `Relaxed` ordering is used: the increment itself is
/// atomic, and within this file nothing else is synchronized through the counter.
fn next_request_id() -> RequestId {
    const STEP: RequestId = 1;
    REQUEST_ID.fetch_add(STEP, Ordering::Relaxed)
}
78
79/// Connect to the running SimpleX daemon by websocket URI.
80///
81/// Returns a [RawClient] for sending commands and a [RawEventQueue] that buffers incoming chat
82/// events independently of client activity.
83///
84/// # Security
85///
86/// - SimpleX CLI does not support TLS URIs("wss://") and will fail at the handshake. The web
87/// socket carries unencrypted unauthenticated traffic. Bind the daemon to
88/// localhost(`ws://127.0.0.1:{port}`) only. Any process or host that can reach the port has full,
89/// unauthenticated control over the daemon, can intercept events and execute arbitrary commands.
90///
91/// # Memory
92///
93/// The [`RawEventQueue`] is backed by an unbounded channel. If events are not consumed they
94/// accumulate indefinitely. Either process events promptly or drop the queue immediately if your
95/// application does not need them
96///
97/// # Example
98///
99/// ```ignore
100/// let (client, events) = simploxide_core::connect("ws://127.0.0.1:5225").await?;
101///
102/// // (Optional) Drop the event queue if you're not planning to handle events
103/// drop(events)
104///
105/// let current_user = client.send("/user".to_owned()).await?;
106/// println!("{}", serde_json::to_string_pretty(¤t_user).unwrap());
107/// ```
108pub async fn connect(simplex_daemon_url: &str) -> tungstenite::Result<(RawClient, RawEventQueue)> {
109 let connection_request = simplex_daemon_url.into_client_request()?;
110 let (sockstream, _) = connect_async(connection_request).await?;
111 let (ws_out, ws_in) = sockstream.split();
112
113 let dispatching_cancellator = CancellationToken::new();
114 let (transmission_interrupter, transmission_interrupted) = oneshot::channel();
115 let (shutdown_tx, shutdown) = watch::channel(false);
116
117 let (client_router, response_router) = router::init(
118 dispatching_cancellator.clone(),
119 transmission_interrupter,
120 shutdown_tx,
121 );
122 let tx = transmission::init(ws_out, transmission_interrupted);
123 let event_queue = dispatcher::init(ws_in, response_router, dispatching_cancellator);
124
125 Ok((
126 RawClient {
127 tx,
128 router: client_router,
129 shutdown,
130 },
131 event_queue,
132 ))
133}
134
135/// A lightweight cheaply clonable client capable of sending raw requests(SimpleX commands) and
136/// receiving raw responses(JSON objects).
137///
138/// You can use the client behind a shared reference, or you can clone it, in both cases the
139/// created futures will be indpenendent from each other.
140#[derive(Clone)]
141pub struct RawClient {
142 tx: Transmitter,
143 router: ClientRouter,
144 shutdown: ShutdownSignal,
145}
146
147impl RawClient {
148 /// Send a raw SimpleX request that is a SimpleX CLI command.
149 ///
150 /// The actual request sending part always resolves immediately so the `send(..).await` call
151 /// directly awaits the response.
152 pub async fn send(&self, command: String) -> Result<Response> {
153 let id = next_request_id();
154 let (responder, response) = oneshot::channel();
155
156 // IMPORTANT: It's crucial to book a request before sending it to the server to avoid the
157 // case when the response comes before the responder registration.
158 self.router.book(id, responder)?;
159 self.tx.make_request(id, command)?;
160
161 response
162 .await
163 .expect("Registered responders always deliver")
164 }
165
166 /// Returns the version of the underlying SimpleX runtime.
167 pub async fn version(&self) -> std::result::Result<SimplexVersion, VersionError> {
168 #[derive(Deserialize)]
169 struct VersionResponse<'a> {
170 #[serde(borrow)]
171 resp: VersionInfo<'a>,
172 }
173
174 let output = self.send("/v".to_owned()).await?;
175
176 let response = serde_json::from_str::<VersionResponse>(&output)
177 .map_err(VersionError::InvalidJson)?
178 .resp
179 .version_info
180 .version;
181
182 let version = response
183 .parse()
184 .map_err(|_| VersionError::ParseError(response.to_owned()))?;
185
186 Ok(version)
187 }
188
189 /// Initiates a graceful shutdown and waits until it is complete. Returns only after the
190 /// connection is fully closed.
191 ///
192 /// All futures that got scheduled before this call will still receive their responses. All
193 /// futures scheduled after this call(from cloned clients) will resolve immediately with
194 /// [`tungstenite::Error::AlreadyClosed`].
195 ///
196 /// If you don't care about waiting for the graceful shutdown to complete you can just drop the
197 /// future, the shutdown will still be triggered
198 ///
199 /// ```ignore
200 /// let _ = client.disconnect();
201 /// ```
202 ///
203 /// or use [`tokio::time::timeout`] to limit the wait time
204 ///
205 /// ```ignore
206 /// tokio::time::timeout(Duration::from_secs(5), client.disconnect())
207 /// .await
208 /// .unwrap_or_default();
209 /// ```
210 ///
211 /// # Racing with [`Self::send`]
212 ///
213 /// If [`Self::send`] and [`Self::disconnect`] are called concurrently from different threads
214 /// the outcome depends on scheduling. If `send` wins the channel lock first, it will receive a
215 /// response as normal. If `disconnect` wins first, the `send` future will receive
216 /// [`tungstenite::Error::AlreadyClosed`].
217 ///
218 /// However, in the second case the request could have already been buffered and delivered to the
219 /// server by another thread while `disconnect` was executing on the current thread, meaning the
220 /// send command ran even though the client received an error. Do not use `AlreadyClosed` as a
221 /// proof that the command was not executed. To guarantee ordering, await all `send` futures to
222 /// completion before calling `disconnect`.
223 pub fn disconnect(mut self) -> impl Future<Output = ()> {
224 self.router.shutdown();
225 async move {
226 let _ = self.shutdown.wait_for(|done| *done).await;
227 }
228 }
229}
230
231#[derive(Debug)]
232pub enum VersionError {
233 Ws(Error),
234 InvalidJson(serde_json::Error),
235 ParseError(String),
236}
237
238impl From<Error> for VersionError {
239 fn from(value: Error) -> Self {
240 Self::Ws(value)
241 }
242}
243
244impl std::fmt::Display for VersionError {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 match self {
247 Self::Ws(e) => e.fmt(f),
248 Self::InvalidJson(e) => write!(f, "Cannot parse the version json: {e}"),
249 Self::ParseError(s) => {
250 write!(
251 f,
252 "Cannot parse version, expected format: '<major>.<minor>.<patch>.<hotfix>', got {s:?}"
253 )
254 }
255 }
256 }
257}
258
259impl std::error::Error for VersionError {
260 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
261 match self {
262 Self::Ws(e) => Some(e),
263 Self::InvalidJson(e) => Some(e),
264 Self::ParseError(_) => None,
265 }
266 }
267}