simploxide_core/
lib.rs

1//! A fully asynchronous raw SimpleX client that provides:
2//!
3//! 1. Requests batching under a heavy load.
4//!
5//! 2. Complete asynchonisity: futures created by the same instance of a client are fully
6//!    independent from each other. The event queue receives events independently from client
7//!    actions.
8//!
9//! 3. Graceful shutdown with strong guarantees:
10//!     - All futures scheduled before the `.disconnect` call are guaranteed to receive their
11//!       responses. All futures scheduled after the `.disconnect` call are guaranteed to receive the
12//!       [`tungstenite::Error::AlreadyClosed`] error.
13//!
14//!     - If the web socket connection drops due to an error all already received(buffered)
15//!       responses are guaranteed to be delivered to corresponding futures. All other pending
16//!       futures are guaranteed to be resolved with the web socket error.
17//!
18//!     - You will receive events for as long as there are futures awaiting responses. After all
19//!       futures are resolved you will receive all buffered events and then the event queue will be
20//!       closed.
21//!
22//! See [README on GitHub](https://github.com/a1akris/simploxide/tree/main/simploxide-core) for diagrams
23//! demonstrating how all this works under the hood.
24//!
25//! -----
26//!
27//! _Current implementation heavily depends on `tokio` runtime and won't work with other
28//! executors._
29mod dispatcher;
30mod router;
31mod transmission;
32
33use std::sync::{
34    Arc,
35    atomic::{AtomicUsize, Ordering},
36};
37
38use futures::StreamExt;
39use tokio::sync::oneshot;
40use tokio_tungstenite::{
41    MaybeTlsStream, WebSocketStream, connect_async,
42    tungstenite::{Message, client::IntoClientRequest as _},
43};
44use tokio_util::sync::CancellationToken;
45
46use {router::ClientRouter, transmission::Transmitter};
47
48pub use dispatcher::EventQueue;
49pub use tokio_tungstenite::{self, tungstenite};
50
51pub type Event = serde_json::Value;
52pub type Response = Event;
53pub type Error = Arc<tungstenite::Error>;
54pub type Result<T = ()> = ::std::result::Result<T, Error>;
55pub type RawEventQueue = EventQueue;
56
57type WsOut =
58    futures::stream::SplitSink<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>, Message>;
59type WsIn = futures::stream::SplitStream<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>>;
60
61static REQUEST_ID: AtomicUsize = AtomicUsize::new(0);
62
63type RequestId = usize;
64fn next_request_id() -> RequestId {
65    REQUEST_ID.fetch_add(1, Ordering::Relaxed)
66}
67
68/// Connect to the running SimpleX daemon by websocket URI str. Note that SimpleX doesn't support
69/// TLS so using "wss://" will produce an error.
70///
71/// Returns a client that should be used to send requests and receive responses and the event queue
72/// that buffers SimpleX chat events.
73///
74/// If you're writing a script-like app that doesn't need to process events drop the returned event
75/// queue immediately to prevent the events being buffered effectively causing a memory leak.
76///
77/// Example:
78/// ```ignore
79/// let (client, events) = simploxide_core::connect("ws://127.0.0.1:5225").await?;
80///
81/// // (Optional) Drop the event queue if you're not planning to handle events
82/// drop(events)
83///
84/// let current_user  = client.send("/user".to_owned()).await?;
85/// println!("{}", serde_json::to_string_pretty(&current_user).unwrap());
86/// ```
87pub async fn connect(simplex_daemon_url: &str) -> tungstenite::Result<(RawClient, RawEventQueue)> {
88    let connection_request = simplex_daemon_url.into_client_request()?;
89    let (sockstream, _) = connect_async(connection_request).await?;
90    let (ws_out, ws_in) = sockstream.split();
91
92    let dispatching_cancellator = CancellationToken::new();
93    let (transmission_interrupter, transmission_interrupted) = oneshot::channel();
94
95    let (client_router, response_router) =
96        router::init(dispatching_cancellator.clone(), transmission_interrupter);
97    let tx = transmission::init(ws_out, transmission_interrupted);
98    let event_queue = dispatcher::init(ws_in, response_router, dispatching_cancellator);
99
100    Ok((
101        RawClient {
102            tx,
103            router: client_router,
104        },
105        event_queue,
106    ))
107}
108
109/// A lightweight cheaply clonable client capable of sending raw requests(SimpleX commands) and
110/// getting raw responses(JSON objects).
111///
112/// You can use the client behind a shared reference, or you can clone it, in both cases the
113/// created futures will be indpenendent from each other.
114#[derive(Clone)]
115pub struct RawClient {
116    tx: Transmitter,
117    router: ClientRouter,
118}
119
120impl RawClient {
121    /// Send a raw SimpleX request that is a SimpleX CLI command.
122    ///
123    /// The actual request sending part always resolves immediately so the `send(..).await` call
124    /// directly awaits the response.
125    pub async fn send(&self, command: String) -> Result<Response> {
126        let id = next_request_id();
127        let (responder, response) = oneshot::channel();
128
129        // IMPORTANT: It's crucial to book a request before sending it to the server to avoid the
130        // case when the response comes before the responder registration.
131        self.router.book(id, responder)?;
132        self.tx.make_request(id, command)?;
133
134        response
135            .await
136            .expect("Registered responders always deliver")
137    }
138
139    /// Drops the current client and initiates a graceful shutdown for the underlying web socket
140    /// connection.
141    ///
142    /// If there are multiple instances of the client all futures scheduled before this call will
143    /// still receive their responses but all new [`Self::send`] futures will immediately resolve
144    /// with [`tungstenite::Error::AlreadyClosed`].
145    pub fn disconnect(self) {
146        self.router.shutdown();
147    }
148}