Skip to main content

simploxide_ws_core/
lib.rs

1//! A fully asynchronous raw SimpleX websocket client that provides:
2//!
3//! 1. Requests batching under a heavy load.
4//!
5//! 2. Complete asynchonisity: futures created by the same instance of a client are fully
6//!    independent from each other. The event queue receives events independently from client
7//!    actions.
8//!
9//! 3. Graceful shutdown with strong guarantees:
10//!     - All futures scheduled before the `.disconnect` call are guaranteed to receive their
11//!       responses. All futures scheduled after the `.disconnect` call are guaranteed to receive the
12//!       [`tungstenite::Error::AlreadyClosed`] error.
13//!
14//!     - If the web socket connection drops due to an error all buffered responses are guaranteed
15//!       to be delivered to corresponding futures. All other pending futures are guaranteed to be
16//!       resolved with the web socket error.
17//!
18//!     - You will receive events for as long as there are futures awaiting responses. After all
19//!       futures are resolved you will receive all buffered events and then the event queue will be
20//!       closed.
21//!
22//! See [README on GitHub](https://github.com/a1akris/simploxide/tree/main/simploxide-core) for diagrams
23//! demonstrating how all this works under the hood.
24//!
25//! -----
26//!
27//! _Current implementation heavily depends on `tokio` runtime and won't work with other
28//! executors._
29
30mod dispatcher;
31mod router;
32mod transmission;
33
34pub use dispatcher::{EventQueue, EventReceiver};
35pub use simploxide_core::SimplexVersion;
36pub use tokio_tungstenite::{self, tungstenite};
37
38use futures::StreamExt;
39use serde::Deserialize;
40use simploxide_core::VersionInfo;
41use tokio::sync::{oneshot, watch};
42use tokio_tungstenite::{
43    MaybeTlsStream, WebSocketStream, connect_async,
44    tungstenite::{Message, client::IntoClientRequest as _},
45};
46use tokio_util::sync::CancellationToken;
47
48use {router::ClientRouter, transmission::Transmitter};
49
50use std::sync::{
51    Arc,
52    atomic::{AtomicUsize, Ordering},
53};
54
55pub type Event = String;
56pub type Response = Event;
57pub type Error = Arc<tungstenite::Error>;
58pub type Result<T = ()> = ::std::result::Result<T, Error>;
59pub type RawEventQueue = EventQueue;
60
61type WsOut =
62    futures::stream::SplitSink<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>, Message>;
63type WsIn = futures::stream::SplitStream<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>>;
64
65type ShutdownEmitter = watch::Sender<bool>;
66type ShutdownSignal = watch::Receiver<bool>;
67
68static REQUEST_ID: AtomicUsize = AtomicUsize::new(0);
69
70#[cfg(feature = "cli")]
71pub mod cli;
72
73type RequestId = usize;
74fn next_request_id() -> RequestId {
75    REQUEST_ID.fetch_add(1, Ordering::Relaxed)
76}
77
78/// Connect to the running SimpleX daemon by websocket URI.
79///
80/// Returns a [RawClient] for sending commands and a [RawEventQueue] that buffers incoming chat
81/// events independently of client activity.
82///
83/// # Security
84///
85/// - SimpleX CLI does not support TLS URIs("wss://") and will fail at the handshake. The web
86///   socket carries unencrypted unauthenticated traffic. Bind the daemon to
87///   localhost(`ws://127.0.0.1:{port}`) only. Any process or host that can reach the port has full,
88///   unauthenticated control over the daemon, can intercept events and execute arbitrary commands.
89///
90/// # Memory
91///
92/// The [`RawEventQueue`] is backed by an unbounded channel. If events are not consumed they
93/// accumulate indefinitely. Either process events promptly or drop the queue immediately if your
94/// application does not need them
95///
96/// # Example
97///
98/// ```ignore
99/// let (client, events) = simploxide_core::connect("ws://127.0.0.1:5225").await?;
100///
101/// // (Optional) Drop the event queue if you're not planning to handle events
102/// drop(events)
103///
104/// let current_user  = client.send("/user".to_owned()).await?;
105/// println!("{}", serde_json::to_string_pretty(&current_user).unwrap());
106/// ```
107pub async fn connect(simplex_daemon_url: &str) -> tungstenite::Result<(RawClient, RawEventQueue)> {
108    let connection_request = simplex_daemon_url.into_client_request()?;
109    let (sockstream, _) = connect_async(connection_request).await?;
110    let (ws_out, ws_in) = sockstream.split();
111
112    let dispatching_cancellator = CancellationToken::new();
113    let (transmission_interrupter, transmission_interrupted) = oneshot::channel();
114    let (shutdown_tx, shutdown) = watch::channel(false);
115
116    let (client_router, response_router) = router::init(
117        dispatching_cancellator.clone(),
118        transmission_interrupter,
119        shutdown_tx,
120    );
121    let tx = transmission::init(ws_out, transmission_interrupted);
122    let event_queue = dispatcher::init(ws_in, response_router, dispatching_cancellator);
123
124    Ok((
125        RawClient {
126            tx,
127            router: client_router,
128            shutdown,
129        },
130        event_queue,
131    ))
132}
133
134/// A lightweight cheaply clonable client capable of sending raw requests(SimpleX commands) and
135/// receiving raw responses(JSON objects).
136///
137/// You can use the client behind a shared reference, or you can clone it, in both cases the
138/// created futures will be indpenendent from each other.
139#[derive(Clone)]
140pub struct RawClient {
141    tx: Transmitter,
142    router: ClientRouter,
143    shutdown: ShutdownSignal,
144}
145
146impl RawClient {
147    /// Send a raw SimpleX request that is a SimpleX CLI command.
148    ///
149    /// The actual request sending part always resolves immediately so the `send(..).await` call
150    /// directly awaits the response.
151    pub async fn send(&self, command: String) -> Result<Response> {
152        let id = next_request_id();
153        let (responder, response) = oneshot::channel();
154
155        // IMPORTANT: It's crucial to book a request before sending it to the server to avoid the
156        // case when the response comes before the responder registration.
157        self.router.book(id, responder)?;
158        self.tx.make_request(id, command)?;
159
160        response
161            .await
162            .expect("Registered responders always deliver")
163    }
164
165    /// Returns the version of the underlying SimpleX runtime.
166    pub async fn version(&self) -> std::result::Result<SimplexVersion, VersionError> {
167        #[derive(Deserialize)]
168        struct VersionResponse<'a> {
169            #[serde(borrow)]
170            resp: VersionInfo<'a>,
171        }
172
173        let output = self.send("/v".to_owned()).await?;
174
175        let response = serde_json::from_str::<VersionResponse>(&output)
176            .map_err(VersionError::InvalidJson)?
177            .resp
178            .version_info
179            .version;
180
181        let version = response
182            .parse()
183            .map_err(|_| VersionError::ParseError(response.to_owned()))?;
184
185        Ok(version)
186    }
187
188    /// Initiates a graceful shutdown and waits until it is complete. Returns only after the
189    /// connection is fully closed.
190    ///
191    /// All futures that got scheduled before this call will still receive their responses. All
192    /// futures scheduled after this call(from cloned clients) will resolve immediately with
193    /// [`tungstenite::Error::AlreadyClosed`].
194    ///
195    /// If you don't care about waiting for the graceful shutdown to complete you can just drop the
196    /// future, the shutdown will still be triggered
197    ///
198    /// ```ignore
199    /// let _ = client.disconnect();
200    /// ```
201    ///
202    /// or use [`tokio::time::timeout`] to limit the wait time
203    ///
204    /// ```ignore
205    /// tokio::time::timeout(Duration::from_secs(5), client.disconnect())
206    ///     .await
207    ///     .unwrap_or_default();
208    /// ```
209    ///
210    /// # Racing with [`Self::send`]
211    ///
212    /// If [`Self::send`] and [`Self::disconnect`] are called concurrently from different threads
213    /// the outcome depends on scheduling. If `send` wins the channel lock first, it will receive a
214    /// response as normal. If `disconnect` wins first, the `send` future will receive
215    /// [`tungstenite::Error::AlreadyClosed`].
216    ///
217    /// However, in the second case the request could have already been buffered and delivered to the
218    /// server by another thread while `disconnect` was executing on the current thread, meaning the
219    /// send command ran even though the client received an error. Do not use `AlreadyClosed` as a
220    /// proof that the command was not executed. To guarantee ordering, await all `send` futures to
221    /// completion before calling `disconnect`.
222    pub fn disconnect(mut self) -> impl Future<Output = ()> {
223        self.router.shutdown();
224        async move {
225            let _ = self.shutdown.wait_for(|done| *done).await;
226        }
227    }
228}
229
230#[derive(Debug)]
231pub enum VersionError {
232    Ws(Error),
233    InvalidJson(serde_json::Error),
234    ParseError(String),
235}
236
237impl From<Error> for VersionError {
238    fn from(value: Error) -> Self {
239        Self::Ws(value)
240    }
241}
242
243impl std::fmt::Display for VersionError {
244    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245        match self {
246            Self::Ws(e) => e.fmt(f),
247            Self::InvalidJson(e) => write!(f, "Cannot parse the version json: {e}"),
248            Self::ParseError(s) => {
249                write!(
250                    f,
251                    "Cannot parse version, expected format: '<major>.<minor>.<patch>.<hotfix>', got {s:?}"
252                )
253            }
254        }
255    }
256}
257
258impl std::error::Error for VersionError {
259    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
260        match self {
261            Self::Ws(e) => Some(e),
262            Self::InvalidJson(e) => Some(e),
263            Self::ParseError(_) => None,
264        }
265    }
266}