simploxide_core/lib.rs
1//! A fully asynchronous raw SimpleX client that provides:
2//!
3//! 1. Requests batching under a heavy load.
4//!
5//! 2. Complete asynchonisity: futures created by the same instance of a client are fully
6//! independent from each other. The event queue receives events independently from client
7//! actions.
8//!
9//! 3. Graceful shutdown with strong guarantees:
10//! - All futures scheduled before the `.disconnect` call are guaranteed to receive their
11//! responses. All futures scheduled after the `.disconnect` call are guaranteed to receive the
12//! [`tungstenite::Error::AlreadyClosed`] error.
13//!
14//! - If the web socket connection drops due to an error all already received(buffered)
15//! responses are guaranteed to be delivered to corresponding futures. All other pending
16//! futures are guaranteed to be resolved with the web socket error.
17//!
18//! - You will receive events for as long as there are futures awaiting responses. After all
19//! futures are resolved you will receive all buffered events and then the event queue will be
20//! closed.
21//!
22//! See [README on GitHub](https://github.com/a1akris/simploxide/tree/main/simploxide-core) for diagrams
23//! demonstrating how all this works under the hood.
24//!
25//! -----
26//!
27//! _Current implementation heavily depends on `tokio` runtime and won't work with other
28//! executors._
29mod dispatcher;
30mod router;
31mod transmission;
32
33use std::sync::{
34 Arc,
35 atomic::{AtomicUsize, Ordering},
36};
37
38use futures::StreamExt;
39use tokio::sync::oneshot;
40use tokio_tungstenite::{
41 MaybeTlsStream, WebSocketStream, connect_async,
42 tungstenite::{Message, client::IntoClientRequest as _},
43};
44use tokio_util::sync::CancellationToken;
45
46use {router::ClientRouter, transmission::Transmitter};
47
48pub use dispatcher::EventQueue;
49pub use tokio_tungstenite::{self, tungstenite};
50
51pub type Event = serde_json::Value;
52pub type Response = Event;
53pub type Error = Arc<tungstenite::Error>;
54pub type Result<T = ()> = ::std::result::Result<T, Error>;
55pub type RawEventQueue = EventQueue;
56
57type WsOut =
58 futures::stream::SplitSink<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>, Message>;
59type WsIn = futures::stream::SplitStream<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>>;
60
61static REQUEST_ID: AtomicUsize = AtomicUsize::new(0);
62
63type RequestId = usize;
64fn next_request_id() -> RequestId {
65 REQUEST_ID.fetch_add(1, Ordering::Relaxed)
66}
67
68/// Connect to the running SimpleX daemon by websocket URI str. Note that SimpleX doesn't support
69/// TLS so using "wss://" will produce an error.
70///
71/// Returns a client that should be used to send requests and receive responses and the event queue
72/// that buffers SimpleX chat events.
73///
74/// If you're writing a script-like app that doesn't need to process events drop the returned event
75/// queue immediately to prevent the events being buffered effectively causing a memory leak.
76///
77/// Example:
78/// ```ignore
79/// let (client, events) = simploxide_core::connect("ws://127.0.0.1:5225").await?;
80///
81/// // (Optional) Drop the event queue if you're not planning to handle events
82/// drop(events)
83///
84/// let current_user = client.send("/user".to_owned()).await?;
85/// println!("{}", serde_json::to_string_pretty(¤t_user).unwrap());
86/// ```
87pub async fn connect(simplex_daemon_url: &str) -> tungstenite::Result<(RawClient, RawEventQueue)> {
88 let connection_request = simplex_daemon_url.into_client_request()?;
89 let (sockstream, _) = connect_async(connection_request).await?;
90 let (ws_out, ws_in) = sockstream.split();
91
92 let dispatching_cancellator = CancellationToken::new();
93 let (transmission_interrupter, transmission_interrupted) = oneshot::channel();
94
95 let (client_router, response_router) =
96 router::init(dispatching_cancellator.clone(), transmission_interrupter);
97 let tx = transmission::init(ws_out, transmission_interrupted);
98 let event_queue = dispatcher::init(ws_in, response_router, dispatching_cancellator);
99
100 Ok((
101 RawClient {
102 tx,
103 router: client_router,
104 },
105 event_queue,
106 ))
107}
108
109/// A lightweight cheaply clonable client capable of sending raw requests(SimpleX commands) and
110/// getting raw responses(JSON objects).
111///
112/// You can use the client behind a shared reference, or you can clone it, in both cases the
113/// created futures will be indpenendent from each other.
114#[derive(Clone)]
115pub struct RawClient {
116 tx: Transmitter,
117 router: ClientRouter,
118}
119
120impl RawClient {
121 /// Send a raw SimpleX request that is a SimpleX CLI command.
122 ///
123 /// The actual request sending part always resolves immediately so the `send(..).await` call
124 /// directly awaits the response.
125 pub async fn send(&self, command: String) -> Result<Response> {
126 let id = next_request_id();
127 let (responder, response) = oneshot::channel();
128
129 // IMPORTANT: It's crucial to book a request before sending it to the server to avoid the
130 // case when the response comes before the responder registration.
131 self.router.book(id, responder)?;
132 self.tx.make_request(id, command)?;
133
134 response
135 .await
136 .expect("Registered responders always deliver")
137 }
138
139 /// Drops the current client and initiates a graceful shutdown for the underlying web socket
140 /// connection.
141 ///
142 /// If there are multiple instances of the client all futures scheduled before this call will
143 /// still receive their responses but all new [`Self::send`] futures will immediately resolve
144 /// with [`tungstenite::Error::AlreadyClosed`].
145 pub fn disconnect(self) {
146 self.router.shutdown();
147 }
148}