1use std::error::Error;
14use std::future::Future;
15use std::sync::Arc;
16
17use derivative::Derivative;
18use futures::channel::oneshot;
19use futures::future::LocalBoxFuture;
20use js_sys::{Function, Uint8Array};
21use macro_rules_attribute::apply;
22#[cfg(doc)]
23use perspective_client::SystemInfo;
24use perspective_client::{ReconnectCallback, Session, TableData, TableInitOptions};
25use wasm_bindgen::prelude::*;
26use wasm_bindgen_futures::{future_to_promise, JsFuture};
27
28pub use crate::table::*;
29use crate::utils::{inherit_docs, ApiError, ApiResult, JsValueSerdeExt, LocalPollLoop};
30
31#[wasm_bindgen]
32extern "C" {
33 #[derive(Clone)]
34 #[wasm_bindgen(typescript_type = "TableInitOptions")]
35 pub type JsTableInitOptions;
36}
37
38#[wasm_bindgen]
39#[derive(Clone)]
40pub struct ProxySession(perspective_client::ProxySession);
41
42#[wasm_bindgen]
43impl ProxySession {
44 #[wasm_bindgen(constructor)]
45 pub fn new(client: &Client, on_response: &Function) -> Self {
46 let poll_loop = LocalPollLoop::new({
47 let on_response = on_response.clone();
48 move |msg: Vec<u8>| {
49 let msg = Uint8Array::from(&msg[..]);
50 on_response.call1(&JsValue::UNDEFINED, &JsValue::from(msg))?;
51 Ok(JsValue::null())
52 }
53 });
54 let on_response = Box::new(move |msg: &[u8]| {
56 wasm_bindgen_futures::spawn_local(poll_loop.poll(msg.to_vec()));
57 Ok(())
58 });
59 Self(perspective_client::ProxySession::new(
60 client.client.clone(),
61 on_response,
62 ))
63 }
64
65 #[wasm_bindgen]
66 pub async fn handle_request(&self, value: JsValue) -> ApiResult<()> {
67 let uint8array = Uint8Array::new(&value);
68 let slice = uint8array.to_vec();
69 self.0.handle_request(&slice).await?;
70 Ok(())
71 }
72
73 #[wasm_bindgen]
74 pub async fn poll(&self) -> ApiResult<()> {
75 self.0.poll().await?;
76 Ok(())
77 }
78
79 pub async fn close(self) {
80 self.0.close().await;
81 }
82}
83
84#[apply(inherit_docs)]
85#[inherit_doc = "client.md"]
86#[wasm_bindgen]
87pub struct Client {
88 pub(crate) close: Option<Function>,
89 pub(crate) client: perspective_client::Client,
90}
91
92#[derive(Derivative)]
95#[derivative(Clone(bound = ""))]
96struct JsReconnect<I>(Arc<dyn Fn(I) -> js_sys::Promise>);
97
98unsafe impl<I> Send for JsReconnect<I> {}
99unsafe impl<I> Sync for JsReconnect<I> {}
100
101impl<I> JsReconnect<I> {
102 fn run(&self, args: I) -> js_sys::Promise {
103 self.0(args)
104 }
105
106 fn run_all(
107 &self,
108 args: I,
109 ) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + Sync + 'static
110 {
111 let (sender, receiver) = oneshot::channel::<Result<(), Box<dyn Error + Send + Sync>>>();
112 let p = self.0(args);
113 let _ = future_to_promise(async move {
114 let result = JsFuture::from(p)
115 .await
116 .map(|_| ())
117 .map_err(|x| format!("{:?}", x).into());
118
119 sender.send(result).unwrap();
120 Ok(JsValue::UNDEFINED)
121 });
122
123 async move { receiver.await.unwrap() }
124 }
125}
126
127impl<F, I> From<F> for JsReconnect<I>
128where
129 F: Fn(I) -> js_sys::Promise + 'static,
130{
131 fn from(value: F) -> Self {
132 JsReconnect(Arc::new(value))
133 }
134}
135
136#[wasm_bindgen]
137impl Client {
138 #[wasm_bindgen(constructor)]
139 pub fn new(send_request: Function, close: Option<Function>) -> Self {
140 let send_request = JsReconnect::from(move |mut v: Vec<u8>| {
141 let buff2 = unsafe { js_sys::Uint8Array::view_mut_raw(v.as_mut_ptr(), v.len()) };
142 send_request
143 .call1(&JsValue::UNDEFINED, &buff2)
144 .unwrap()
145 .unchecked_into::<js_sys::Promise>()
146 });
147
148 let client = perspective_client::Client::new_with_callback(move |msg| {
149 let vec = msg.to_vec();
150 Box::pin(send_request.run_all(vec))
151 });
152
153 Client { close, client }
154 }
155
156 #[wasm_bindgen]
157 pub fn new_proxy_session(&self, on_response: &Function) -> ProxySession {
158 ProxySession::new(self, on_response)
159 }
160
161 #[wasm_bindgen]
162 pub async fn init(&self) -> ApiResult<()> {
163 self.client.clone().init().await?;
164 Ok(())
165 }
166
167 #[doc(hidden)]
168 #[wasm_bindgen]
169 pub async fn handle_response(&self, value: &JsValue) -> ApiResult<()> {
170 let uint8array = Uint8Array::new(value);
171 let slice = uint8array.to_vec();
172 self.client.handle_response(&slice).await?;
173 Ok(())
174 }
175
176 #[doc(hidden)]
177 #[wasm_bindgen]
178 pub async fn handle_error(
179 &self,
180 error: Option<String>,
181 reconnect: Option<Function>,
182 ) -> ApiResult<()> {
183 self.client
184 .handle_error(
185 error,
186 reconnect.map(|reconnect| {
187 let reconnect =
188 JsReconnect::from(move |()| match reconnect.call0(&JsValue::UNDEFINED) {
189 Ok(x) => x.unchecked_into::<js_sys::Promise>(),
190 Err(e) => {
191 tracing::warn!("{:?}", e);
193 js_sys::Promise::reject(&format!("C {:?}", e).into())
194 },
195 });
196
197 Arc::new(move || {
198 let fut = JsFuture::from(reconnect.run(()));
199 Box::pin(async move {
200 if let Err(e) = fut.await {
203 if let Some(e) = e.dyn_ref::<js_sys::Object>() {
204 Err(e.to_string().as_string().unwrap().into())
205 } else {
206 Err(e.as_string().unwrap().into())
207 }
208 } else {
209 Ok(())
210 }
211 })
212 as LocalBoxFuture<'static, Result<(), Box<dyn Error>>>
213 }) as Arc<(dyn Fn() -> _ + Send + Sync)>
214 }),
215 )
216 .await?;
217
218 Ok(())
219 }
220
221 #[doc(hidden)]
222 #[wasm_bindgen]
223 pub async fn on_error(&self, callback: Function) -> ApiResult<u32> {
224 let callback = JsReconnect::from(
225 move |(message, reconnect): (Option<String>, Option<ReconnectCallback>)| {
226 let cl: Closure<dyn Fn() -> js_sys::Promise> = Closure::new(move || {
227 let reconnect = reconnect.clone();
228 future_to_promise(async move {
229 if let Some(f) = reconnect {
230 f().await.map_err(|e| JsValue::from(format!("A {}", e)))?;
231 }
232
233 Ok(JsValue::UNDEFINED)
234 })
235 });
236
237 if let Err(e) = callback.call2(
238 &JsValue::UNDEFINED,
239 &JsValue::from(message),
240 &cl.into_js_value(),
241 ) {
242 tracing::warn!("D {:?}", e);
243 }
244
245 js_sys::Promise::resolve(&JsValue::UNDEFINED)
246 },
247 );
248
249 let id = self
250 .client
251 .on_error(Box::new(move |message, reconnect| {
252 let callback = callback.clone();
253 Box::pin(async move {
254 let _promise = callback.run((message, reconnect));
255 Ok(())
256 })
257 }))
258 .await?;
259
260 Ok(id)
261 }
262
263 #[apply(inherit_docs)]
264 #[inherit_doc = "client/table.md"]
265 #[wasm_bindgen]
266 pub async fn table(
267 &self,
268 value: &JsTableInitData,
269 options: Option<JsTableInitOptions>,
270 ) -> ApiResult<Table> {
271 let options = options
272 .into_serde_ext::<Option<TableInitOptions>>()?
273 .unwrap_or_default();
274
275 let args = TableData::from_js_value(value, options.format)?;
276 Ok(Table(self.client.table(args, options).await?))
277 }
278
279 #[apply(inherit_docs)]
280 #[inherit_doc = "client/terminate.md"]
281 #[wasm_bindgen]
282 pub fn terminate(&self) -> ApiResult<JsValue> {
283 if let Some(f) = self.close.clone() {
284 Ok(f.call0(&JsValue::UNDEFINED)?)
285 } else {
286 Err(ApiError::new("Client type cannot be terminated"))
287 }
288 }
289
290 #[apply(inherit_docs)]
291 #[inherit_doc = "client/open_table.md"]
292 #[wasm_bindgen]
293 pub async fn open_table(&self, entity_id: String) -> ApiResult<Table> {
294 Ok(Table(self.client.open_table(entity_id).await?))
295 }
296
297 #[apply(inherit_docs)]
298 #[inherit_doc = "client/get_hosted_table_names.md"]
299 #[wasm_bindgen]
300 pub async fn get_hosted_table_names(&self) -> ApiResult<JsValue> {
301 Ok(JsValue::from_serde_ext(
302 &self.client.get_hosted_table_names().await?,
303 )?)
304 }
305
306 #[apply(inherit_docs)]
307 #[inherit_doc = "client/system_info.md"]
308 #[wasm_bindgen]
309 pub async fn system_info(&self) -> ApiResult<JsValue> {
310 let info = self.client.system_info().await?;
311 Ok(JsValue::from_serde_ext(&info)?)
312 }
313}