Skip to main content

simploxide_client/
xftp.rs

1//! XFTP file download manager.
2//!
3//! [`XftpClient`] wraps any [`ClientApi`] client and observes the file rcv events emitted by the
4//! SimpleX-Chat. [`DownloadFileBuilder`] (obtained via [`XftpExt::download_file`]) initiates the transfer
5//! and awaits those events, returning the outcome directly to the caller.
6//!
7//! # When to use
8//!
9//! - **Out-of-handler downloads.** When the decision to download a file is made outside an event
10//!   handler (for example, after a user command or a timer), `download_file` provides the result
11//!   without requiring custom event routing.
12//!
13//! - **Keeping download logic in one handler.** Sometimes it may be useful to keep all logic in a
14//!   single handler to simplify state management.
15//!
16
17use std::sync::Arc;
18
19use dashmap::DashMap;
20use serde::Deserialize;
21use simploxide_api_types::{
22    client_api::ClientApi,
23    commands::ReceiveFile,
24    events::{Event, EventKind, RcvFileComplete, RcvFileError, RcvFileSndCancelled},
25    responses::{CancelFileResponse, RcvFileAcceptedSndCancelledResponse, ReceiveFileResponse},
26};
27
28use crate::{Hook, id::FileId};
29
30type XftpDownloadResponder = tokio::sync::oneshot::Sender<XftpManagerDownloadResponse>;
31
32/// Adds [`download_file`](Self::download_file) to any [`ClientApi`].
33/// Automatically implemented for [`XftpClient`].
34pub trait XftpExt: ClientApi {
35    /// Begin downloading `file_id` and return a builder to configure and await the result.
36    ///
37    /// # Deadlock warning
38    ///
39    /// `download_file` awaits a completion event that only arrives when the event loop processes
40    /// **Awaiting a download inside a sequential handler blocks the event loop**: that event
41    /// never arrives, causing a deadlock. Only use `download_file` from a **concurrent** handler
42    /// (registered with [`Dispatcher::on`](crate::dispatcher::Dispatcher::on)) or outside the
43    /// dispatcher entirely.
44    fn download_file<FID: Into<FileId>>(&self, file_id: FID) -> DownloadFileBuilder<'_, Self>;
45}
46
47/// A [`ClientApi`] wrapper that intercepts file-result events and routes them to the
48/// corresponding [`DownloadFileBuilder`] futures. Should be constructed by
49/// [`EventStream::hook_xftp`](crate::EventStream::hook_xftp) to work correctly.
50#[derive(Clone)]
51pub struct XftpClient<C> {
52    client: C,
53    xftp: Arc<XftpManager>,
54}
55
56#[cfg(feature = "websocket")]
57impl XftpClient<crate::ws::Client> {
58    pub fn disconnect(self) -> impl Future<Output = ()> {
59        self.client.disconnect()
60    }
61}
62
63#[cfg(feature = "ffi")]
64impl XftpClient<crate::ffi::Client> {
65    pub fn disconnect(self) -> impl Future<Output = ()> {
66        self.client.disconnect()
67    }
68}
69
70impl<C: ClientApi> ClientApi for XftpClient<C> {
71    type ResponseShape<'de, T: 'de + Deserialize<'de>> = C::ResponseShape<'de, T>;
72    type Error = C::Error;
73
74    fn send_raw(
75        &self,
76        command: String,
77    ) -> impl Future<Output = Result<String, Self::Error>> + Send {
78        self.client.send_raw(command)
79    }
80
81    fn cancel_file(
82        &self,
83        file_id: i64,
84    ) -> impl Future<Output = Result<CancelFileResponse, Self::Error>> + Send {
85        self.xftp.downloads.remove(&file_id);
86        self.client.cancel_file(file_id)
87    }
88}
89
90impl<C: ClientApi> From<C> for XftpClient<C> {
91    fn from(client: C) -> Self {
92        Self {
93            client,
94            xftp: Arc::new(XftpManager::default()),
95        }
96    }
97}
98
99impl<C: ClientApi> XftpExt for XftpClient<C> {
100    fn download_file<FID: Into<FileId>>(&self, file_id: FID) -> DownloadFileBuilder<'_, Self> {
101        DownloadFileBuilder {
102            client: self,
103            cmd: ReceiveFile::new(file_id.into().0),
104        }
105    }
106}
107
108impl<C: ClientApi> Hook for XftpClient<C> {
109    fn should_intercept(&self, kind: EventKind) -> bool {
110        const EVENT_KINDS: [EventKind; 3] = [
111            EventKind::RcvFileSndCancelled,
112            EventKind::RcvFileComplete,
113            EventKind::RcvFileError,
114        ];
115
116        EVENT_KINDS.contains(&kind)
117    }
118
119    fn intercept_event(&mut self, event: Event) {
120        match event {
121            Event::RcvFileComplete(ev) => {
122                if let Some((_, responder)) = self
123                    .xftp
124                    .downloads
125                    .remove(&ev.chat_item.chat_item.file.as_ref().unwrap().file_id)
126                {
127                    let _ = responder.send(XftpManagerDownloadResponse::Complete(ev));
128                }
129            }
130            Event::RcvFileSndCancelled(ev) => {
131                if let Some((_, responder)) =
132                    self.xftp.downloads.remove(&ev.rcv_file_transfer.file_id)
133                {
134                    let _ = responder.send(XftpManagerDownloadResponse::Cancelled(ev));
135                }
136            }
137            Event::RcvFileError(ev) => {
138                if let Some((_, responder)) =
139                    self.xftp.downloads.remove(&ev.rcv_file_transfer.file_id)
140                {
141                    let _ = responder.send(XftpManagerDownloadResponse::Error(ev));
142                }
143            }
144            _ => (),
145        }
146    }
147}
148
149pub struct DownloadFileBuilder<'a, C: 'a + ?Sized> {
150    client: &'a C,
151    cmd: ReceiveFile,
152}
153
154impl<'a, C: 'a + ?Sized> DownloadFileBuilder<'a, C> {
155    /// Route the download through user-approved relays rather than the default ones.
156    pub fn via_user_approved_relays(mut self) -> Self {
157        self.cmd.user_approved_relays = true;
158        self
159    }
160
161    /// Store the downloaded file in encrypted form.
162    pub fn store_encrypted(mut self) -> Self {
163        self.cmd.store_encrypted = Some(true);
164        self
165    }
166
167    /// Request inline delivery (small files only).
168    pub fn inline(mut self) -> Self {
169        self.cmd.file_inline = Some(true);
170        self
171    }
172
173    /// Override the path where the downloaded file will be saved.
174    pub fn file_path<P: AsRef<std::path::Path>>(mut self, path: P) -> Self {
175        self.cmd.file_path = Some(path.as_ref().display().to_string());
176        self
177    }
178}
179
180impl<'a, C: 'a + ClientApi> IntoFuture for DownloadFileBuilder<'a, XftpClient<C>>
181where
182    <XftpClient<C> as ClientApi>::Error: 'static + Send,
183{
184    type Output = Result<Arc<RcvFileComplete>, DownloadError<<XftpClient<C> as ClientApi>::Error>>;
185    type IntoFuture = std::pin::Pin<Box<dyn 'a + Send + Future<Output = Self::Output>>>;
186
187    fn into_future(self) -> Self::IntoFuture {
188        Box::pin(async move {
189            let file_id = self.cmd.file_id;
190
191            let (responder, response) = tokio::sync::oneshot::channel();
192            self.client.xftp.downloads.insert(file_id, responder);
193
194            match self.client.receive_file(self.cmd).await {
195                Ok(ReceiveFileResponse::RcvFileAccepted(_)) => {
196                    match response.await.expect("XFTP responses are always delivered") {
197                        XftpManagerDownloadResponse::Complete(success) => Ok(success),
198                        XftpManagerDownloadResponse::Cancelled(err) => {
199                            Err(DownloadError::SendCancelled(err))
200                        }
201                        XftpManagerDownloadResponse::Error(err) => Err(DownloadError::Receive(err)),
202                    }
203                }
204                Ok(ReceiveFileResponse::RcvFileAcceptedSndCancelled(err)) => {
205                    self.client.xftp.downloads.remove(&file_id);
206                    Err(DownloadError::AcceptCancelled(err))
207                }
208                Err(e) => {
209                    self.client.xftp.downloads.remove(&file_id);
210                    Err(DownloadError::Api(e))
211                }
212            }
213        })
214    }
215}
216
217/// Error returned when a [`DownloadFileBuilder`] future resolves unsuccessfully.
218pub enum DownloadError<E> {
219    /// The sender cancelled the transfer after the download was accepted.
220    SendCancelled(Arc<RcvFileSndCancelled>),
221    /// The file was no longer available when the download request arrived.
222    AcceptCancelled(Arc<RcvFileAcceptedSndCancelledResponse>),
223    /// The SimpleX agent reported an error while receiving the file.
224    Receive(Arc<RcvFileError>),
225    /// The API call to initiate the download failed.
226    Api(E),
227}
228
229impl<E> std::fmt::Debug for DownloadError<E>
230where
231    E: std::fmt::Debug,
232{
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        match self {
235            Self::SendCancelled(arg) => f
236                .debug_tuple("SendCancelled")
237                .field(&arg.rcv_file_transfer.file_id)
238                .finish(),
239            Self::AcceptCancelled(arg) => f
240                .debug_tuple("AcceptCancelled")
241                .field(&arg.rcv_file_transfer.file_id)
242                .finish(),
243            Self::Receive(arg) => f.debug_tuple("Receive").field(&arg.agent_error).finish(),
244            Self::Api(e) => f.debug_tuple("Api").field(e).finish(),
245        }
246    }
247}
248
249impl<E> std::fmt::Display for DownloadError<E>
250where
251    E: std::fmt::Display,
252{
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        match self {
255            Self::SendCancelled(err) => write!(
256                f,
257                "File(ID={}) was cancelled by the user",
258                err.rcv_file_transfer.file_id
259            ),
260            Self::AcceptCancelled(err) => write!(
261                f,
262                "File(ID={}) is no longer available",
263                err.rcv_file_transfer.file_id
264            ),
265            Self::Receive(err) => write!(
266                f,
267                "File(ID={}) receive error: {:?}",
268                err.rcv_file_transfer.file_id, err.agent_error
269            ),
270            Self::Api(err) => write!(f, "{err}"),
271        }
272    }
273}
274
275impl<E> std::error::Error for DownloadError<E>
276where
277    E: 'static + std::error::Error,
278{
279    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
280        match self {
281            Self::SendCancelled(_) => None,
282            Self::AcceptCancelled(_) => None,
283            Self::Receive(_) => None,
284            Self::Api(error) => Some(error),
285        }
286    }
287}
288
289#[derive(Default)]
290struct XftpManager {
291    downloads: DashMap<i64, XftpDownloadResponder>,
292}
293
294enum XftpManagerDownloadResponse {
295    Complete(Arc<RcvFileComplete>),
296    Error(Arc<RcvFileError>),
297    Cancelled(Arc<RcvFileSndCancelled>),
298}