1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//! Platform-agnostic functionality of RPC transport.

use async_trait::async_trait;
use derive_more::Display;
use futures::stream::LocalBoxStream;
use medea_client_api_proto::{ClientMsg, ServerMsg};
use tracerr::Traced;

use crate::{
    platform,
    rpc::{ApiUrl, ClientDisconnect, CloseMsg},
    utils::{Caused, JsonParseError},
};

/// Possible states of a [`RpcTransport`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransportState {
    /// Socket has been created. The connection is not opened yet.
    Connecting,

    /// The connection is opened and ready to communicate.
    Open,

    /// The connection is in the process of closing.
    Closing,

    /// The connection is closed or couldn't be opened.
    ///
    /// [`CloseMsg`] is the reason of why [`RpcTransport`] went into this
    /// [`TransportState`].
    Closed(CloseMsg),
}

impl TransportState {
    /// Indicates whether the socket can be closed.
    #[must_use]
    pub const fn can_close(self) -> bool {
        matches!(self, Self::Connecting | Self::Open)
    }
}

/// RPC transport between a client and a server.
#[allow(unused_lifetimes)]
#[async_trait(?Send)]
#[cfg_attr(feature = "mockable", mockall::automock)]
#[cfg_attr(feature = "mockable", allow(clippy::missing_docs_in_private_items))]
pub trait RpcTransport {
    /// Initiates a new [WebSocket] connection to the provided `url`.
    ///
    /// Resolves only when the underlying connection becomes active.
    ///
    /// # Errors
    ///
    /// With [`TransportError::CreateSocket`] if cannot establish [WebSocket] to
    /// the provided `url`.
    ///
    /// With [`TransportError::InitSocket`] if [WebSocket.onclose][1] callback
    /// fired before [WebSocket.onopen][2] callback.
    ///
    /// # Panics
    ///
    /// If the binding to [`close`][3] or [`open`][4] events fails. Not supposed
    /// to ever happen.
    ///
    /// [1]: https://developer.mozilla.org/docs/Web/API/WebSocket/onclose
    /// [2]: https://developer.mozilla.org/docs/Web/API/WebSocket/onopen
    /// [3]: https://html.spec.whatwg.org#event-close
    /// [4]: https://html.spec.whatwg.org#event-open
    /// [WebSocket]: https://developer.mozilla.org/docs/Web/API/WebSocket
    async fn connect(&self, url: ApiUrl) -> Result<(), Traced<TransportError>>;

    /// Returns [`LocalBoxStream`] of all messages received by this transport.
    fn on_message(&self) -> LocalBoxStream<'static, ServerMsg>;

    /// Sets reason, that will be sent to remote server when this transport will
    /// be dropped.
    fn set_close_reason(&self, reason: ClientDisconnect);

    /// Sends given [`ClientMsg`] to a server.
    ///
    /// # Errors
    ///
    /// Errors if sending [`ClientMsg`] fails.
    fn send(&self, msg: &ClientMsg) -> Result<(), Traced<TransportError>>;

    /// Subscribes to a [`RpcTransport`]'s [`TransportState`] changes.
    fn on_state_change(&self) -> LocalBoxStream<'static, TransportState>;
}

/// Errors that may occur when working with a [`RpcTransport`].
#[derive(Caused, Clone, Debug, Display, PartialEq)]
#[cause(error = platform::Error)]
pub enum TransportError {
    /// Error encountered when trying to establish connection.
    #[display(fmt = "Failed to create WebSocket: {}", _0)]
    CreateSocket(platform::Error),

    /// Connection was closed before becoming active.
    #[display(fmt = "Failed to init WebSocket")]
    InitSocket,

    /// Occurs when [`ClientMsg`] cannot be serialized.
    #[display(fmt = "Failed to parse client message: {}", _0)]
    SerializeClientMessage(JsonParseError),

    /// Occurs when [`ServerMsg`] cannot be parsed.
    #[display(fmt = "Failed to parse server message: {}", _0)]
    ParseServerMessage(JsonParseError),

    /// Occurs if the parsed message is not string.
    #[display(fmt = "Message is not a string")]
    MessageNotString,

    /// Occurs when a message cannot be send to server.
    #[display(fmt = "Failed to send message: {}", _0)]
    SendMessage(platform::Error),

    /// Occurs when message is sent to a closed socket.
    #[display(fmt = "Underlying socket is closed")]
    ClosedSocket,
}