Skip to main content

simploxide_ffi_core/
lib.rs

1//! A fully asynchrouns raw SimpleX client backed by the SimpleX FFI bindings(see
2//! [simploxide_sxcrt_sys] for setup instructions) that provides:
3//!
4//! 1. Multi-instance support: run many SimpleX-Chat instances from a single process. Each instance
5//!    is fully isolated and all are served by a single shared worker thread with fair round-robin
6//!    scheduling and per-instance execution caps to prevent starvation.
7//!
8//! 2. Complete asynchonisity: futures created by the same instance of a client are fully
9//!    independent from each other. The event queue receives events independently from client
10//!    actions.
11//!
12//! 3. Graceful shutdown with strong guarantees:
13//!     - All commands enqueued before [`RawClient::disconnect`] are guaranteed to execute and
14//!       return their responses.
15//!
16//!     - All commands enqueued after [`RawClient::disconnect`] are guaranteed to return
17//!       [`CallError::Failure`] without being executed.
18//!
19//!     - You will receive events for as long as the chat instance is active. After disconnect the
20//!       remaining buffered events are delivered and then the event queue closes.
21//!
22//! -----
23//!
24//! _Current implementation heavily depends on `tokio` runtime and won't work with other
25//! executors._
26
27pub mod default;
28
29mod worker;
30
31pub use simploxide_core::SimplexVersion;
32pub use simploxide_sxcrt_sys::{CallError, InitError, MigrationConfirmation};
33
34use serde::Deserialize;
35use simploxide_core::VersionInfo;
36
37use std::{path::Path, sync::Arc, time::Duration};
38
39pub type Command = String;
40pub type Event = String;
41pub type Response = String;
42
43pub type Result<T = (), E = Arc<CallError>> = ::std::result::Result<T, E>;
44
45type FfiResponder = tokio::sync::oneshot::Sender<Result<Response>>;
46
47type CmdTransmitter = std::sync::mpsc::Sender<ChatCommand>;
48type CmdReceiver = std::sync::mpsc::Receiver<ChatCommand>;
49
50type EventTransmitter = tokio::sync::mpsc::UnboundedSender<Result<Event>>;
51pub type EventReceiver = tokio::sync::mpsc::UnboundedReceiver<Result<Event>>;
52
53type ShutdownEmitter = tokio::sync::watch::Sender<bool>;
54type ShutdownSignal = tokio::sync::watch::Receiver<bool>;
55
56/// Configuration for the shared FFI worker thread.
57///
58/// Applies only on the first [`init`] or [`init_with_config`] call. All subsequent calls reuse
59/// the already running worker thread and ignore this parameter entirely.
60#[derive(Default, Debug, Clone)]
61pub struct WorkerConfig {
62    /// Maximum permissible event latency. Controls how long the worker thread may sleep between
63    /// polling cycles when all chats are idle. The sleep interval grows linearly from zero up to
64    /// this value as idle time accumulates. Sending any command resets the interval immediately by
65    /// waking the thread. Default: [`default::MAX_EVENT_LATENCY`].
66    pub max_event_latency: Option<std::time::Duration>,
67
68    /// Maximum number of chat instances the worker thread will serve simultaneously. [`init`]
69    /// returns [`CallError::Failure`] when this limit is reached. Passing `0` is valid but
70    /// prevents any chat from ever being created. Default: [`default::MAX_CHAT_INSTANCES`].
71    pub max_instances: Option<usize>,
72}
73
74impl WorkerConfig {
75    pub fn new() -> Self {
76        Self::default()
77    }
78
79    pub fn with_event_latency(mut self, duration: Duration) -> Self {
80        self.max_event_latency = Some(duration);
81        self
82    }
83
84    pub fn max_instances(mut self, max_instances: usize) -> Self {
85        self.max_instances = Some(max_instances);
86        self
87    }
88}
89
90/// Open a SimpleX database with default [`WorkerConfig`] and start receiving events.
91///
92/// See [`init_with_config`] for full documentation.
93pub async fn init(
94    default_user: DefaultUser,
95    db_opts: DbOpts,
96) -> Result<(RawClient, RawEventQueue), InitError> {
97    init_with_config(default_user, db_opts, WorkerConfig::default()).await
98}
99
100/// Open a SimpleX database and start receiving events.
101///
102/// Returns a [`RawClient`] for sending commands and a [`RawEventQueue`] that buffers incoming chat
103/// events independently of client activity. Each init call creates a fully isolated instance with
104/// its own client and event queue, shutting down one instance does not affect any other.
105///
106/// All FFI calls like event polling and command execution are running on a single shared OS
107/// thread. Creating a new instance blocks this thread for the full duration of the database
108/// initialisation(including migrations). All other currently active chat instances are frozen
109/// during the execution of this method.
110///
111/// # Memory
112///
113/// The [`RawEventQueue`] is backed by an unbounded channel. If events are not consumed they
114/// accumulate indefinitely. Either process events promptly or drop the queue immediately if your
115/// application does not need them.
116///
117///
118/// # Example
119///
120/// ```ignore
121/// let (client, mut events) = simploxide_ffi_core::init_with_config(
122///     DefaultUser::bot("MyBot"),
123///     DbOpts::unencrypted("./data/mybot"),
124///     WorkerConfig::new().max_instances(4),
125/// ).await?;
126///
127/// // (Optional) Drop the event queue if you're not planning to handle events
128/// drop(events)
129///
130/// // Get SimpleX runtime version
131/// let version = client.version().await?;
132/// ```
133pub async fn init_with_config(
134    default_user: DefaultUser,
135    db_opts: DbOpts,
136    config: WorkerConfig,
137) -> Result<(RawClient, RawEventQueue), InitError> {
138    worker::init(config).spawn_chat(default_user, db_opts).await
139}
140
141/// A lightweight cheaply clonable client for sending raw requests(SimpleX commands) and receiving
142/// raw responses(JSON objects).
143///
144/// You can use the client behind a shared reference, or you can clone it, in both cases the
145/// created futures will be indpenendent from each other.
146#[derive(Clone)]
147pub struct RawClient {
148    tx: CmdTransmitter,
149    worker: worker::Worker,
150    shutdown: ShutdownSignal,
151}
152
153impl RawClient {
154    /// Send a raw SimpleX command and await its response.
155    ///
156    /// The command is sent immediately and the returned future directly awaits the response from
157    /// the worker thread.
158    pub async fn send(&self, command: Command) -> Result<Response> {
159        let (responder, response) = tokio::sync::oneshot::channel();
160
161        self.tx
162            .send(ChatCommand::Execute(command, responder))
163            .map_err(|_| CallError::Failure)?;
164
165        self.worker.wake();
166
167        response.await.map_err(|_| CallError::Failure)?
168    }
169
170    /// Returns the version of the underlying SimpleX runtime.
171    pub async fn version(&self) -> Result<SimplexVersion, VersionError> {
172        #[derive(Deserialize)]
173        struct VersionResult<'a> {
174            #[serde(borrow)]
175            result: VersionInfo<'a>,
176        }
177
178        let output = self.send("/v".to_owned()).await?;
179
180        let response = serde_json::from_str::<VersionResult>(&output)
181            .map_err(|e| Arc::new(CallError::InvalidJson(e)))?
182            .result
183            .version_info
184            .version;
185
186        let version = response
187            .parse()
188            .map_err(|_| VersionError::ParseError(response.to_owned()))?;
189
190        Ok(version)
191    }
192
193    /// Initiates a graceful shutdown and waits until the database is fully closed.
194    ///
195    /// All futures that got scheduled before this call will still receive their responses. All
196    /// futures scheduled after this call(from cloned clients) will resolve immediately with
197    /// [`CallError::Failure`].
198    ///
199    /// If you don't care about waiting for the graceful shutdown to complete you can just drop the
200    /// future, the shutdown will still be triggered
201    ///
202    /// ```ignore
203    /// let _ = client.disconnect();
204    /// ```
205    ///
206    /// or use [`tokio::time::timeout`] to limit the wait time
207    ///
208    /// ```ignore
209    /// tokio::time::timeout(Duration::from_secs(5), client.disconnect())
210    ///     .await
211    ///     .unwrap_or_default();
212    /// ```
213    ///
214    /// # Racing with [`Self::send`]
215    ///
216    /// Commands and the disconnect signal share the same FIFO channel. Whichever call enqueues
217    /// first is processed first: If `disconnect` enqueues before a concurrent `send` the `send`
218    /// future returns [`CallError::Failure`] and the command is guaranteed not to have been
219    /// executed, if `send` enqueues before `disconnect` - the command executes normally.
220    ///
221    /// To guarantee ordering, await all `send` futures to completion before calling `disconnect`.
222    pub fn disconnect(mut self) -> impl Future<Output = ()> {
223        let _ = self.tx.send(ChatCommand::Disconnect);
224        self.worker.wake();
225
226        async move {
227            let _ = self.shutdown.wait_for(|b| *b).await;
228        }
229    }
230}
231
232/// An event queue that buffers incoming SimpleX events independently of client activity.
233///
234/// Backed by an unbounded channel. If events are not consumed they accumulate indefinitely. Drop
235/// the queue as soon as it is no longer needed. When dropped while a chat instance is active and
236/// producing events, the Haskell-side queue is still drained continuously - events are discarded
237/// in Rust and do not accumulate in the FFI layer.
238pub struct RawEventQueue {
239    receiver: EventReceiver,
240}
241
242impl RawEventQueue {
243    /// Returns the next event from the queue, or `None` if the chat has shut down.
244    pub async fn next_event(&mut self) -> Option<Result<Event>> {
245        self.receiver.recv().await
246    }
247
248    /// Unwraps the queue into the underlying tokio unbounded receiver for more advanced use cases.
249    pub fn into_receiver(self) -> EventReceiver {
250        self.receiver
251    }
252}
253
254/// The SimpleX user profile used to initialise the chat instance.
255///
256/// # Security
257///
258/// The `display_name` field is injected into a SimpleX CLI command of the form `/create {kind}
259/// '{display_name}'`. It is intended to be a short, fixed, ASCII identifier chosen by the
260/// application author, do not supply a user-input here to avoid command injections like:
261/// "User'Name"(creates a user named "User" with bio="Name")
262#[derive(Debug, Clone)]
263pub struct DefaultUser {
264    pub display_name: String,
265    pub is_bot: bool,
266}
267
268impl DefaultUser {
269    /// Creates a regular SimpleX user profile with the given display name.
270    ///
271    /// `name` is injected literally into `/create user '{name}'`. Use a fixed ASCII identifier;
272    /// do not pass user-supplied input here.
273    pub fn regular<S: Into<String>>(name: S) -> Self {
274        Self {
275            display_name: name.into(),
276            is_bot: false,
277        }
278    }
279
280    /// Creates a bot SimpleX user profile with the given display name.
281    ///
282    /// `name` is injected literally into `/create bot '{name}'`. Use a fixed ASCII identifier;
283    /// do not pass user-supplied input here.
284    pub fn bot<S: Into<String>>(name: S) -> Self {
285        Self {
286            display_name: name.into(),
287            is_bot: true,
288        }
289    }
290}
291
292/// Database options for a SimpleX chat instance.
293///
294/// # The `prefix` field
295///
296/// SimpleX stores each chat instance as a set of files sharing a common path prefix. The prefix
297/// is a directory path plus a filename stem: the directory part is created if absent, and the
298/// stem is prepended to every database filename.
299///
300/// ```text
301/// prefix: "data/bot" - creates ./data/bot_agent.db, ./data/bot_chat.db
302/// prefix: "bot" - creates ./bot_agent.db, ./bot_chat.db
303/// ```
304///
305/// # Warning: overlapping prefixes
306///
307/// Two instances whose prefixes share the same directory and stem will silently read and write the
308/// same files. This will produce DB errors and may cause DB corruptions
309#[derive(Debug, Clone)]
310pub struct DbOpts {
311    pub prefix: String,
312    pub key: Option<String>,
313    pub migration: MigrationConfirmation,
314}
315
316impl DbOpts {
317    /// Open an unencrypted SimpleX database at the given path prefix.
318    ///
319    /// See [`DbOpts`] for an explanation of what `prefix` means and the overlap warning.
320    pub fn unencrypted<P: AsRef<Path>>(db_path: P) -> Self {
321        Self {
322            prefix: db_path.as_ref().display().to_string(),
323            key: None,
324            migration: MigrationConfirmation::YesUp,
325        }
326    }
327
328    /// Open an encrypted SimpleX database at the given path prefix with the given passphrase.
329    ///
330    /// See [`DbOpts`] for an explanation of what `prefix` means and the overlap warning.
331    pub fn encrypted<P: AsRef<Path>, K: Into<String>>(prefix: P, key: K) -> Self {
332        Self {
333            prefix: prefix.as_ref().display().to_string(),
334            key: Some(key.into()),
335            migration: MigrationConfirmation::YesUp,
336        }
337    }
338}
339
340/// Error returned by [`RawClient::version`].
341#[derive(Debug)]
342pub enum VersionError {
343    Ffi(Arc<CallError>),
344    ParseError(String),
345}
346
347impl From<Arc<CallError>> for VersionError {
348    fn from(value: Arc<CallError>) -> Self {
349        Self::Ffi(value)
350    }
351}
352
353impl std::fmt::Display for VersionError {
354    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
355        match self {
356            Self::Ffi(e) => e.fmt(f),
357            Self::ParseError(s) => {
358                write!(
359                    f,
360                    "Cannot parse version, expected format: '<major>.<minor>.<patch>.<hotfix>', got {s:?}"
361                )
362            }
363        }
364    }
365}
366
367impl std::error::Error for VersionError {
368    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
369        match self {
370            Self::Ffi(e) => Some(e),
371            Self::ParseError(_) => None,
372        }
373    }
374}
375
376enum ChatCommand {
377    Execute(Command, FfiResponder),
378    Disconnect,
379}