1use std::error::Error;
14use std::future::Future;
15use std::sync::Arc;
16
17use derivative::Derivative;
18use futures::channel::oneshot;
19use js_sys::{Function, Uint8Array};
20use macro_rules_attribute::apply;
21#[cfg(doc)]
22use perspective_client::SystemInfo;
23use perspective_client::{
24 ClientError, ReconnectCallback, Session, TableData, TableInitOptions, asyncfn,
25};
26use wasm_bindgen::prelude::*;
27use wasm_bindgen_derive::TryFromJsValue;
28use wasm_bindgen_futures::{JsFuture, future_to_promise};
29
30pub use crate::table::*;
31use crate::utils::{ApiError, ApiResult, JsValueSerdeExt, LocalPollLoop, inherit_docs};
32
// Opaque JavaScript-side type for the optional `options` argument of
// `Client::table`. wasm-bindgen renders it in the generated TypeScript
// declarations as `TableInitOptions`.
#[wasm_bindgen]
extern "C" {
    #[derive(Clone)]
    #[wasm_bindgen(typescript_type = "TableInitOptions")]
    pub type JsTableInitOptions;
}
39
// JavaScript-visible newtype around `perspective_client::ProxySession`; the
// exported methods forward to the wrapped session.
#[wasm_bindgen]
#[derive(Clone)]
pub struct ProxySession(perspective_client::ProxySession);
43
44#[wasm_bindgen]
45impl ProxySession {
46 #[wasm_bindgen(constructor)]
47 pub fn new(client: &Client, on_response: &Function) -> Self {
48 let poll_loop = LocalPollLoop::new({
49 let on_response = on_response.clone();
50 move |msg: Vec<u8>| {
51 let msg = Uint8Array::from(&msg[..]);
52 on_response.call1(&JsValue::UNDEFINED, &JsValue::from(msg))?;
53 Ok(JsValue::null())
54 }
55 });
56 let on_response = Box::new(move |msg: &[u8]| {
58 wasm_bindgen_futures::spawn_local(poll_loop.poll(msg.to_vec()));
59 Ok(())
60 });
61 Self(perspective_client::ProxySession::new(
62 client.client.clone(),
63 on_response,
64 ))
65 }
66
67 #[wasm_bindgen]
68 pub async fn handle_request(&self, value: JsValue) -> ApiResult<()> {
69 let uint8array = Uint8Array::new(&value);
70 let slice = uint8array.to_vec();
71 self.0.handle_request(&slice).await?;
72 Ok(())
73 }
74
75 #[wasm_bindgen]
76 pub async fn poll(&self) -> ApiResult<()> {
77 self.0.poll().await?;
78 Ok(())
79 }
80
81 pub async fn close(self) {
82 self.0.close().await;
83 }
84}
85
// JavaScript-visible client handle. Its public documentation is pulled in
// from `client.md` by the `inherit_docs` macro, so only plain comments are
// used here.
#[apply(inherit_docs)]
#[inherit_doc = "client.md"]
#[wasm_bindgen]
#[derive(TryFromJsValue, Clone)]
pub struct Client {
    // Optional JS function invoked by `terminate`; when `None`, `terminate`
    // reports that this client cannot be terminated.
    pub(crate) close: Option<Function>,
    // The wrapped `perspective_client::Client` that the exported methods
    // delegate to.
    pub(crate) client: perspective_client::Client,
}
94
/// Cloneable, type-erased wrapper around a Rust closure that takes an `I` and
/// returns a [`js_sys::Promise`].
///
/// `Clone` is derived with an empty bound so that cloning only bumps the
/// `Arc` and does not require `I: Clone`.
#[derive(Derivative)]
#[derivative(Clone(bound = ""))]
struct JsReconnect<I>(Arc<dyn Fn(I) -> js_sys::Promise>);

// SAFETY / NOTE(review): the wrapped trait object has no `Send`/`Sync` bound,
// and the closures stored in it in this file capture JavaScript handles, so
// these impls are not justified by the type itself. Presumably they rely on
// this crate only ever running on a single-threaded wasm target — confirm
// before enabling any multi-threaded build.
unsafe impl<I> Send for JsReconnect<I> {}
unsafe impl<I> Sync for JsReconnect<I> {}
103
104impl<I> JsReconnect<I> {
105 fn run(&self, args: I) -> js_sys::Promise {
106 self.0(args)
107 }
108
109 fn run_all(
110 &self,
111 args: I,
112 ) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>>
113 + Send
114 + Sync
115 + 'static
116 + use<I> {
117 let (sender, receiver) = oneshot::channel::<Result<(), Box<dyn Error + Send + Sync>>>();
118 let p = self.0(args);
119 let _ = future_to_promise(async move {
120 let result = JsFuture::from(p)
121 .await
122 .map(|_| ())
123 .map_err(|x| format!("{:?}", x).into());
124
125 sender.send(result).unwrap();
126 Ok(JsValue::UNDEFINED)
127 });
128
129 async move { receiver.await.unwrap() }
130 }
131}
132
133impl<F, I> From<F> for JsReconnect<I>
134where
135 F: Fn(I) -> js_sys::Promise + 'static,
136{
137 fn from(value: F) -> Self {
138 JsReconnect(Arc::new(value))
139 }
140}
141
142impl Client {
143 pub fn get_client(&self) -> &'_ perspective_client::Client {
144 &self.client
145 }
146}
147
148#[wasm_bindgen]
149impl Client {
150 #[wasm_bindgen(constructor)]
151 pub fn new(send_request: Function, close: Option<Function>) -> Self {
152 let send_request = JsReconnect::from(move |mut v: Vec<u8>| {
153 let buff2 = unsafe { js_sys::Uint8Array::view_mut_raw(v.as_mut_ptr(), v.len()) };
154 send_request
155 .call1(&JsValue::UNDEFINED, &buff2)
156 .unwrap()
157 .unchecked_into::<js_sys::Promise>()
158 });
159
160 let client =
161 perspective_client::Client::new_with_callback(move |msg| send_request.run_all(msg));
162
163 Client { close, client }
164 }
165
166 #[wasm_bindgen]
167 pub fn new_proxy_session(&self, on_response: &Function) -> ProxySession {
168 ProxySession::new(self, on_response)
169 }
170
171 #[wasm_bindgen]
172 pub async fn init(&self) -> ApiResult<()> {
173 self.client.clone().init().await?;
174 Ok(())
175 }
176
177 #[doc(hidden)]
178 #[wasm_bindgen]
179 pub async fn handle_response(&self, value: &JsValue) -> ApiResult<()> {
180 let uint8array = Uint8Array::new(value);
181 let slice = uint8array.to_vec();
182 self.client.handle_response(&slice).await?;
183 Ok(())
184 }
185
186 #[doc(hidden)]
187 #[wasm_bindgen]
188 pub async fn handle_error(
189 &self,
190 error: Option<String>,
191 reconnect: Option<Function>,
192 ) -> ApiResult<()> {
193 self.client
194 .handle_error(
195 error,
196 reconnect.map(|reconnect| {
197 let reconnect =
198 JsReconnect::from(move |()| match reconnect.call0(&JsValue::UNDEFINED) {
199 Ok(x) => x.unchecked_into::<js_sys::Promise>(),
200 Err(e) => {
201 tracing::warn!("{:?}", e);
203 js_sys::Promise::reject(&format!("C {:?}", e).into())
204 },
205 });
206
207 asyncfn!(reconnect, async move || {
208 if let Err(e) = JsFuture::from(reconnect.run(())).await {
209 if let Some(e) = e.dyn_ref::<js_sys::Object>() {
210 Err(ClientError::Unknown(e.to_string().as_string().unwrap()))
211 } else {
212 Err(ClientError::Unknown(e.as_string().unwrap()))
213 }
214 } else {
215 Ok(())
216 }
217 })
218 }),
219 )
220 .await?;
221
222 Ok(())
223 }
224
225 #[doc(hidden)]
226 #[wasm_bindgen]
227 pub async fn on_error(&self, callback: Function) -> ApiResult<u32> {
228 let callback = JsReconnect::from(
229 move |(message, reconnect): (Option<String>, Option<ReconnectCallback>)| {
230 let cl: Closure<dyn Fn() -> js_sys::Promise> = Closure::new(move || {
231 let reconnect = reconnect.clone();
232 future_to_promise(async move {
233 if let Some(f) = reconnect {
234 f().await.map_err(|e| JsValue::from(format!("A {}", e)))?;
235 }
236
237 Ok(JsValue::UNDEFINED)
238 })
239 });
240
241 if let Err(e) = callback.call2(
242 &JsValue::UNDEFINED,
243 &JsValue::from(message),
244 &cl.into_js_value(),
245 ) {
246 tracing::warn!("D {:?}", e);
247 }
248
249 js_sys::Promise::resolve(&JsValue::UNDEFINED)
250 },
251 );
252
253 let poll_loop = LocalPollLoop::new_async(move |x| JsFuture::from(callback.run(x)));
254 let id = self
255 .client
256 .on_error(asyncfn!(poll_loop, async move |message, reconnect| {
257 poll_loop.poll((message, reconnect)).await;
258 Ok(())
259 }))
260 .await?;
261
262 Ok(id)
263 }
264
265 #[apply(inherit_docs)]
266 #[inherit_doc = "client/table.md"]
267 #[wasm_bindgen]
268 pub async fn table(
269 &self,
270 value: &JsTableInitData,
271 options: Option<JsTableInitOptions>,
272 ) -> ApiResult<Table> {
273 let options = options
274 .into_serde_ext::<Option<TableInitOptions>>()?
275 .unwrap_or_default();
276
277 let args = TableData::from_js_value(value, options.format)?;
278 Ok(Table(self.client.table(args, options).await?))
279 }
280
281 #[apply(inherit_docs)]
282 #[inherit_doc = "client/terminate.md"]
283 #[wasm_bindgen]
284 pub fn terminate(&self) -> ApiResult<JsValue> {
285 if let Some(f) = self.close.clone() {
286 Ok(f.call0(&JsValue::UNDEFINED)?)
287 } else {
288 Err(ApiError::new("Client type cannot be terminated"))
289 }
290 }
291
292 #[apply(inherit_docs)]
293 #[inherit_doc = "client/open_table.md"]
294 #[wasm_bindgen]
295 pub async fn open_table(&self, entity_id: String) -> ApiResult<Table> {
296 Ok(Table(self.client.open_table(entity_id).await?))
297 }
298
299 #[apply(inherit_docs)]
300 #[inherit_doc = "client/get_hosted_table_names.md"]
301 #[wasm_bindgen]
302 pub async fn get_hosted_table_names(&self) -> ApiResult<JsValue> {
303 Ok(JsValue::from_serde_ext(
304 &self.client.get_hosted_table_names().await?,
305 )?)
306 }
307
308 #[apply(inherit_docs)]
309 #[inherit_doc = "client/on_hosted_tables_update.md"]
310 #[wasm_bindgen]
311 pub async fn on_hosted_tables_update(&self, on_update_js: Function) -> ApiResult<u32> {
312 let poll_loop = LocalPollLoop::new(move |_| on_update_js.call0(&JsValue::UNDEFINED));
313 let on_update = Box::new(move || poll_loop.poll(()));
314 let id = self.client.on_hosted_tables_update(on_update).await?;
315 Ok(id)
316 }
317
318 #[apply(inherit_docs)]
319 #[inherit_doc = "client/remove_hosted_tables_update.md"]
320 #[wasm_bindgen]
321 pub async fn remove_hosted_tables_update(&self, update_id: u32) -> ApiResult<()> {
322 self.client.remove_hosted_tables_update(update_id).await?;
323 Ok(())
324 }
325
326 #[apply(inherit_docs)]
327 #[inherit_doc = "client/system_info.md"]
328 #[wasm_bindgen]
329 pub async fn system_info(&self) -> ApiResult<JsValue> {
330 let info = self.client.system_info().await?;
331 Ok(JsValue::from_serde_ext(&info)?)
332 }
333}