1use std::error::Error;
14use std::future::Future;
15use std::sync::Arc;
16
17use derivative::Derivative;
18use futures::channel::oneshot;
19use futures::future::LocalBoxFuture;
20use js_sys::{Function, Uint8Array};
21use macro_rules_attribute::apply;
22#[cfg(doc)]
23use perspective_client::SystemInfo;
24use perspective_client::{ReconnectCallback, Session, TableData, TableInitOptions};
25use wasm_bindgen::prelude::*;
26use wasm_bindgen_futures::{JsFuture, future_to_promise};
27
28pub use crate::table::*;
29use crate::utils::{ApiError, ApiResult, JsValueSerdeExt, LocalPollLoop, inherit_docs};
30
#[wasm_bindgen]
extern "C" {
    // Opaque JS value whose TypeScript type is emitted as `TableInitOptions`.
    // `Client::table` accepts it as its optional `options` argument and
    // deserializes it into the Rust-side `TableInitOptions`.
    #[derive(Clone)]
    #[wasm_bindgen(typescript_type = "TableInitOptions")]
    pub type JsTableInitOptions;
}
37
/// JS-exported newtype around [`perspective_client::ProxySession`].
///
/// All methods delegate to the wrapped session; this type only adapts the
/// arguments and callbacks to and from JS values.
#[wasm_bindgen]
#[derive(Clone)]
pub struct ProxySession(perspective_client::ProxySession);
41
42#[wasm_bindgen]
43impl ProxySession {
44 #[wasm_bindgen(constructor)]
45 pub fn new(client: &Client, on_response: &Function) -> Self {
46 let poll_loop = LocalPollLoop::new({
47 let on_response = on_response.clone();
48 move |msg: Vec<u8>| {
49 let msg = Uint8Array::from(&msg[..]);
50 on_response.call1(&JsValue::UNDEFINED, &JsValue::from(msg))?;
51 Ok(JsValue::null())
52 }
53 });
54 let on_response = Box::new(move |msg: &[u8]| {
56 wasm_bindgen_futures::spawn_local(poll_loop.poll(msg.to_vec()));
57 Ok(())
58 });
59 Self(perspective_client::ProxySession::new(
60 client.client.clone(),
61 on_response,
62 ))
63 }
64
65 #[wasm_bindgen]
66 pub async fn handle_request(&self, value: JsValue) -> ApiResult<()> {
67 let uint8array = Uint8Array::new(&value);
68 let slice = uint8array.to_vec();
69 self.0.handle_request(&slice).await?;
70 Ok(())
71 }
72
73 #[wasm_bindgen]
74 pub async fn poll(&self) -> ApiResult<()> {
75 self.0.poll().await?;
76 Ok(())
77 }
78
79 pub async fn close(self) {
80 self.0.close().await;
81 }
82}
83
// JS-exported client handle. Its public documentation is injected from
// `client.md` by the `inherit_docs` macro, so only implementation notes live
// here.
#[apply(inherit_docs)]
#[inherit_doc = "client.md"]
#[wasm_bindgen]
pub struct Client {
    /// Optional JS function invoked by `terminate`; when `None`, `terminate`
    /// returns an error instead.
    pub(crate) close: Option<Function>,
    /// The wrapped `perspective_client::Client` that the methods delegate to.
    pub(crate) client: perspective_client::Client,
}
91
/// Shared, type-erased wrapper around a Rust closure that maps an argument of
/// type `I` to a JS `Promise`.
///
/// `Clone` is derived with an empty bound so that cloning never requires
/// `I: Clone`; only the inner `Arc` is cloned.
#[derive(Derivative)]
#[derivative(Clone(bound = ""))]
struct JsReconnect<I>(Arc<dyn Fn(I) -> js_sys::Promise>);
97
// SAFETY — NOTE(review): the wrapped closures in this file capture JS handles
// (`Function`), and the trait object carries no `Send`/`Sync` bound of its
// own. These impls presumably rely on the wasm target executing this code on
// a single thread, so a `JsReconnect` is never actually accessed from another
// thread. Confirm this still holds before enabling wasm threads/atomics.
unsafe impl<I> Send for JsReconnect<I> {}
unsafe impl<I> Sync for JsReconnect<I> {}
100
101impl<I> JsReconnect<I> {
102 fn run(&self, args: I) -> js_sys::Promise {
103 self.0(args)
104 }
105
106 fn run_all(
107 &self,
108 args: I,
109 ) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + Sync + 'static
110 {
111 let (sender, receiver) = oneshot::channel::<Result<(), Box<dyn Error + Send + Sync>>>();
112 let p = self.0(args);
113 let _ = future_to_promise(async move {
114 let result = JsFuture::from(p)
115 .await
116 .map(|_| ())
117 .map_err(|x| format!("{:?}", x).into());
118
119 sender.send(result).unwrap();
120 Ok(JsValue::UNDEFINED)
121 });
122
123 async move { receiver.await.unwrap() }
124 }
125}
126
127impl<F, I> From<F> for JsReconnect<I>
128where
129 F: Fn(I) -> js_sys::Promise + 'static,
130{
131 fn from(value: F) -> Self {
132 JsReconnect(Arc::new(value))
133 }
134}
135
136#[wasm_bindgen]
137impl Client {
138 #[wasm_bindgen(constructor)]
139 pub fn new(send_request: Function, close: Option<Function>) -> Self {
140 let send_request = JsReconnect::from(move |mut v: Vec<u8>| {
141 let buff2 = unsafe { js_sys::Uint8Array::view_mut_raw(v.as_mut_ptr(), v.len()) };
142 send_request
143 .call1(&JsValue::UNDEFINED, &buff2)
144 .unwrap()
145 .unchecked_into::<js_sys::Promise>()
146 });
147
148 let client = perspective_client::Client::new_with_callback(move |msg| {
149 Box::pin(send_request.run_all(msg))
150 });
151
152 Client { close, client }
153 }
154
155 #[wasm_bindgen]
156 pub fn new_proxy_session(&self, on_response: &Function) -> ProxySession {
157 ProxySession::new(self, on_response)
158 }
159
160 #[wasm_bindgen]
161 pub async fn init(&self) -> ApiResult<()> {
162 self.client.clone().init().await?;
163 Ok(())
164 }
165
166 #[doc(hidden)]
167 #[wasm_bindgen]
168 pub async fn handle_response(&self, value: &JsValue) -> ApiResult<()> {
169 let uint8array = Uint8Array::new(value);
170 let slice = uint8array.to_vec();
171 self.client.handle_response(&slice).await?;
172 Ok(())
173 }
174
175 #[doc(hidden)]
176 #[wasm_bindgen]
177 pub async fn handle_error(
178 &self,
179 error: Option<String>,
180 reconnect: Option<Function>,
181 ) -> ApiResult<()> {
182 self.client
183 .handle_error(
184 error,
185 reconnect.map(|reconnect| {
186 let reconnect =
187 JsReconnect::from(move |()| match reconnect.call0(&JsValue::UNDEFINED) {
188 Ok(x) => x.unchecked_into::<js_sys::Promise>(),
189 Err(e) => {
190 tracing::warn!("{:?}", e);
192 js_sys::Promise::reject(&format!("C {:?}", e).into())
193 },
194 });
195
196 Arc::new(move || {
197 let fut = JsFuture::from(reconnect.run(()));
198 Box::pin(async move {
199 if let Err(e) = fut.await {
202 if let Some(e) = e.dyn_ref::<js_sys::Object>() {
203 Err(e.to_string().as_string().unwrap().into())
204 } else {
205 Err(e.as_string().unwrap().into())
206 }
207 } else {
208 Ok(())
209 }
210 })
211 as LocalBoxFuture<'static, Result<(), Box<dyn Error>>>
212 }) as Arc<(dyn Fn() -> _ + Send + Sync)>
213 }),
214 )
215 .await?;
216
217 Ok(())
218 }
219
220 #[doc(hidden)]
221 #[wasm_bindgen]
222 pub async fn on_error(&self, callback: Function) -> ApiResult<u32> {
223 let callback = JsReconnect::from(
224 move |(message, reconnect): (Option<String>, Option<ReconnectCallback>)| {
225 let cl: Closure<dyn Fn() -> js_sys::Promise> = Closure::new(move || {
226 let reconnect = reconnect.clone();
227 future_to_promise(async move {
228 if let Some(f) = reconnect {
229 f().await.map_err(|e| JsValue::from(format!("A {}", e)))?;
230 }
231
232 Ok(JsValue::UNDEFINED)
233 })
234 });
235
236 if let Err(e) = callback.call2(
237 &JsValue::UNDEFINED,
238 &JsValue::from(message),
239 &cl.into_js_value(),
240 ) {
241 tracing::warn!("D {:?}", e);
242 }
243
244 js_sys::Promise::resolve(&JsValue::UNDEFINED)
245 },
246 );
247
248 let id = self
249 .client
250 .on_error(Box::new(move |message, reconnect| {
251 let callback = callback.clone();
252 Box::pin(async move {
253 let _promise = callback.run((message, reconnect));
254 Ok(())
255 })
256 }))
257 .await?;
258
259 Ok(id)
260 }
261
262 #[apply(inherit_docs)]
263 #[inherit_doc = "client/table.md"]
264 #[wasm_bindgen]
265 pub async fn table(
266 &self,
267 value: &JsTableInitData,
268 options: Option<JsTableInitOptions>,
269 ) -> ApiResult<Table> {
270 let options = options
271 .into_serde_ext::<Option<TableInitOptions>>()?
272 .unwrap_or_default();
273
274 let args = TableData::from_js_value(value, options.format)?;
275 Ok(Table(self.client.table(args, options).await?))
276 }
277
278 #[apply(inherit_docs)]
279 #[inherit_doc = "client/terminate.md"]
280 #[wasm_bindgen]
281 pub fn terminate(&self) -> ApiResult<JsValue> {
282 if let Some(f) = self.close.clone() {
283 Ok(f.call0(&JsValue::UNDEFINED)?)
284 } else {
285 Err(ApiError::new("Client type cannot be terminated"))
286 }
287 }
288
289 #[apply(inherit_docs)]
290 #[inherit_doc = "client/open_table.md"]
291 #[wasm_bindgen]
292 pub async fn open_table(&self, entity_id: String) -> ApiResult<Table> {
293 Ok(Table(self.client.open_table(entity_id).await?))
294 }
295
296 #[apply(inherit_docs)]
297 #[inherit_doc = "client/get_hosted_table_names.md"]
298 #[wasm_bindgen]
299 pub async fn get_hosted_table_names(&self) -> ApiResult<JsValue> {
300 Ok(JsValue::from_serde_ext(
301 &self.client.get_hosted_table_names().await?,
302 )?)
303 }
304
305 #[apply(inherit_docs)]
306 #[inherit_doc = "client/on_hosted_tables_update.md"]
307 #[wasm_bindgen]
308 pub async fn on_hosted_tables_update(&self, on_update_js: Function) -> ApiResult<u32> {
309 let poll_loop = LocalPollLoop::new(move |_| on_update_js.call0(&JsValue::UNDEFINED));
310 let on_update = Box::new(move || poll_loop.poll(()));
311 let id = self.client.on_hosted_tables_update(on_update).await?;
312 Ok(id)
313 }
314
315 #[apply(inherit_docs)]
316 #[inherit_doc = "client/remove_hosted_tables_update.md"]
317 #[wasm_bindgen]
318 pub async fn remove_hosted_tables_update(&self, update_id: u32) -> ApiResult<()> {
319 self.client.remove_hosted_tables_update(update_id).await?;
320 Ok(())
321 }
322
323 #[apply(inherit_docs)]
324 #[inherit_doc = "client/system_info.md"]
325 #[wasm_bindgen]
326 pub async fn system_info(&self) -> ApiResult<JsValue> {
327 let info = self.client.system_info().await?;
328 Ok(JsValue::from_serde_ext(&info)?)
329 }
330}