medea_jason/platform/dart/
transport.rs1use std::{
2 cell::{Cell, RefCell},
3 rc::Rc,
4};
5
6use async_trait::async_trait;
7use futures::{channel::mpsc, prelude::stream::LocalBoxStream};
8use medea_client_api_proto::{ClientMsg, ServerMsg};
9use medea_macro::dart_bridge;
10use medea_reactive::ObservableCell;
11use tracerr::Traced;
12
13use crate::{
14 platform::{
15 RpcTransport, TransportError, TransportState,
16 dart::utils::{
17 callback::Callback, dart_future::FutureFromDart,
18 dart_string_into_rust, handle::DartHandle, string_into_c_str,
19 },
20 },
21 rpc::{ApiUrl, ClientDisconnect, CloseMsg},
22};
23
24type TransportResult<T> = Result<T, Traced<TransportError>>;
25
26#[dart_bridge("flutter/lib/src/native/platform/transport.g.dart")]
27mod transport {
28 use std::{os::raw::c_char, ptr};
29
30 use dart_sys::Dart_Handle;
31
32 use crate::platform::Error;
33
34 extern "C" {
35 pub fn connect(
45 url: ptr::NonNull<c_char>,
46 on_message: Dart_Handle,
47 on_close: Dart_Handle,
48 ) -> Result<Dart_Handle, Error>;
49
50 pub fn send(
55 transport: Dart_Handle,
56 message: ptr::NonNull<c_char>,
57 ) -> Result<(), Error>;
58
59 pub fn close(
64 transport: Dart_Handle,
65 close_code: i32,
66 close_msg: ptr::NonNull<c_char>,
67 ) -> Result<(), Error>;
68
69 pub fn close_code(close_frame: Dart_Handle) -> Result<i32, Error>;
74
75 pub fn close_reason(
80 close_frame: Dart_Handle,
81 ) -> Result<ptr::NonNull<c_char>, Error>;
82 }
83}
84
85#[derive(Clone, Debug)]
89pub struct WebSocketRpcTransport {
90 handle: RefCell<Option<DartHandle>>,
97
98 on_message_subs: Rc<RefCell<Vec<mpsc::UnboundedSender<ServerMsg>>>>,
100
101 close_reason: Cell<ClientDisconnect>,
107
108 socket_state: Rc<ObservableCell<TransportState>>,
110}
111
112impl WebSocketRpcTransport {
113 #[must_use]
116 pub fn new() -> Self {
117 Self {
118 handle: RefCell::new(None),
119 on_message_subs: Rc::new(RefCell::new(Vec::new())),
120 socket_state: Rc::new(ObservableCell::new(
121 TransportState::Connecting,
122 )),
123 close_reason: Cell::new(
124 ClientDisconnect::RpcTransportUnexpectedlyDropped,
125 ),
126 }
127 }
128}
129
130impl Default for WebSocketRpcTransport {
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
136#[async_trait(?Send)]
137impl RpcTransport for WebSocketRpcTransport {
138 async fn connect(&self, url: ApiUrl) -> TransportResult<()> {
139 #[expect(clippy::map_err_ignore, reason = "needs refactoring")]
141 let handle = {
142 let on_message = Callback::from_fn_mut({
143 let weak_subs = Rc::downgrade(&self.on_message_subs);
144 move |msg: String| {
145 if let Some(subs) = weak_subs.upgrade() {
146 let msg = match serde_json::from_str::<ServerMsg>(&msg)
147 {
148 Ok(parsed) => parsed,
149 Err(e) => {
150 log::error!("{}", tracerr::new!(e));
153 return;
154 }
155 };
156
157 subs.borrow_mut().retain(
158 |sub: &mpsc::UnboundedSender<ServerMsg>| {
159 sub.unbounded_send(msg.clone()).is_ok()
160 },
161 );
162 }
163 }
164 })
165 .into_dart();
166 let on_close = Callback::from_fn_mut({
167 let socket_state = Rc::clone(&self.socket_state);
168 move |close_frame: DartHandle| {
169 let code =
170 unsafe { transport::close_code(close_frame.get()) }
171 .unwrap()
172 .try_into()
173 .unwrap_or(1007);
174 let reason =
175 unsafe { transport::close_reason(close_frame.get()) }
176 .unwrap();
177 let reason = unsafe { dart_string_into_rust(reason) };
178
179 socket_state.set(TransportState::Closed(CloseMsg::from((
180 code, reason,
181 ))));
182 }
183 })
184 .into_dart();
185
186 let fut = unsafe {
187 transport::connect(
188 string_into_c_str(url.as_ref().to_owned()),
189 on_message,
190 on_close,
191 )
192 }
193 .unwrap();
194 unsafe { FutureFromDart::execute::<DartHandle>(fut) }
195 }
196 .await
197 .map_err(|_| tracerr::new!(TransportError::InitSocket))?;
198
199 *self.handle.borrow_mut() = Some(handle);
200 self.socket_state.set(TransportState::Open);
201
202 Ok(())
203 }
204
205 fn on_message(&self) -> LocalBoxStream<'static, ServerMsg> {
206 let (tx, rx) = mpsc::unbounded();
207 self.on_message_subs.borrow_mut().push(tx);
208 Box::pin(rx)
209 }
210
211 fn set_close_reason(&self, reason: ClientDisconnect) {
212 self.close_reason.set(reason);
213 }
214
215 #[expect(clippy::unwrap_in_result, reason = "unrelated")]
216 fn send(&self, msg: &ClientMsg) -> TransportResult<()> {
217 let state = self.socket_state.get();
218 let handle = self
219 .handle
220 .borrow()
221 .as_ref()
222 .cloned()
223 .ok_or_else(|| tracerr::new!(TransportError::ClosedSocket))?;
224 match state {
225 TransportState::Open => unsafe {
226 let msg = serde_json::to_string(msg).unwrap();
227 transport::send(handle.get(), string_into_c_str(msg))
228 .map_err(TransportError::SendMessage)
229 .map_err(tracerr::wrap!())?;
230 Ok(())
231 },
232 TransportState::Connecting
233 | TransportState::Closing
234 | TransportState::Closed(_) => {
235 Err(tracerr::new!(TransportError::ClosedSocket))
236 }
237 }
238 }
239
240 fn on_state_change(&self) -> LocalBoxStream<'static, TransportState> {
241 self.socket_state.subscribe()
242 }
243}
244
245impl Drop for WebSocketRpcTransport {
246 fn drop(&mut self) {
247 let rsn = serde_json::to_string(&self.close_reason.get())
248 .unwrap_or_else(|e| {
249 panic!("Could not serialize close message: {e}")
250 });
251 if let Some(handle) = self.handle.borrow().as_ref() {
252 unsafe {
253 transport::close(handle.get(), 1000, string_into_c_str(rsn))
254 }
255 .unwrap();
256 }
257 }
258}