Skip to main content

simploxide_ws_core/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2//! A fully asynchronous raw SimpleX websocket client that provides:
3//!
//! 1. Request batching under heavy load.
5//!
//! 2. Complete asynchronicity: futures created by the same instance of a client are fully
//!    independent of each other. The event queue receives events independently of client
//!    actions.
9//!
10//! 3. Graceful shutdown with strong guarantees:
11//!     - All futures scheduled before the `.disconnect` call are guaranteed to receive their
12//!       responses. All futures scheduled after the `.disconnect` call are guaranteed to receive the
13//!       [`tungstenite::Error::AlreadyClosed`] error.
14//!
15//!     - If the web socket connection drops due to an error all buffered responses are guaranteed
16//!       to be delivered to corresponding futures. All other pending futures are guaranteed to be
17//!       resolved with the web socket error.
18//!
19//!     - You will receive events for as long as there are futures awaiting responses. After all
20//!       futures are resolved you will receive all buffered events and then the event queue will be
21//!       closed.
22//!
23//! See [README on GitHub](https://github.com/a1akris/simploxide/tree/main/simploxide-core) for diagrams
24//! demonstrating how all this works under the hood.
25//!
26//! -----
27//!
//! _The current implementation heavily depends on the `tokio` runtime and won't work with other
//! executors._
30
31mod dispatcher;
32mod router;
33mod transmission;
34
35pub use dispatcher::{EventQueue, EventReceiver};
36pub use simploxide_core::SimplexVersion;
37pub use tokio_tungstenite::{self, tungstenite};
38
39use futures::StreamExt;
40use serde::Deserialize;
41use simploxide_core::VersionInfo;
42use tokio::sync::{oneshot, watch};
43use tokio_tungstenite::{
44    MaybeTlsStream, WebSocketStream, connect_async,
45    tungstenite::{Message, client::IntoClientRequest as _},
46};
47use tokio_util::sync::CancellationToken;
48
49use {router::ClientRouter, transmission::Transmitter};
50
51use std::sync::{
52    Arc,
53    atomic::{AtomicUsize, Ordering},
54};
55
56pub type Event = String;
57pub type Response = Event;
58pub type Error = Arc<tungstenite::Error>;
59pub type Result<T = ()> = ::std::result::Result<T, Error>;
60pub type RawEventQueue = EventQueue;
61
62type WsOut =
63    futures::stream::SplitSink<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>, Message>;
64type WsIn = futures::stream::SplitStream<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>>;
65
66type ShutdownEmitter = watch::Sender<bool>;
67type ShutdownSignal = watch::Receiver<bool>;
68
69static REQUEST_ID: AtomicUsize = AtomicUsize::new(0);
70
71#[cfg(feature = "cli")]
72pub mod cli;
73
74type RequestId = usize;
75fn next_request_id() -> RequestId {
76    REQUEST_ID.fetch_add(1, Ordering::Relaxed)
77}
78
79/// Connect to the running SimpleX daemon by websocket URI.
80///
81/// Returns a [RawClient] for sending commands and a [RawEventQueue] that buffers incoming chat
82/// events independently of client activity.
83///
84/// # Security
85///
86/// - SimpleX CLI does not support TLS URIs("wss://") and will fail at the handshake. The web
87///   socket carries unencrypted unauthenticated traffic. Bind the daemon to
88///   localhost(`ws://127.0.0.1:{port}`) only. Any process or host that can reach the port has full,
89///   unauthenticated control over the daemon, can intercept events and execute arbitrary commands.
90///
91/// # Memory
92///
93/// The [`RawEventQueue`] is backed by an unbounded channel. If events are not consumed they
94/// accumulate indefinitely. Either process events promptly or drop the queue immediately if your
95/// application does not need them
96///
97/// # Example
98///
99/// ```ignore
100/// let (client, events) = simploxide_core::connect("ws://127.0.0.1:5225").await?;
101///
102/// // (Optional) Drop the event queue if you're not planning to handle events
103/// drop(events)
104///
105/// let current_user  = client.send("/user".to_owned()).await?;
106/// println!("{}", serde_json::to_string_pretty(&current_user).unwrap());
107/// ```
108pub async fn connect(simplex_daemon_url: &str) -> tungstenite::Result<(RawClient, RawEventQueue)> {
109    let connection_request = simplex_daemon_url.into_client_request()?;
110    let (sockstream, _) = connect_async(connection_request).await?;
111    let (ws_out, ws_in) = sockstream.split();
112
113    let dispatching_cancellator = CancellationToken::new();
114    let (transmission_interrupter, transmission_interrupted) = oneshot::channel();
115    let (shutdown_tx, shutdown) = watch::channel(false);
116
117    let (client_router, response_router) = router::init(
118        dispatching_cancellator.clone(),
119        transmission_interrupter,
120        shutdown_tx,
121    );
122    let tx = transmission::init(ws_out, transmission_interrupted);
123    let event_queue = dispatcher::init(ws_in, response_router, dispatching_cancellator);
124
125    Ok((
126        RawClient {
127            tx,
128            router: client_router,
129            shutdown,
130        },
131        event_queue,
132    ))
133}
134
135/// A lightweight cheaply clonable client capable of sending raw requests(SimpleX commands) and
136/// receiving raw responses(JSON objects).
137///
138/// You can use the client behind a shared reference, or you can clone it, in both cases the
139/// created futures will be indpenendent from each other.
140#[derive(Clone)]
141pub struct RawClient {
142    tx: Transmitter,
143    router: ClientRouter,
144    shutdown: ShutdownSignal,
145}
146
147impl RawClient {
148    /// Send a raw SimpleX request that is a SimpleX CLI command.
149    ///
150    /// The actual request sending part always resolves immediately so the `send(..).await` call
151    /// directly awaits the response.
152    pub async fn send(&self, command: String) -> Result<Response> {
153        let id = next_request_id();
154        let (responder, response) = oneshot::channel();
155
156        // IMPORTANT: It's crucial to book a request before sending it to the server to avoid the
157        // case when the response comes before the responder registration.
158        self.router.book(id, responder)?;
159        self.tx.make_request(id, command)?;
160
161        response
162            .await
163            .expect("Registered responders always deliver")
164    }
165
166    /// Returns the version of the underlying SimpleX runtime.
167    pub async fn version(&self) -> std::result::Result<SimplexVersion, VersionError> {
168        #[derive(Deserialize)]
169        struct VersionResponse<'a> {
170            #[serde(borrow)]
171            resp: VersionInfo<'a>,
172        }
173
174        let output = self.send("/v".to_owned()).await?;
175
176        let response = serde_json::from_str::<VersionResponse>(&output)
177            .map_err(VersionError::InvalidJson)?
178            .resp
179            .version_info
180            .version;
181
182        let version = response
183            .parse()
184            .map_err(|_| VersionError::ParseError(response.to_owned()))?;
185
186        Ok(version)
187    }
188
189    /// Initiates a graceful shutdown and waits until it is complete. Returns only after the
190    /// connection is fully closed.
191    ///
192    /// All futures that got scheduled before this call will still receive their responses. All
193    /// futures scheduled after this call(from cloned clients) will resolve immediately with
194    /// [`tungstenite::Error::AlreadyClosed`].
195    ///
196    /// If you don't care about waiting for the graceful shutdown to complete you can just drop the
197    /// future, the shutdown will still be triggered
198    ///
199    /// ```ignore
200    /// let _ = client.disconnect();
201    /// ```
202    ///
203    /// or use [`tokio::time::timeout`] to limit the wait time
204    ///
205    /// ```ignore
206    /// tokio::time::timeout(Duration::from_secs(5), client.disconnect())
207    ///     .await
208    ///     .unwrap_or_default();
209    /// ```
210    ///
211    /// # Racing with [`Self::send`]
212    ///
213    /// If [`Self::send`] and [`Self::disconnect`] are called concurrently from different threads
214    /// the outcome depends on scheduling. If `send` wins the channel lock first, it will receive a
215    /// response as normal. If `disconnect` wins first, the `send` future will receive
216    /// [`tungstenite::Error::AlreadyClosed`].
217    ///
218    /// However, in the second case the request could have already been buffered and delivered to the
219    /// server by another thread while `disconnect` was executing on the current thread, meaning the
220    /// send command ran even though the client received an error. Do not use `AlreadyClosed` as a
221    /// proof that the command was not executed. To guarantee ordering, await all `send` futures to
222    /// completion before calling `disconnect`.
223    pub fn disconnect(mut self) -> impl Future<Output = ()> {
224        self.router.shutdown();
225        async move {
226            let _ = self.shutdown.wait_for(|done| *done).await;
227        }
228    }
229}
230
231#[derive(Debug)]
232pub enum VersionError {
233    Ws(Error),
234    InvalidJson(serde_json::Error),
235    ParseError(String),
236}
237
238impl From<Error> for VersionError {
239    fn from(value: Error) -> Self {
240        Self::Ws(value)
241    }
242}
243
244impl std::fmt::Display for VersionError {
245    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246        match self {
247            Self::Ws(e) => e.fmt(f),
248            Self::InvalidJson(e) => write!(f, "Cannot parse the version json: {e}"),
249            Self::ParseError(s) => {
250                write!(
251                    f,
252                    "Cannot parse version, expected format: '<major>.<minor>.<patch>.<hotfix>', got {s:?}"
253                )
254            }
255        }
256    }
257}
258
259impl std::error::Error for VersionError {
260    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
261        match self {
262            Self::Ws(e) => Some(e),
263            Self::InvalidJson(e) => Some(e),
264            Self::ParseError(_) => None,
265        }
266    }
267}