simploxide_ffi_core/lib.rs
//! A fully asynchronous raw SimpleX client backed by the SimpleX FFI bindings (see
//! [simploxide_sxcrt_sys] for setup instructions) that provides:
//!
//! 1. Multi-instance support: run many SimpleX-Chat instances from a single process. Each instance
//!    is fully isolated and all are served by a single shared worker thread with fair round-robin
//!    scheduling and per-instance execution caps to prevent starvation.
//!
//! 2. Complete asynchronicity: futures created by the same instance of a client are fully
//!    independent from each other. The event queue receives events independently from client
//!    actions.
//!
//! 3. Graceful shutdown with strong guarantees:
//!    - All commands enqueued before [`RawClient::disconnect`] are guaranteed to execute and
//!      return their responses.
//!
//!    - All commands enqueued after [`RawClient::disconnect`] are guaranteed to return
//!      [`CallError::Failure`] without being executed.
//!
//!    - You will receive events for as long as the chat instance is active. After disconnect the
//!      remaining buffered events are delivered and then the event queue closes.
//!
//! -----
//!
//! _The current implementation heavily depends on the `tokio` runtime and won't work with other
//! executors._
26
27pub mod default;
28
29mod worker;
30
31pub use simploxide_core::SimplexVersion;
32pub use simploxide_sxcrt_sys::{CallError, InitError, MigrationConfirmation};
33
34use serde::Deserialize;
35use simploxide_core::VersionInfo;
36
37use std::{path::Path, sync::Arc, time::Duration};
38
/// Raw SimpleX command text accepted by [`RawClient::send`].
pub type Command = String;
/// Raw event payload delivered through [`RawEventQueue`].
pub type Event = String;
/// Raw response text produced by [`RawClient::send`].
pub type Response = String;

/// Crate-wide result alias: the success type defaults to `()` and the error type defaults to a
/// shared, reference-counted [`CallError`].
pub type Result<T = (), E = Arc<CallError>> = ::std::result::Result<T, E>;

/// One-shot sender through which the outcome of a single command is handed back to the future
/// awaiting it (see [`ChatCommand::Execute`]).
type FfiResponder = tokio::sync::oneshot::Sender<Result<Response>>;

/// Sending half of the std (blocking) command channel; held by [`RawClient`].
type CmdTransmitter = std::sync::mpsc::Sender<ChatCommand>;
/// Receiving half of the command channel.
// NOTE(review): presumably owned by the worker module; not used in this file, confirm there.
type CmdReceiver = std::sync::mpsc::Receiver<ChatCommand>;

/// Sending half of the unbounded event channel.
// NOTE(review): presumably fed by the worker module; not used in this file, confirm there.
type EventTransmitter = tokio::sync::mpsc::UnboundedSender<Result<Event>>;
/// Receiving half of the unbounded event channel; wrapped by [`RawEventQueue`].
pub type EventReceiver = tokio::sync::mpsc::UnboundedReceiver<Result<Event>>;

/// Sending half of the `bool` watch channel used to announce shutdown.
type ShutdownEmitter = tokio::sync::watch::Sender<bool>;
/// Receiving half of the shutdown watch channel; [`RawClient::disconnect`] waits on it until the
/// value becomes `true`.
type ShutdownSignal = tokio::sync::watch::Receiver<bool>;
55
/// Tuning knobs for the FFI worker thread shared by all chat instances.
///
/// The configuration is honoured only by the first [`init`] or [`init_with_config`] call. Every
/// later call reuses the worker thread that is already running and ignores this value entirely.
#[derive(Default, Debug, Clone)]
pub struct WorkerConfig {
    /// Upper bound on event latency. Determines how long the worker thread may sleep between
    /// polling cycles while all chats are idle. The sleep interval grows linearly from zero up to
    /// this value as idle time accumulates; sending any command wakes the thread and resets the
    /// interval immediately. Default: [`default::MAX_EVENT_LATENCY`].
    pub max_event_latency: Option<Duration>,

    /// Upper bound on the number of chat instances served simultaneously by the worker thread.
    /// [`init`] returns [`CallError::Failure`] once the limit is reached. `0` is accepted but
    /// prevents any chat from ever being created. Default: [`default::MAX_CHAT_INSTANCES`].
    pub max_instances: Option<usize>,
}

impl WorkerConfig {
    /// Creates a configuration with every option unset, so the defaults apply.
    pub fn new() -> Self {
        Self {
            max_event_latency: None,
            max_instances: None,
        }
    }

    /// Returns the configuration with the maximum event latency set to `duration`.
    pub fn with_event_latency(self, duration: Duration) -> Self {
        Self {
            max_event_latency: Some(duration),
            ..self
        }
    }

    /// Returns the configuration with the instance limit set to `max_instances`.
    pub fn max_instances(self, max_instances: usize) -> Self {
        Self {
            max_instances: Some(max_instances),
            ..self
        }
    }
}
89
90/// Open a SimpleX database with default [`WorkerConfig`] and start receiving events.
91///
92/// See [`init_with_config`] for full documentation.
93pub async fn init(
94 default_user: DefaultUser,
95 db_opts: DbOpts,
96) -> Result<(RawClient, RawEventQueue), InitError> {
97 init_with_config(default_user, db_opts, WorkerConfig::default()).await
98}
99
/// Open a SimpleX database and start receiving events.
///
/// Returns a [`RawClient`] for sending commands and a [`RawEventQueue`] that buffers incoming chat
/// events independently of client activity. Each init call creates a fully isolated instance with
/// its own client and event queue; shutting down one instance does not affect any other.
///
/// All FFI calls, such as event polling and command execution, run on a single shared OS thread.
/// Creating a new instance blocks this thread for the full duration of the database
/// initialisation (including migrations). All other currently active chat instances are frozen
/// while this method executes.
///
/// `config` is handed to the worker module's `init`; see [`WorkerConfig`] for when it is honoured.
///
/// # Memory
///
/// The [`RawEventQueue`] is backed by an unbounded channel. If events are not consumed they
/// accumulate indefinitely. Either process events promptly or drop the queue immediately if your
/// application does not need them.
///
/// # Example
///
/// ```ignore
/// let (client, mut events) = simploxide_ffi_core::init_with_config(
///     DefaultUser::bot("MyBot"),
///     DbOpts::unencrypted("./data/mybot"),
///     WorkerConfig::new().max_instances(4),
/// ).await?;
///
/// // (Optional) Drop the event queue if you're not planning to handle events
/// drop(events);
///
/// // Get SimpleX runtime version
/// let version = client.version().await?;
/// ```
pub async fn init_with_config(
    default_user: DefaultUser,
    db_opts: DbOpts,
    config: WorkerConfig,
) -> Result<(RawClient, RawEventQueue), InitError> {
    worker::init(config).spawn_chat(default_user, db_opts).await
}
140
141/// A lightweight cheaply clonable client for sending raw requests(SimpleX commands) and receiving
142/// raw responses(JSON objects).
143///
144/// You can use the client behind a shared reference, or you can clone it, in both cases the
145/// created futures will be indpenendent from each other.
146#[derive(Clone)]
147pub struct RawClient {
148 tx: CmdTransmitter,
149 worker: worker::Worker,
150 shutdown: ShutdownSignal,
151}
152
153impl RawClient {
154 /// Send a raw SimpleX command and await its response.
155 ///
156 /// The command is sent immediately and the returned future directly awaits the response from
157 /// the worker thread.
158 pub async fn send(&self, command: Command) -> Result<Response> {
159 let (responder, response) = tokio::sync::oneshot::channel();
160
161 self.tx
162 .send(ChatCommand::Execute(command, responder))
163 .map_err(|_| CallError::Failure)?;
164
165 self.worker.wake();
166
167 response.await.map_err(|_| CallError::Failure)?
168 }
169
170 /// Returns the version of the underlying SimpleX runtime.
171 pub async fn version(&self) -> Result<SimplexVersion, VersionError> {
172 #[derive(Deserialize)]
173 struct VersionResult<'a> {
174 #[serde(borrow)]
175 result: VersionInfo<'a>,
176 }
177
178 let output = self.send("/v".to_owned()).await?;
179
180 let response = serde_json::from_str::<VersionResult>(&output)
181 .map_err(|e| Arc::new(CallError::InvalidJson(e)))?
182 .result
183 .version_info
184 .version;
185
186 let version = response
187 .parse()
188 .map_err(|_| VersionError::ParseError(response.to_owned()))?;
189
190 Ok(version)
191 }
192
193 /// Initiates a graceful shutdown and waits until the database is fully closed.
194 ///
195 /// All futures that got scheduled before this call will still receive their responses. All
196 /// futures scheduled after this call(from cloned clients) will resolve immediately with
197 /// [`CallError::Failure`].
198 ///
199 /// If you don't care about waiting for the graceful shutdown to complete you can just drop the
200 /// future, the shutdown will still be triggered
201 ///
202 /// ```ignore
203 /// let _ = client.disconnect();
204 /// ```
205 ///
206 /// or use [`tokio::time::timeout`] to limit the wait time
207 ///
208 /// ```ignore
209 /// tokio::time::timeout(Duration::from_secs(5), client.disconnect())
210 /// .await
211 /// .unwrap_or_default();
212 /// ```
213 ///
214 /// # Racing with [`Self::send`]
215 ///
216 /// Commands and the disconnect signal share the same FIFO channel. Whichever call enqueues
217 /// first is processed first: If `disconnect` enqueues before a concurrent `send` the `send`
218 /// future returns [`CallError::Failure`] and the command is guaranteed not to have been
219 /// executed, if `send` enqueues before `disconnect` - the command executes normally.
220 ///
221 /// To guarantee ordering, await all `send` futures to completion before calling `disconnect`.
222 pub fn disconnect(mut self) -> impl Future<Output = ()> {
223 let _ = self.tx.send(ChatCommand::Disconnect);
224 self.worker.wake();
225
226 async move {
227 let _ = self.shutdown.wait_for(|b| *b).await;
228 }
229 }
230}
231
232/// An event queue that buffers incoming SimpleX events independently of client activity.
233///
234/// Backed by an unbounded channel. If events are not consumed they accumulate indefinitely. Drop
235/// the queue as soon as it is no longer needed. When dropped while a chat instance is active and
236/// producing events, the Haskell-side queue is still drained continuously - events are discarded
237/// in Rust and do not accumulate in the FFI layer.
238pub struct RawEventQueue {
239 receiver: EventReceiver,
240}
241
242impl RawEventQueue {
243 /// Returns the next event from the queue, or `None` if the chat has shut down.
244 pub async fn next_event(&mut self) -> Option<Result<Event>> {
245 self.receiver.recv().await
246 }
247
248 /// Unwraps the queue into the underlying tokio unbounded receiver for more advanced use cases.
249 pub fn into_receiver(self) -> EventReceiver {
250 self.receiver
251 }
252}
253
/// The SimpleX user profile used to initialise the chat instance.
///
/// # Security
///
/// The `display_name` field is injected into a SimpleX CLI command of the form `/create {kind}
/// '{display_name}'`. It is meant to be a short, fixed, ASCII identifier chosen by the
/// application author. Do not pass user-supplied input here: it enables command injection, e.g.
/// "User'Name" creates a user named "User" with bio="Name".
#[derive(Debug, Clone)]
pub struct DefaultUser {
    pub display_name: String,
    pub is_bot: bool,
}

impl DefaultUser {
    /// Builds a regular SimpleX user profile with the given display name.
    ///
    /// `name` is injected literally into `/create user '{name}'`. Use a fixed ASCII identifier;
    /// never pass user-supplied input.
    pub fn regular<S: Into<String>>(name: S) -> Self {
        Self::with_kind(name.into(), false)
    }

    /// Builds a bot SimpleX user profile with the given display name.
    ///
    /// `name` is injected literally into `/create bot '{name}'`. Use a fixed ASCII identifier;
    /// never pass user-supplied input.
    pub fn bot<S: Into<String>>(name: S) -> Self {
        Self::with_kind(name.into(), true)
    }

    // Shared constructor behind `regular` and `bot`.
    fn with_kind(display_name: String, is_bot: bool) -> Self {
        Self {
            display_name,
            is_bot,
        }
    }
}
291
292/// Database options for a SimpleX chat instance.
293///
294/// # The `prefix` field
295///
296/// SimpleX stores each chat instance as a set of files sharing a common path prefix. The prefix
297/// is a directory path plus a filename stem: the directory part is created if absent, and the
298/// stem is prepended to every database filename.
299///
300/// ```text
301/// prefix: "data/bot" - creates ./data/bot_agent.db, ./data/bot_chat.db
302/// prefix: "bot" - creates ./bot_agent.db, ./bot_chat.db
303/// ```
304///
305/// # Warning: overlapping prefixes
306///
307/// Two instances whose prefixes share the same directory and stem will silently read and write the
308/// same files. This will produce DB errors and may cause DB corruptions
309#[derive(Debug, Clone)]
310pub struct DbOpts {
311 pub prefix: String,
312 pub key: Option<String>,
313 pub migration: MigrationConfirmation,
314}
315
316impl DbOpts {
317 /// Open an unencrypted SimpleX database at the given path prefix.
318 ///
319 /// See [`DbOpts`] for an explanation of what `prefix` means and the overlap warning.
320 pub fn unencrypted<P: AsRef<Path>>(db_path: P) -> Self {
321 Self {
322 prefix: db_path.as_ref().display().to_string(),
323 key: None,
324 migration: MigrationConfirmation::YesUp,
325 }
326 }
327
328 /// Open an encrypted SimpleX database at the given path prefix with the given passphrase.
329 ///
330 /// See [`DbOpts`] for an explanation of what `prefix` means and the overlap warning.
331 pub fn encrypted<P: AsRef<Path>, K: Into<String>>(prefix: P, key: K) -> Self {
332 Self {
333 prefix: prefix.as_ref().display().to_string(),
334 key: Some(key.into()),
335 migration: MigrationConfirmation::YesUp,
336 }
337 }
338}
339
340/// Error returned by [`RawClient::version`].
341#[derive(Debug)]
342pub enum VersionError {
343 Ffi(Arc<CallError>),
344 ParseError(String),
345}
346
347impl From<Arc<CallError>> for VersionError {
348 fn from(value: Arc<CallError>) -> Self {
349 Self::Ffi(value)
350 }
351}
352
353impl std::fmt::Display for VersionError {
354 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
355 match self {
356 Self::Ffi(e) => e.fmt(f),
357 Self::ParseError(s) => {
358 write!(
359 f,
360 "Cannot parse version, expected format: '<major>.<minor>.<patch>.<hotfix>', got {s:?}"
361 )
362 }
363 }
364 }
365}
366
367impl std::error::Error for VersionError {
368 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
369 match self {
370 Self::Ffi(e) => Some(e),
371 Self::ParseError(_) => None,
372 }
373 }
374}
375
/// Message enqueued by a [`RawClient`] on the command channel.
enum ChatCommand {
    /// Carries a command together with the one-shot responder that [`RawClient::send`] awaits
    /// for the outcome.
    Execute(Command, FfiResponder),
    /// Shutdown request enqueued by [`RawClient::disconnect`].
    Disconnect,
}