Skip to main content

magic_wormhole/
transfer.rs

//! Client-to-Client protocol to organize file transfers
//!
//! This gives you the actual capability to transfer files, the feature that Magic Wormhole is known and loved for.
//!
//! It is bound to an [`AppID`]. Only applications using that [`APPID`] (and thus this protocol) can interoperate with
//! the original Python implementation (and other compliant implementations).
//!
//! At its core, "peer messages" are exchanged over an established wormhole connection with the other side.
//! They are used to set up a [transit] portal and to exchange a file offer/accept. Then, the file is transmitted over the transit relay.
10
11use futures::{AsyncRead, AsyncWrite};
12use serde_derive::{Deserialize, Serialize};
13#[cfg(test)]
14use serde_json::json;
15use std::sync::Arc;
16
17use super::{AppID, Wormhole, core::WormholeError, transit};
18use futures::Future;
19use std::{borrow::Cow, collections::BTreeMap};
20
21#[cfg(not(target_family = "wasm"))]
22use std::path::{Path, PathBuf};
23
24use transit::{
25    Abilities as TransitAbilities, Transit, TransitConnectError, TransitConnector, TransitError,
26};
27
28mod cancel;
29#[doc(hidden)]
30pub mod offer;
31mod v1;
32#[cfg(feature = "experimental-transfer-v2")]
33#[allow(missing_docs)]
34mod v2;
35
36#[doc(hidden)]
37pub use v1::ReceiveRequest as ReceiveRequestV1;
38
39#[cfg(not(feature = "experimental-transfer-v2"))]
40pub use v1::ReceiveRequest;
41
42#[cfg(feature = "experimental-transfer-v2")]
43pub use v2::ReceiveRequest as ReceiveRequestV2;
44
45const APPID_RAW: &str = "lothar.com/wormhole/text-or-file-xfer";
46
47/// The App ID associated with this protocol.
48pub const APPID: AppID = AppID(Cow::Borrowed(APPID_RAW));
49
50/// An [`crate::AppConfig`] with default parameters for the file transfer protocol.
51///
52/// You **must not** change `id` and `rendezvous_url` to be interoperable.
53/// The `app_version` can be adjusted if you want to disable some features.
54pub const APP_CONFIG: crate::AppConfig<AppVersion> = crate::AppConfig::<AppVersion> {
55    id: AppID(Cow::Borrowed(APPID_RAW)),
56    rendezvous_url: Cow::Borrowed(crate::rendezvous::DEFAULT_RENDEZVOUS_SERVER),
57    app_version: AppVersion::new(),
58};
59
60// TODO be more extensible on the JSON enum types (i.e. recognize unknown variants)
61
62#[derive(Debug, thiserror::Error)]
63#[non_exhaustive]
64/// An error occurred during file transfer
65pub enum TransferError {
66    /// Transfer was not acknowledged by peer
67    #[error("Transfer was not acknowledged by peer")]
68    AckError,
69
70    /// Receive checksum error
71    #[error("Receive checksum error")]
72    Checksum,
73
74    /// The file contained a different amount of bytes than advertized
75    #[error(
76        "The file contained a different amount of bytes than advertized! Sent {} bytes, but should have been {}",
77        sent_size,
78        file_size
79    )]
80    FileSize {
81        /// The amount of bytes that were sent
82        sent_size: u64,
83        /// The expected amount of bytes
84        file_size: u64,
85    },
86
87    /// The file(s) to send got modified during the transfer, and thus corrupted
88    #[error("The file(s) to send got modified during the transfer, and thus corrupted")]
89    FilesystemSkew,
90
91    // TODO be more specific
92    /// Unsupported offer type
93    #[error("Unsupported offer type")]
94    UnsupportedOffer,
95
96    /// Something went wrong on the other side
97    #[error("Something went wrong on the other side: {}", _0)]
98    PeerError(String),
99
100    /// Corrupt JSON message received. Some deserialization went wrong, we probably got some garbage
101    #[error("Corrupt JSON message received")]
102    ProtocolJson(
103        #[from]
104        #[source]
105        serde_json::Error,
106    ),
107
108    /// Corrupt Msgpack message received. Some deserialization went wrong, we probably got some garbage
109    #[error("Corrupt Msgpack message received")]
110    ProtocolMsgpack(
111        #[from]
112        #[source]
113        rmp_serde::decode::Error,
114    ),
115
116    /// A generic string message for "something went wrong", i.e.
117    /// the server sent some bullshit message order
118    #[error("Protocol error: {}", _0)]
119    Protocol(Box<str>),
120
121    /// Unexpected message (protocol error)
122    #[error(
123        "Unexpected message (protocol error): Expected '{}', but got: '{}'",
124        _0,
125        _1
126    )]
127    ProtocolUnexpectedMessage(Box<str>, Box<str>),
128
129    /// Wormhole connection error
130    #[error("Wormhole connection error")]
131    Wormhole(
132        #[from]
133        #[source]
134        WormholeError,
135    ),
136
137    /// Error while establishing transit connection
138    #[error("Error while establishing transit connection")]
139    TransitConnect(
140        #[from]
141        #[source]
142        TransitConnectError,
143    ),
144
145    /// Transit error
146    #[error("Transit error")]
147    Transit(
148        #[from]
149        #[source]
150        TransitError,
151    ),
152
153    /// I/O error
154    #[error("I/O error")]
155    IO(
156        #[from]
157        #[source]
158        std::io::Error,
159    ),
160}
161
162impl TransferError {
163    pub(self) fn unexpected_message(
164        expected: impl Into<Box<str>>,
165        got: impl std::fmt::Display,
166    ) -> Self {
167        Self::ProtocolUnexpectedMessage(expected.into(), got.to_string().into())
168    }
169}
170
171/**
172 * The application specific version information for this protocol.
173 */
174#[derive(Clone, Serialize, Deserialize)]
175#[serde(rename_all = "kebab-case")]
176pub struct AppVersion {
177    #[serde(default)]
178    abilities: Cow<'static, [Cow<'static, str>]>,
179    #[serde(default)]
180    #[cfg(feature = "experimental-transfer-v2")]
181    transfer_v2: Option<AppVersionTransferV2Hint>,
182}
183
184// TODO check invariants during deserialization
185impl AppVersion {
186    const fn new() -> Self {
187        Self {
188            // Dont advertize v2 for now
189            abilities: Cow::Borrowed(&[
190                Cow::Borrowed("transfer-v1"), /* Cow::Borrowed("experimental-transfer-v2") */
191            ]),
192            #[cfg(feature = "experimental-transfer-v2")]
193            transfer_v2: Some(AppVersionTransferV2Hint::new()),
194        }
195    }
196
197    #[allow(dead_code)]
198    fn supports_v2(&self) -> bool {
199        self.abilities.contains(&"transfer-v2".into())
200    }
201}
202
203impl Default for AppVersion {
204    fn default() -> Self {
205        Self::new()
206    }
207}
208
/// A hint used in transfer v2 to determine the app version
#[cfg(feature = "experimental-transfer-v2")]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct AppVersionTransferV2Hint {
    /// Names of the supported formats.
    supported_formats: Cow<'static, [Cow<'static, str>]>,
    /// Transit abilities announced alongside the formats.
    transit_abilities: transit::Abilities,
}

#[cfg(feature = "experimental-transfer-v2")]
impl AppVersionTransferV2Hint {
    /// Creates a hint listing the `"plain"` and `"tar"` formats together with
    /// all transit abilities.
    const fn new() -> Self {
        const FORMATS: &[Cow<'static, str>] = &[Cow::Borrowed("plain"), Cow::Borrowed("tar")];

        Self {
            supported_formats: Cow::Borrowed(FORMATS),
            transit_abilities: transit::Abilities::ALL,
        }
    }
}

#[cfg(feature = "experimental-transfer-v2")]
impl Default for AppVersionTransferV2Hint {
    /// Yields the same value as the private `AppVersionTransferV2Hint::new` constructor.
    fn default() -> Self {
        AppVersionTransferV2Hint::new()
    }
}
234
235/**
236 * The type of message exchanged over the wormhole for this protocol
237 */
238#[derive(Deserialize, Serialize, derive_more::Display, Debug, Clone)]
239#[serde(rename_all = "kebab-case")]
240#[non_exhaustive]
241pub(crate) enum PeerMessage {
242    /* V1 */
243    /// A transit message
244    #[display("transit")]
245    Transit(v1::TransitV1),
246
247    /// An offer message
248    #[display("offer")]
249    Offer(v1::OfferMessage),
250
251    /// An answer message
252    #[display("answer")]
253    Answer(v1::AnswerMessage),
254    /* V2 */
255    /// A transit v2 message
256    #[cfg(feature = "experimental-transfer-v2")]
257    #[display("transit-v2")]
258    TransitV2(v2::TransitV2),
259
260    /// Tell the other side you got an error
261    #[display("error")]
262    Error(String),
263
264    /// An unknown message
265    #[display("unknown")]
266    #[serde(other)]
267    Unknown,
268}
269
270impl PeerMessage {
271    #[allow(dead_code)]
272    fn offer_message_v1(msg: impl Into<String>) -> Self {
273        PeerMessage::Offer(v1::OfferMessage::Message(msg.into()))
274    }
275
276    fn offer_file_v1(name: impl Into<String>, size: u64) -> Self {
277        PeerMessage::Offer(v1::OfferMessage::File {
278            filename: name.into(),
279            filesize: size,
280        })
281    }
282
283    #[allow(dead_code)]
284    fn offer_directory_v1(
285        name: impl Into<String>,
286        mode: impl Into<String>,
287        compressed_size: u64,
288        numbytes: u64,
289        numfiles: u64,
290    ) -> Self {
291        PeerMessage::Offer(v1::OfferMessage::Directory {
292            dirname: name.into(),
293            mode: mode.into(),
294            zipsize: compressed_size,
295            numbytes,
296            numfiles,
297        })
298    }
299
300    #[allow(dead_code)]
301    fn message_ack_v1(msg: impl Into<String>) -> Self {
302        PeerMessage::Answer(v1::AnswerMessage::MessageAck(msg.into()))
303    }
304
305    fn file_ack_v1(msg: impl Into<String>) -> Self {
306        PeerMessage::Answer(v1::AnswerMessage::FileAck(msg.into()))
307    }
308
309    fn error_message(msg: impl Into<String>) -> Self {
310        PeerMessage::Error(msg.into())
311    }
312
313    fn transit_v1(abilities: TransitAbilities, hints: transit::Hints) -> Self {
314        PeerMessage::Transit(v1::TransitV1 {
315            abilities_v1: abilities,
316            hints_v1: hints,
317        })
318    }
319
320    #[cfg(feature = "experimental-transfer-v2")]
321    fn transit_v2(hints_v2: transit::Hints) -> Self {
322        PeerMessage::TransitV2(v2::TransitV2 { hints_v2 })
323    }
324
325    fn check_err(&self) -> Result<Self, TransferError> {
326        match self {
327            Self::Error(err) => Err(TransferError::PeerError(err.clone())),
328            other => Ok(other.clone()),
329        }
330    }
331
332    #[expect(dead_code)]
333    fn ser_json(&self) -> Vec<u8> {
334        serde_json::to_vec(self).unwrap()
335    }
336}
337
338/// Send a previously constructed offer.
339///
340/// Part of the experimental and unstable transfer-v2 API.
341/// Expect some amount of API breakage in the future to adapt to protocol changes and API ergonomics.
342#[cfg_attr(not(feature = "experimental-transfer-v2"), doc(hidden))]
343pub async fn send(
344    wormhole: Wormhole,
345    relay_hints: Vec<transit::RelayHint>,
346    transit_abilities: transit::Abilities,
347    offer: offer::OfferSend,
348    transit_handler: impl FnOnce(transit::TransitInfo),
349    progress_handler: impl FnMut(u64, u64) + 'static,
350    cancel: impl Future<Output = ()>,
351) -> Result<(), TransferError> {
352    let peer_version: AppVersion = serde_json::from_value(wormhole.peer_version().clone())?;
353
354    #[cfg(feature = "experimental-transfer-v2")]
355    {
356        if peer_version.supports_v2() {
357            return v2::send(
358                wormhole,
359                relay_hints,
360                transit_abilities,
361                offer,
362                progress_handler,
363                peer_version,
364                cancel,
365            )
366            .await;
367        }
368    }
369
370    v1::send(
371        wormhole,
372        relay_hints,
373        transit_abilities,
374        offer,
375        progress_handler,
376        transit_handler,
377        peer_version,
378        cancel,
379    )
380    .await
381}
382
/// Wait for a file offer from the other side
///
/// This method waits for an offer message and builds up a [`ReceiveRequest`].
/// It will also start building a TCP connection to the other side using the transit protocol.
///
/// Returns `None` if the task got cancelled.
///
/// Part of the experimental and unstable transfer-v2 API.
/// Expect some amount of API breakage in the future to adapt to protocol changes and API ergonomics.
///
/// # Errors
///
/// Returns [`TransferError::ProtocolJson`] if the peer's version information
/// cannot be deserialized, or whatever error the selected implementation reports.
#[cfg(feature = "experimental-transfer-v2")]
pub async fn request(
    wormhole: Wormhole,
    relay_hints: Vec<transit::RelayHint>,
    transit_abilities: transit::Abilities,
    cancel: impl Future<Output = ()>,
) -> Result<Option<ReceiveRequest>, TransferError> {
    let peer_version: AppVersion = serde_json::from_value(wormhole.peer_version().clone())?;

    // Peers without v2 support are served through the v1 implementation.
    if !peer_version.supports_v2() {
        let pending = v1::request(wormhole, relay_hints, transit_abilities, cancel).await?;
        return Ok(pending.map(ReceiveRequest::V1));
    }

    let pending = v2::request(
        wormhole,
        relay_hints,
        peer_version,
        transit_abilities,
        cancel,
    )
    .await?;
    Ok(pending.map(ReceiveRequest::V2))
}
421
422/// Wait for a file offer from the other side
423///
424/// This method waits for an offer message and builds up a ReceiveRequest. It will also start building a TCP connection to the other side using the transit protocol.
425///
426/// Returns None if the task got cancelled.
427#[cfg_attr(
428    feature = "experimental-transfer-v2",
429    deprecated(
430        since = "0.7.0",
431        note = "transfer::request_file does not support file transfer protocol version 2.
432        To continue only supporting version 1, use transfer::v1::request. To support both protocol versions, use transfer::request"
433    )
434)]
435pub async fn request_file(
436    wormhole: Wormhole,
437    relay_hints: Vec<transit::RelayHint>,
438    transit_abilities: transit::Abilities,
439    cancel: impl Future<Output = ()>,
440) -> Result<Option<v1::ReceiveRequest>, TransferError> {
441    v1::request(wormhole, relay_hints, transit_abilities, cancel).await
442}
443
444/// Send a file to the other side
445///
446/// You must ensure that the Reader contains exactly as many bytes as advertized in file_size.
447#[cfg_attr(
448    feature = "experimental-transfer-v2",
449    deprecated(
450        since = "0.7.0",
451        note = "transfer::send_file does not support file transfer protocol version 2, use transfer::send"
452    )
453)]
454pub async fn send_file<F, N, G, H>(
455    wormhole: Wormhole,
456    relay_hints: Vec<transit::RelayHint>,
457    file: &mut F,
458    file_name: N,
459    file_size: u64,
460    transit_abilities: transit::Abilities,
461    transit_handler: G,
462    progress_handler: H,
463    cancel: impl Future<Output = ()>,
464) -> Result<(), TransferError>
465where
466    F: AsyncRead + Unpin + Send,
467    N: Into<String>,
468    G: FnOnce(transit::TransitInfo),
469    H: FnMut(u64, u64) + 'static,
470{
471    v1::send_file(
472        wormhole,
473        relay_hints,
474        file,
475        file_name,
476        file_size,
477        transit_abilities,
478        transit_handler,
479        progress_handler,
480        cancel,
481    )
482    .await
483}
484
485/// Send a file or folder
486#[cfg_attr(
487    feature = "experimental-transfer-v2",
488    deprecated(
489        since = "0.7.0",
490        note = "transfer::send_file_or_folder does not support file transfer protocol version 2, use transfer::send"
491    )
492)]
493#[cfg(not(target_family = "wasm"))]
494pub async fn send_file_or_folder<N, M, G, H>(
495    wormhole: Wormhole,
496    relay_hints: Vec<transit::RelayHint>,
497    file_path: N,
498    file_name: M,
499    transit_abilities: transit::Abilities,
500    transit_handler: G,
501    progress_handler: H,
502    cancel: impl Future<Output = ()>,
503) -> Result<(), TransferError>
504where
505    N: AsRef<Path>,
506    M: Into<String>,
507    G: FnOnce(transit::TransitInfo),
508    H: FnMut(u64, u64) + 'static,
509{
510    let file_path = file_path.as_ref();
511    let file_name = file_name.into();
512
513    let mut file = async_fs::File::open(file_path).await?;
514    let metadata = file.metadata().await?;
515    if metadata.is_dir() {
516        #[allow(deprecated)]
517        send_folder(
518            wormhole,
519            relay_hints,
520            file_path,
521            file_name,
522            transit_abilities,
523            transit_handler,
524            progress_handler,
525            cancel,
526        )
527        .await?;
528    } else {
529        let file_size = metadata.len();
530        #[allow(deprecated)]
531        send_file(
532            wormhole,
533            relay_hints,
534            &mut file,
535            file_name,
536            file_size,
537            transit_abilities,
538            transit_handler,
539            progress_handler,
540            cancel,
541        )
542        .await?;
543    }
544    Ok(())
545}
546
547/// Send a folder to the other side
548/// This isn’t a proper folder transfer as per the Wormhole protocol because it sends it in a way so
549/// that the receiver still has to manually unpack it. But it’s better than nothing
550#[cfg_attr(
551    feature = "experimental-transfer-v2",
552    deprecated(
553        since = "0.7.0",
554        note = "transfer::send_folder does not support file transfer protocol version 2, use transfer::send"
555    )
556)]
557#[cfg(not(target_family = "wasm"))]
558pub async fn send_folder<N, M, G, H>(
559    wormhole: Wormhole,
560    relay_hints: Vec<transit::RelayHint>,
561    folder_path: N,
562    folder_name: M,
563    transit_abilities: transit::Abilities,
564    transit_handler: G,
565    progress_handler: H,
566    cancel: impl Future<Output = ()>,
567) -> Result<(), TransferError>
568where
569    N: Into<PathBuf>,
570    M: Into<String>,
571    G: FnOnce(transit::TransitInfo),
572    H: FnMut(u64, u64) + 'static,
573{
574    let offer = offer::OfferSendEntry::new(folder_path.into()).await?;
575
576    v1::send_folder(
577        wormhole,
578        relay_hints,
579        folder_name.into(),
580        offer,
581        transit_abilities,
582        transit_handler,
583        progress_handler,
584        cancel,
585    )
586    .await
587}
588
/// A pending files send offer from the other side
///
/// You *should* consume this object, by matching on the protocol version and then calling either `accept` or `reject`.
#[cfg(feature = "experimental-transfer-v2")]
#[must_use]
pub enum ReceiveRequest {
    /// Receive request using protocol version 1
    V1(ReceiveRequestV1),
    /// Receive request using protocol version 2
    V2(ReceiveRequestV2),
}
602
#[cfg(feature = "experimental-transfer-v2")]
impl ReceiveRequest {
    /// Accept this receive request
    ///
    /// For a v1 request, the first entry of `answer` is taken and must be a
    /// regular file; its content callback is invoked to obtain the writer the
    /// data is received into. A v2 request receives `answer` as a whole.
    ///
    /// # Panics
    ///
    /// For a v1 request, panics if `answer` contains no entry or if its first
    /// entry is not a regular file.
    pub async fn accept<F, G, W>(
        self,
        transit_handler: G,
        progress_handler: F,
        mut answer: offer::OfferAccept,
        cancel: impl Future<Output = ()>,
    ) -> Result<(), TransferError>
    where
        F: FnMut(u64, u64) + 'static,
        G: FnOnce(transit::TransitInfo),
        W: AsyncWrite + Unpin,
    {
        let request = match self {
            Self::V2(request) => {
                return request
                    .accept(transit_handler, answer, progress_handler, cancel)
                    .await;
            },
            Self::V1(request) => request,
        };

        // Desynthesize the previously synthesized offer to make transfer v1 more similar to transfer v2
        let (_name, entry) = answer
            .content
            .pop_first()
            .expect("must call accept(..) with an offer that contains at least one element");

        let offer::OfferEntry::RegularFile { content, .. } = entry else {
            panic!("when using transfer v1 you must call accept(..) with file offers only");
        };
        let mut acceptor = (content.content)(true).await?;

        request
            .accept(transit_handler, progress_handler, &mut acceptor, cancel)
            .await
    }

    /// Reject the file offer
    ///
    /// This will send an error message to the other side so that it knows the transfer failed.
    pub async fn reject(self) -> Result<(), TransferError> {
        match self {
            Self::V1(v1_request) => v1_request.reject().await,
            Self::V2(v2_request) => v2_request.reject().await,
        }
    }

    /// The file offer for this receive request
    pub fn offer(&self) -> Arc<offer::Offer> {
        match self {
            Self::V1(v1_request) => v1_request.offer(),
            Self::V2(v2_request) => v2_request.offer(),
        }
    }
}
666
#[cfg(test)]
mod test {
    use super::*;
    use transit::{Abilities, DirectHint, RelayHint};

    /// A v1 transit message serializes to the expected JSON structure.
    #[test]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_transit() {
        let direct = [DirectHint::new("192.168.1.8", 46295)];
        let relay = [RelayHint::new(
            None,
            [DirectHint::new("magic-wormhole-transit.debian.net", 4001)],
            [],
        )];
        let hints = transit::Hints::new(direct, relay);
        let abilities = Abilities::ALL;

        let actual = serde_json::json!(crate::transfer::PeerMessage::transit_v1(abilities, hints));
        let expected = serde_json::json!({
            "transit": {
                "abilities-v1": [{"type":"direct-tcp-v1"},{"type":"relay-v1"}],
                "hints-v1": [
                    {"hostname":"192.168.1.8","port":46295,"type":"direct-tcp-v1"},
                    {
                        "type": "relay-v1",
                        "hints": [
                            {"type": "direct-tcp-v1", "hostname": "magic-wormhole-transit.debian.net", "port": 4001}
                        ],
                        "name": null
                    }
                ],
            }
        });
        assert_eq!(actual, expected);
    }

    /// A text message offer serializes as `offer.message`.
    #[test]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_message() {
        let message = PeerMessage::offer_message_v1("hello from rust");
        let encoded = serde_json::json!(message).to_string();
        assert_eq!(encoded, r#"{"offer":{"message":"hello from rust"}}"#);
    }

    /// A file offer serializes as `offer.file` with name and size.
    #[test]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_offer_file() {
        let message = PeerMessage::offer_file_v1("somefile.txt", 34556);
        let encoded = serde_json::json!(message).to_string();
        assert_eq!(
            encoded,
            r#"{"offer":{"file":{"filename":"somefile.txt","filesize":34556}}}"#
        );
    }

    /// A directory offer serializes as `offer.directory` with all of its fields.
    #[test]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_offer_directory() {
        let message = PeerMessage::offer_directory_v1("somedirectory", "zipped", 45, 1234, 10);
        let encoded = serde_json::json!(message).to_string();
        assert_eq!(
            encoded,
            r#"{"offer":{"directory":{"dirname":"somedirectory","mode":"zipped","numbytes":1234,"numfiles":10,"zipsize":45}}}"#
        );
    }

    /// A message acknowledgement serializes as `answer.message_ack`.
    #[test]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_message_ack() {
        let message = PeerMessage::message_ack_v1("ok");
        let encoded = serde_json::json!(message).to_string();
        assert_eq!(encoded, r#"{"answer":{"message_ack":"ok"}}"#);
    }

    /// A file acknowledgement serializes as `answer.file_ack`.
    #[test]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    fn test_file_ack() {
        let message = PeerMessage::file_ack_v1("ok");
        let encoded = serde_json::json!(message).to_string();
        assert_eq!(encoded, r#"{"answer":{"file_ack":"ok"}}"#);
    }
}