Skip to main content

simploxide_client/
ws.rs

1//! WebSocket backend that connects to a `simplex-chat` WebSocket server.
2//!
3//! Use [`BotBuilder`] to launch or connect to `simplex-chat` and get a ready-to-use [`Bot`].
4//! For lower-level access, [`connect`] and [`retry_connect`] return a [`Client`] and an
5//! [`EventStream`](crate::EventStream) directly.
6
7use std::sync::Arc;
8
9pub use simploxide_ws_core::{
10    self as core, Error as CoreError, Event as CoreEvent, Result as CoreResult, SimplexVersion,
11    VersionError, tungstenite::Error as WsError,
12};
13
14#[cfg(feature = "cli")]
15pub use simploxide_ws_core::cli;
16
17use serde::Deserialize;
18use simploxide_api_types::{
19    Preferences, Profile,
20    client_api::{ExtractResponse, WebSocketResponseShape, WebSocketResponseShapeInner},
21    events::{Event, EventKind},
22};
23use simploxide_core::{MAX_SUPPORTED_VERSION, MIN_SUPPORTED_VERSION};
24use simploxide_ws_core::RawClient;
25
26use crate::{
27    BadResponseError, ClientApi, ClientApiError, EventParser,
28    bot::{BotProfileSettings, BotSettings},
29    preview::ImagePreview,
30};
31
32#[cfg(not(feature = "xftp"))]
33pub type Bot = crate::bot::Bot<Client>;
34
35#[cfg(feature = "xftp")]
36pub type Bot = crate::bot::Bot<crate::xftp::XftpClient<Client>>;
37
38pub type EventStream = crate::EventStream<CoreResult<CoreEvent>>;
39pub type ClientResult<T = ()> = ::std::result::Result<T, ClientError>;
40
41/// Connects to a `simplex-chat` WebSocket server, returning a [`Client`] and an [`EventStream`]
42/// that handle serialization/deserialization of commands and events.
43///
44/// ```ignore
45/// let (client, mut events) = simploxide_client::ws::connect("ws://127.0.0.1:5225").await?;
46///
47/// let current_user = client.api_show_active_user().await?;
48/// println!("{current_user:#?}");
49///
50/// while let Some(ev) = events.try_next().await? {
51///     // Process events...
52/// }
53/// ```
54pub async fn connect<S: AsRef<str>>(uri: S) -> Result<(Client, EventStream), ConnectError> {
55    let (raw_client, raw_event_queue) = simploxide_ws_core::connect(uri.as_ref()).await?;
56
57    let version = raw_client
58        .version()
59        .await
60        .map_err(ConnectError::VersionError)?;
61
62    if !version.is_supported() {
63        return Err(ConnectError::VersionMismatch(version));
64    }
65
66    Ok((
67        Client::from(raw_client),
68        EventStream::from(raw_event_queue.into_receiver()),
69    ))
70}
71
72/// Like [`connect`] but retries to connect `retries_count` times before returning an error. This
73/// method is needed when you run simplex-cli programmatically and don't know when WebSocket port
74/// becomes available.
75///
76/// ```ignore
77/// let port = 5225;
78/// let cli = SimplexCli::spawn(port);
79/// let uri = format!("ws://127.0.0.1:{port}");
80///
81/// let (client, mut events) = simploxide_client::retry_connect(&uri, Duration::from_secs(1), 10).await?;
82///
83/// //...
84///
85/// ```
86pub async fn retry_connect<S: AsRef<str>>(
87    uri: S,
88    retry_delay: std::time::Duration,
89    mut retries_count: usize,
90) -> Result<(Client, EventStream), ConnectError> {
91    loop {
92        match connect(uri.as_ref()).await {
93            Ok(connection) => break Ok(connection),
94            Err(e) if !e.is_server() || retries_count == 0 => break Err(e),
95            Err(_) => {
96                retries_count -= 1;
97                tokio::time::sleep(retry_delay).await
98            }
99        }
100    }
101}
102
103impl EventParser for CoreResult<String> {
104    type Error = ClientError;
105
106    fn parse_kind(&self) -> Result<EventKind, Self::Error> {
107        #[derive(Deserialize)]
108        struct TypeField<'a> {
109            #[serde(rename = "type", borrow)]
110            typ: &'a str,
111        }
112
113        match parse_data::<TypeField<'_>>(self) {
114            Ok(f) => Ok(EventKind::from_type_str(f.typ)),
115            Err(ClientError::BadResponse(BadResponseError::Undocumented(_))) => {
116                Ok(EventKind::Undocumented)
117            }
118            Err(e) => Err(e),
119        }
120    }
121
122    fn parse_event(&self) -> Result<Event, Self::Error> {
123        parse_data(self)
124    }
125}
126
127fn parse_data<'de, 'r: 'de, D: 'de + Deserialize<'de>>(
128    res: &'r CoreResult<String>,
129) -> ClientResult<D> {
130    res.as_ref()
131        .map_err(|e| ClientError::WebSocketFailure(e.clone()))
132        .and_then(|ev| {
133            serde_json::from_str::<EventShape<D>>(ev)
134                .map_err(BadResponseError::InvalidJson)
135                .and_then(|shape| shape.extract_response())
136                .map_err(ClientError::BadResponse)
137        })
138}
139
140#[derive(Deserialize)]
141#[serde(untagged)]
142pub enum EventShape<T> {
143    ResponseShape(WebSocketResponseShape<T>),
144    InlineShape(WebSocketResponseShapeInner<T>),
145}
146
147impl<'de, T: 'de + Deserialize<'de>> ExtractResponse<'de, T> for EventShape<T> {
148    fn extract_response(self) -> Result<T, BadResponseError> {
149        match self {
150            Self::ResponseShape(resp) => resp.extract_response(),
151            Self::InlineShape(inline) => inline.extract_response(),
152        }
153    }
154}
155
156/// A high level SimpleX-Chat client which provides typed API methods with automatic command
157/// serialization and response deserialization.
158#[derive(Clone)]
159pub struct Client {
160    inner: RawClient,
161}
162
163impl From<RawClient> for Client {
164    fn from(inner: RawClient) -> Self {
165        Self { inner }
166    }
167}
168
169impl Client {
170    pub fn version(&self) -> impl Future<Output = Result<SimplexVersion, VersionError>> {
171        self.inner.version()
172    }
173
174    /// Initiates a graceful shutdown for the underlying web socket connection. See
175    /// [`simploxide_ws_core::RawClient::disconnect`] for details.
176    pub fn disconnect(self) -> impl Future<Output = ()> {
177        self.inner.disconnect()
178    }
179}
180
181impl ClientApi for Client {
182    type ResponseShape<'de, T>
183        = WebSocketResponseShape<T>
184    where
185        T: 'de + Deserialize<'de>;
186
187    type Error = ClientError;
188
189    async fn send_raw(&self, command: String) -> Result<String, Self::Error> {
190        self.inner
191            .send(command)
192            .await
193            .map_err(ClientError::WebSocketFailure)
194    }
195}
196
197/// See [`crate::client_api::AllowUndocumentedResponses`] if you don't want to trigger an error when
198/// you receive undocumeted responses(you usually receive undocumented responses when your
199/// simplex-chat server version is not compatible with the simploxide-client version. Keep an eye
200/// on the
201/// [Version compatability table](https://github.com/a1akris/simploxide?tab=readme-ov-file#version-compatability-table)
202/// )
203#[derive(Debug)]
204pub enum ClientError {
205    /// Critical error signalling that the web socket connection is dropped for some reason. You
206    /// will have to reconnect to the SimpleX server to recover from this one.
207    WebSocketFailure(CoreError),
208    /// SimpleX command error or unexpected(undocumented) response.
209    BadResponse(BadResponseError),
210}
211
212impl std::error::Error for ClientError {
213    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
214        match self {
215            Self::WebSocketFailure(error) => Some(error),
216            Self::BadResponse(error) => Some(error),
217        }
218    }
219}
220
221impl std::fmt::Display for ClientError {
222    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223        match self {
224            ClientError::WebSocketFailure(err) => writeln!(f, "Web socket failure: {err}"),
225            ClientError::BadResponse(err) => err.fmt(f),
226        }
227    }
228}
229
230impl From<BadResponseError> for ClientError {
231    fn from(err: BadResponseError) -> Self {
232        Self::BadResponse(err)
233    }
234}
235
236impl ClientApiError for ClientError {
237    fn bad_response(&self) -> Option<&BadResponseError> {
238        if let Self::BadResponse(resp) = self {
239            Some(resp)
240        } else {
241            None
242        }
243    }
244
245    fn bad_response_mut(&mut self) -> Option<&mut BadResponseError> {
246        if let Self::BadResponse(resp) = self {
247            Some(resp)
248        } else {
249            None
250        }
251    }
252}
253
254#[derive(Debug)]
255pub enum ConnectError {
256    /// Failure to establish the connection to the server
257    Server(CoreError),
258    /// Failure to get the server version
259    VersionError(VersionError),
260    /// Unsupported server version
261    VersionMismatch(SimplexVersion),
262}
263
264impl ConnectError {
265    pub fn is_server(&self) -> bool {
266        matches!(self, Self::Server(_))
267    }
268
269    pub fn is_version_mismatch(&self) -> bool {
270        matches!(self, Self::VersionMismatch(_))
271    }
272}
273
274impl From<WsError> for ConnectError {
275    fn from(value: WsError) -> Self {
276        Self::Server(Arc::new(value))
277    }
278}
279
280impl std::fmt::Display for ConnectError {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        match self {
283            Self::Server(error) => write!(f, "Cannot connect to the server: {error}"),
284            Self::VersionError(error) => write!(f, "Cannot get the server version: {error}"),
285            Self::VersionMismatch(v) => write!(
286                f,
287                "Version {v} is unsupported by the current client. Supported versions are {MIN_SUPPORTED_VERSION}..{MAX_SUPPORTED_VERSION}"
288            ),
289        }
290    }
291}
292
293impl std::error::Error for ConnectError {
294    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
295        match self {
296            Self::Server(error) => Some(error),
297            Self::VersionError(error) => Some(error),
298            Self::VersionMismatch(_) => None,
299        }
300    }
301}
302
303pub struct BotBuilder {
304    name: String,
305    port: u16,
306    retry_delay: std::time::Duration,
307    retries: usize,
308    auto_accept: Option<String>,
309    profile: Option<Profile>,
310    preferences: Option<Preferences>,
311    avatar: Option<ImagePreview>,
312    #[cfg(feature = "cli")]
313    db_prefix: String,
314    #[cfg(feature = "cli")]
315    db_key: Option<String>,
316    #[cfg(feature = "cli")]
317    extra_args: Vec<std::ffi::OsString>,
318}
319
320impl BotBuilder {
321    pub fn new(name: impl Into<String>, port: u16) -> Self {
322        Self {
323            name: name.into(),
324            port,
325            db_prefix: "bot".into(),
326            db_key: None,
327            retry_delay: std::time::Duration::from_secs(1),
328            retries: 5,
329            auto_accept: None,
330            profile: None,
331            preferences: None,
332            avatar: None,
333            #[cfg(feature = "cli")]
334            extra_args: Vec::new(),
335        }
336    }
337
338    #[cfg(feature = "cli")]
339    /// Path prefix for the SimpleX database
340    ///
341    /// "{dir}/{prefix}" creates a {dir} with `{prefix}_agent.db` and `{prefix}_chat.db`;
342    /// "{prefix}" creates `{prefix}_agent.db` and `{prefix}_chat.db` at the current dir
343    pub fn db_prefix(mut self, prefix: impl Into<String>) -> Self {
344        self.db_prefix = prefix.into();
345        self
346    }
347
348    #[cfg(feature = "cli")]
349    /// Database encryption key.
350    pub fn db_key(mut self, key: impl Into<String>) -> Self {
351        self.db_key = Some(key.into());
352        self
353    }
354
355    /// Delay between connection retry attempt. Default: 1s
356    pub fn connect_retry_delay(mut self, delay: std::time::Duration) -> Self {
357        self.retry_delay = delay;
358        self
359    }
360
361    /// Number of connection retry attempts. Default: 5
362    pub fn retries(mut self, n: usize) -> Self {
363        self.retries = n;
364        self
365    }
366
367    /// Create public address and auto accept users
368    pub fn auto_accept(mut self) -> Self {
369        self.auto_accept = Some(String::default());
370        self
371    }
372
373    /// Set a welcome message. This automatically creates a public address with enabled auto_accept
374    pub fn auto_accept_with(mut self, welcome_message: impl Into<String>) -> Self {
375        self.auto_accept = Some(welcome_message.into());
376        self
377    }
378
379    /// Set the bot avatar during initialisation
380    pub fn with_avatar(mut self, avatar: ImagePreview) -> Self {
381        self.avatar = Some(avatar);
382        self
383    }
384
385    /// Update/create the whole bot profile on launch
386    pub fn with_profile(mut self, profile: Profile) -> Self {
387        self.profile = Some(profile);
388        self
389    }
390
391    /// Apply these preferences to the bot's profile during initialisation.
392    pub fn with_preferences(mut self, prefs: Preferences) -> Self {
393        self.preferences = Some(prefs);
394        self
395    }
396
397    /// Pass extra arguments to the `simplex-chat` process.
398    #[cfg(feature = "cli")]
399    pub fn cli_args<I, S>(mut self, args: I) -> Self
400    where
401        I: IntoIterator<Item = S>,
402        S: Into<std::ffi::OsString>,
403    {
404        self.extra_args.extend(args.into_iter().map(|s| s.into()));
405        self
406    }
407
408    /// Connect to an already-running `simplex-chat` instance.
409    pub async fn connect(self) -> Result<(Bot, EventStream), BotInitError> {
410        let url = format!("ws://127.0.0.1:{}", self.port);
411
412        let (client, events) = retry_connect(url, self.retry_delay, self.retries)
413            .await
414            .map_err(BotInitError::Connect)?;
415
416        #[cfg(feature = "xftp")]
417        let (client, events) = {
418            let mut events = events;
419            let client = events.hook_xftp(client);
420            (client, events)
421        };
422
423        let settings = BotSettings {
424            display_name: self.name,
425            auto_accept: self.auto_accept,
426            profile_settings: match (self.profile, self.preferences) {
427                (Some(mut profile), Some(preferences)) => {
428                    profile.preferences = Some(preferences);
429                    Some(BotProfileSettings::FullProfile(profile))
430                }
431                (Some(profile), None) => Some(BotProfileSettings::FullProfile(profile)),
432                (None, Some(preferences)) => Some(BotProfileSettings::Preferences(preferences)),
433                (None, None) => None,
434            },
435            avatar: self.avatar,
436        };
437
438        let bot = Bot::init(client, settings).await?;
439        Ok((bot, events))
440    }
441
442    /// Spawn `simplex-chat`, then connect and initialise.
443    ///
444    /// Returns `(bot, events, cli)`. The caller is responsible for calling
445    /// [`cli::SimplexCli::kill`] after the bot finishes.
446    #[cfg(feature = "cli")]
447    pub async fn launch(mut self) -> Result<(Bot, EventStream, cli::SimplexCli), BotInitError> {
448        let mut builder = cli::SimplexCli::builder(&self.name, self.port)
449            .db_prefix(std::mem::take(&mut self.db_prefix));
450
451        if let Some(ref mut key) = self.db_key {
452            builder = builder.db_key(std::mem::take(key));
453        }
454
455        let cli = builder
456            .args(std::mem::take(&mut self.extra_args))
457            .spawn()
458            .await
459            .map_err(BotInitError::CliSpawn)?;
460
461        let (bot, events) = self.connect().await?;
462        Ok((bot, events, cli))
463    }
464}
465
466/// Error returned by [`BotBuilder::connect`] and [`BotBuilder::launch`].
467#[derive(Debug)]
468pub enum BotInitError {
469    Connect(ConnectError),
470    Api(ClientError),
471    #[cfg(feature = "cli")]
472    CliSpawn(std::io::Error),
473}
474
475impl std::fmt::Display for BotInitError {
476    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
477        match self {
478            #[cfg(feature = "cli")]
479            Self::CliSpawn(e) => write!(f, "failed to spawn simplex-chat: {e}"),
480            Self::Connect(e) => write!(f, "websocket connection failed: {e}"),
481            Self::Api(e) => write!(f, "SimpleX API error during init: {e}"),
482        }
483    }
484}
485
486impl std::error::Error for BotInitError {
487    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
488        match self {
489            #[cfg(feature = "cli")]
490            Self::CliSpawn(e) => Some(e),
491            Self::Connect(e) => Some(e),
492            Self::Api(e) => Some(e),
493        }
494    }
495}
496
497impl From<ClientError> for BotInitError {
498    fn from(e: ClientError) -> Self {
499        Self::Api(e)
500    }
501}