medea_jason/platform/dart/
transport.rs

1use std::{
2    cell::{Cell, RefCell},
3    rc::Rc,
4};
5
6use async_trait::async_trait;
7use futures::{channel::mpsc, prelude::stream::LocalBoxStream};
8use medea_client_api_proto::{ClientMsg, ServerMsg};
9use medea_macro::dart_bridge;
10use medea_reactive::ObservableCell;
11use tracerr::Traced;
12
13use crate::{
14    platform::{
15        RpcTransport, TransportError, TransportState,
16        dart::utils::{
17            callback::Callback, dart_future::FutureFromDart,
18            dart_string_into_rust, handle::DartHandle, string_into_c_str,
19        },
20    },
21    rpc::{ApiUrl, ClientDisconnect, CloseMsg},
22};
23
24type TransportResult<T> = Result<T, Traced<TransportError>>;
25
26#[dart_bridge("flutter/lib/src/native/platform/transport.g.dart")]
27mod transport {
28    use std::{os::raw::c_char, ptr};
29
30    use dart_sys::Dart_Handle;
31
32    use crate::platform::Error;
33
34    extern "C" {
35        /// [Connects][1] to the provided `url` and returns the created
36        /// [`WebSocket`][0].
37        ///
38        /// [Subscribes][2] to the created [`WebSocket`][0] passing the given
39        /// `on_message` and `on_close` callbacks.
40        ///
41        /// [0]: https://api.dart.dev/stable/dart-io/WebSocket-class.html
42        /// [1]: https://api.dart.dev/stable/dart-io/WebSocket/connect.html
43        /// [2]: https://api.dart.dev/stable/dart-async/Stream/listen.html
44        pub fn connect(
45            url: ptr::NonNull<c_char>,
46            on_message: Dart_Handle,
47            on_close: Dart_Handle,
48        ) -> Result<Dart_Handle, Error>;
49
50        /// [Sends][1] the provided `message` via the provided [`WebSocket`][0].
51        ///
52        /// [0]: https://api.dart.dev/stable/dart-io/WebSocket-class.html
53        /// [1]: https://api.dart.dev/stable/dart-io/WebSocket/add.html
54        pub fn send(
55            transport: Dart_Handle,
56            message: ptr::NonNull<c_char>,
57        ) -> Result<(), Error>;
58
59        /// [Closes][1] the provided [`WebSocket`][0] connection.
60        ///
61        /// [0]: https://api.dart.dev/stable/dart-io/WebSocket-class.html
62        /// [1]: https://api.dart.dev/stable/dart-io/WebSocket/close.html
63        pub fn close(
64            transport: Dart_Handle,
65            close_code: i32,
66            close_msg: ptr::NonNull<c_char>,
67        ) -> Result<(), Error>;
68
69        /// Returns the [closeCode][0] of the [close frame][1].
70        ///
71        /// [0]: https://api.dart.dev/stable/dart-io/WebSocket/closeCode.html
72        /// [1]: https://tools.ietf.org/html/rfc6455#section-5.5.1
73        pub fn close_code(close_frame: Dart_Handle) -> Result<i32, Error>;
74
75        /// Returns the [closeReason][0] of the [close frame][1].
76        ///
77        /// [0]: https://api.dart.dev/stable/dart-io/WebSocket/closeReason.html
78        /// [1]: https://tools.ietf.org/html/rfc6455#section-5.5.1
79        pub fn close_reason(
80            close_frame: Dart_Handle,
81        ) -> Result<ptr::NonNull<c_char>, Error>;
82    }
83}
84
85/// [`RpcTransport`] implementation of a Dart side [`WebSocket`][0].
86///
87/// [0]: https://api.dart.dev/stable/dart-io/WebSocket-class.html
88#[derive(Clone, Debug)]
89pub struct WebSocketRpcTransport {
90    /// Handle to the Dart side [`WebSocket`][0].
91    ///
92    /// If [`DartHandle`] is [`None`], then connection hasn't been instantiated
93    /// yet.
94    ///
95    /// [0]: https://api.dart.dev/stable/dart-io/WebSocket-class.html
96    handle: RefCell<Option<DartHandle>>,
97
98    /// Subscribers to the messages received by this transport.
99    on_message_subs: Rc<RefCell<Vec<mpsc::UnboundedSender<ServerMsg>>>>,
100
101    /// Reason of [`WebSocketRpcTransport`] closing.
102    ///
103    /// Is sent in a [WebSocket close frame][1].
104    ///
105    /// [1]: https://tools.ietf.org/html/rfc6455#section-5.5.1
106    close_reason: Cell<ClientDisconnect>,
107
108    /// State of this [`WebSocketRpcTransport`] connection.
109    socket_state: Rc<ObservableCell<TransportState>>,
110}
111
112impl WebSocketRpcTransport {
113    /// Creates a new [`WebSocketRpcTransport`] which can be connected to the
114    /// server with the [`RpcTransport::connect()`] method call.
115    #[must_use]
116    pub fn new() -> Self {
117        Self {
118            handle: RefCell::new(None),
119            on_message_subs: Rc::new(RefCell::new(Vec::new())),
120            socket_state: Rc::new(ObservableCell::new(
121                TransportState::Connecting,
122            )),
123            close_reason: Cell::new(
124                ClientDisconnect::RpcTransportUnexpectedlyDropped,
125            ),
126        }
127    }
128}
129
130impl Default for WebSocketRpcTransport {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136#[async_trait(?Send)]
137impl RpcTransport for WebSocketRpcTransport {
138    async fn connect(&self, url: ApiUrl) -> TransportResult<()> {
139        // TODO: Propagate execution error.
140        #[expect(clippy::map_err_ignore, reason = "needs refactoring")]
141        let handle = {
142            let on_message = Callback::from_fn_mut({
143                let weak_subs = Rc::downgrade(&self.on_message_subs);
144                move |msg: String| {
145                    if let Some(subs) = weak_subs.upgrade() {
146                        let msg = match serde_json::from_str::<ServerMsg>(&msg)
147                        {
148                            Ok(parsed) => parsed,
149                            Err(e) => {
150                                // TODO: Protocol versions mismatch?
151                                //       Should drop connection if so.
152                                log::error!("{}", tracerr::new!(e));
153                                return;
154                            }
155                        };
156
157                        subs.borrow_mut().retain(
158                            |sub: &mpsc::UnboundedSender<ServerMsg>| {
159                                sub.unbounded_send(msg.clone()).is_ok()
160                            },
161                        );
162                    }
163                }
164            })
165            .into_dart();
166            let on_close = Callback::from_fn_mut({
167                let socket_state = Rc::clone(&self.socket_state);
168                move |close_frame: DartHandle| {
169                    let code =
170                        unsafe { transport::close_code(close_frame.get()) }
171                            .unwrap()
172                            .try_into()
173                            .unwrap_or(1007);
174                    let reason =
175                        unsafe { transport::close_reason(close_frame.get()) }
176                            .unwrap();
177                    let reason = unsafe { dart_string_into_rust(reason) };
178
179                    socket_state.set(TransportState::Closed(CloseMsg::from((
180                        code, reason,
181                    ))));
182                }
183            })
184            .into_dart();
185
186            let fut = unsafe {
187                transport::connect(
188                    string_into_c_str(url.as_ref().to_owned()),
189                    on_message,
190                    on_close,
191                )
192            }
193            .unwrap();
194            unsafe { FutureFromDart::execute::<DartHandle>(fut) }
195        }
196        .await
197        .map_err(|_| tracerr::new!(TransportError::InitSocket))?;
198
199        *self.handle.borrow_mut() = Some(handle);
200        self.socket_state.set(TransportState::Open);
201
202        Ok(())
203    }
204
205    fn on_message(&self) -> LocalBoxStream<'static, ServerMsg> {
206        let (tx, rx) = mpsc::unbounded();
207        self.on_message_subs.borrow_mut().push(tx);
208        Box::pin(rx)
209    }
210
211    fn set_close_reason(&self, reason: ClientDisconnect) {
212        self.close_reason.set(reason);
213    }
214
215    #[expect(clippy::unwrap_in_result, reason = "unrelated")]
216    fn send(&self, msg: &ClientMsg) -> TransportResult<()> {
217        let state = self.socket_state.get();
218        let handle = self
219            .handle
220            .borrow()
221            .as_ref()
222            .cloned()
223            .ok_or_else(|| tracerr::new!(TransportError::ClosedSocket))?;
224        match state {
225            TransportState::Open => unsafe {
226                let msg = serde_json::to_string(msg).unwrap();
227                transport::send(handle.get(), string_into_c_str(msg))
228                    .map_err(TransportError::SendMessage)
229                    .map_err(tracerr::wrap!())?;
230                Ok(())
231            },
232            TransportState::Connecting
233            | TransportState::Closing
234            | TransportState::Closed(_) => {
235                Err(tracerr::new!(TransportError::ClosedSocket))
236            }
237        }
238    }
239
240    fn on_state_change(&self) -> LocalBoxStream<'static, TransportState> {
241        self.socket_state.subscribe()
242    }
243}
244
245impl Drop for WebSocketRpcTransport {
246    fn drop(&mut self) {
247        let rsn = serde_json::to_string(&self.close_reason.get())
248            .unwrap_or_else(|e| {
249                panic!("Could not serialize close message: {e}")
250            });
251        if let Some(handle) = self.handle.borrow().as_ref() {
252            unsafe {
253                transport::close(handle.get(), 1000, string_into_c_str(rsn))
254            }
255            .unwrap();
256        }
257    }
258}