use std::error::Error;
use std::future::Future;
use std::sync::Arc;
use derivative::Derivative;
use futures::channel::oneshot;
use js_sys::{Function, Uint8Array};
#[cfg(doc)]
use perspective_client::SystemInfo;
use perspective_client::{
ClientError, ReconnectCallback, Session, TableData, TableInitOptions, TableRef, asyncfn,
};
use wasm_bindgen::prelude::*;
use wasm_bindgen_derive::TryFromJsValue;
use wasm_bindgen_futures::{JsFuture, future_to_promise};
pub use crate::table::*;
use crate::utils::{ApiError, ApiResult, JsValueSerdeExt, LocalPollLoop};
use crate::{TableDataExt, apierror};
#[wasm_bindgen]
extern "C" {
#[derive(Clone)]
#[wasm_bindgen(typescript_type = "TableInitOptions")]
pub type JsTableInitOptions;
#[derive(Clone)]
#[wasm_bindgen(typescript_type = "JoinOptions")]
pub type JsJoinOptions;
}
async fn js_to_table_ref(val: &JsValue) -> ApiResult<TableRef> {
if let Some(name) = val.as_string() {
Ok(TableRef::from(name))
} else {
let get_name = js_sys::Reflect::get(val, &wasm_bindgen::intern("get_name").into())
.map_err(|_| apierror!(TableRefError))?
.dyn_into::<js_sys::Function>()
.map_err(|_| apierror!(TableRefError))?;
let promise = get_name
.call0(val)
.map_err(|_| apierror!(TableRefError))?
.dyn_into::<js_sys::Promise>()
.map_err(|_| apierror!(TableRefError))?;
let name = wasm_bindgen_futures::JsFuture::from(promise)
.await
.map_err(|_| apierror!(TableRefError))?
.as_string()
.ok_or_else(|| apierror!(TableRefError))?;
Ok(TableRef::from(name))
}
}
#[wasm_bindgen]
#[derive(Clone)]
pub struct ProxySession(perspective_client::ProxySession);
#[wasm_bindgen]
impl ProxySession {
#[wasm_bindgen(constructor)]
pub fn new(client: &Client, on_response: &Function) -> Self {
let poll_loop = LocalPollLoop::new({
let on_response = on_response.clone();
move |msg: Vec<u8>| {
let msg = Uint8Array::from(&msg[..]);
on_response.call1(&JsValue::UNDEFINED, &JsValue::from(msg))?;
Ok(JsValue::null())
}
});
let on_response = Box::new(move |msg: &[u8]| {
wasm_bindgen_futures::spawn_local(poll_loop.poll(msg.to_vec()));
Ok(())
});
Self(perspective_client::ProxySession::new(
client.client.clone(),
on_response,
))
}
#[wasm_bindgen]
pub async fn handle_request(&self, value: JsValue) -> ApiResult<()> {
let uint8array = Uint8Array::new(&value);
let slice = uint8array.to_vec();
self.0.handle_request(&slice).await?;
Ok(())
}
pub async fn close(self) {
self.0.close().await;
}
}
#[wasm_bindgen]
#[derive(TryFromJsValue, Clone)]
pub struct Client {
pub(crate) client: perspective_client::Client,
pub(crate) close: Option<Function>,
}
impl From<perspective_client::Client> for Client {
fn from(client: perspective_client::Client) -> Self {
Client {
client,
close: None,
}
}
}
impl PartialEq for Client {
fn eq(&self, other: &Self) -> bool {
self.client.get_name() == other.client.get_name()
}
}
#[derive(Derivative)]
#[derivative(Clone(bound = ""))]
struct JsReconnect<I>(Arc<dyn Fn(I) -> js_sys::Promise>);
unsafe impl<I> Send for JsReconnect<I> {}
unsafe impl<I> Sync for JsReconnect<I> {}
impl<I> JsReconnect<I> {
fn run(&self, args: I) -> js_sys::Promise {
self.0(args)
}
fn run_all(
&self,
args: I,
) -> impl Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>>
+ Send
+ Sync
+ 'static
+ use<I> {
let (sender, receiver) = oneshot::channel::<Result<(), Box<dyn Error + Send + Sync>>>();
let p = self.0(args);
let _ = future_to_promise(async move {
let result = JsFuture::from(p)
.await
.map(|_| ())
.map_err(|x| format!("{x:?}").into());
sender.send(result).unwrap();
Ok(JsValue::UNDEFINED)
});
async move { receiver.await.unwrap() }
}
}
impl<F, I> From<F> for JsReconnect<I>
where
F: Fn(I) -> js_sys::Promise + 'static,
{
fn from(value: F) -> Self {
JsReconnect(Arc::new(value))
}
}
impl Client {
pub fn get_client(&self) -> &'_ perspective_client::Client {
&self.client
}
}
#[wasm_bindgen]
impl Client {
#[wasm_bindgen(constructor)]
pub fn new(send_request: Function, close: Option<Function>) -> ApiResult<Self> {
let send_request = JsReconnect::from(move |mut v: Vec<u8>| {
let buff2 = unsafe { js_sys::Uint8Array::view_mut_raw(v.as_mut_ptr(), v.len()) };
send_request
.call1(&JsValue::UNDEFINED, &buff2)
.unwrap()
.unchecked_into::<js_sys::Promise>()
});
let client = perspective_client::Client::new_with_callback(None, move |msg| {
send_request.run_all(msg)
})?;
Ok(Client { close, client })
}
#[wasm_bindgen]
pub fn new_proxy_session(&self, on_response: &Function) -> ProxySession {
ProxySession::new(self, on_response)
}
#[wasm_bindgen]
pub async fn handle_response(&self, value: &JsValue) -> ApiResult<()> {
let uint8array = Uint8Array::new(value);
let slice = uint8array.to_vec();
self.client.handle_response(&slice).await?;
Ok(())
}
#[wasm_bindgen]
pub async fn handle_error(&self, error: String, reconnect: Option<Function>) -> ApiResult<()> {
self.client
.handle_error(
ClientError::Unknown(error),
reconnect.map(|reconnect| {
let reconnect =
JsReconnect::from(move |()| match reconnect.call0(&JsValue::UNDEFINED) {
Ok(x) => x.unchecked_into::<js_sys::Promise>(),
Err(e) => {
tracing::warn!("{:?}", e);
js_sys::Promise::reject(&format!("C {e:?}").into())
},
});
asyncfn!(reconnect, async move || {
if let Err(e) = JsFuture::from(reconnect.run(())).await {
if let Some(e) = e.dyn_ref::<js_sys::Object>() {
Err(ClientError::Unknown(e.to_string().as_string().unwrap()))
} else {
Err(ClientError::Unknown(e.as_string().unwrap()))
}
} else {
Ok(())
}
})
}),
)
.await?;
Ok(())
}
#[wasm_bindgen]
pub async fn on_error(&self, callback: Function) -> ApiResult<u32> {
let callback = JsReconnect::from(
move |(message, reconnect): (ClientError, Option<ReconnectCallback>)| {
let cl: Closure<dyn Fn() -> js_sys::Promise> = Closure::new(move || {
let reconnect = reconnect.clone();
future_to_promise(async move {
if let Some(f) = reconnect {
(*f)().await.map_err(|e| JsValue::from(format!("A {e}")))?;
}
Ok(JsValue::UNDEFINED)
})
});
if let Err(e) = callback.call2(
&JsValue::UNDEFINED,
&JsValue::from(apierror!(ClientError(message))),
&cl.into_js_value(),
) {
tracing::warn!("{:?}", e);
}
js_sys::Promise::resolve(&JsValue::UNDEFINED)
},
);
let poll_loop = LocalPollLoop::new_async(move |x| JsFuture::from(callback.run(x)));
let id = self
.client
.on_error(asyncfn!(poll_loop, async move |message, reconnect| {
poll_loop.poll((message, reconnect)).await;
Ok(())
}))
.await?;
Ok(id)
}
#[wasm_bindgen]
pub async fn table(
&self,
value: &JsTableInitData,
options: Option<JsTableInitOptions>,
) -> ApiResult<Table> {
let options = options
.into_serde_ext::<Option<TableInitOptions>>()?
.unwrap_or_default();
let args = TableData::from_js_value(value, options.format)?;
Ok(Table(self.client.table(args, options).await?))
}
#[wasm_bindgen]
pub async fn join(
&self,
left: JsValue,
right: JsValue,
on: &str,
options: Option<JsJoinOptions>,
) -> ApiResult<Table> {
let options = options
.into_serde_ext::<Option<perspective_client::JoinOptions>>()?
.unwrap_or_default();
let left_ref = js_to_table_ref(&left).await?;
let right_ref = js_to_table_ref(&right).await?;
Ok(Table(
self.client.join(left_ref, right_ref, on, options).await?,
))
}
#[wasm_bindgen]
pub fn terminate(&self) -> ApiResult<JsValue> {
if let Some(f) = self.close.clone() {
Ok(f.call0(&JsValue::UNDEFINED)?)
} else {
Err(ApiError::new("Client type cannot be terminated"))
}
}
#[wasm_bindgen]
pub async fn open_table(&self, entity_id: String) -> ApiResult<Table> {
Ok(Table(self.client.open_table(entity_id).await?))
}
#[wasm_bindgen]
pub async fn get_hosted_table_names(&self) -> ApiResult<Vec<String>> {
Ok(self.client.get_hosted_table_names().await?)
}
#[wasm_bindgen]
pub async fn on_hosted_tables_update(&self, on_update_js: Function) -> ApiResult<u32> {
let poll_loop = LocalPollLoop::new(move |_| on_update_js.call0(&JsValue::UNDEFINED));
let on_update = Box::new(move || poll_loop.poll(()));
let id = self.client.on_hosted_tables_update(on_update).await?;
Ok(id)
}
#[wasm_bindgen]
pub async fn remove_hosted_tables_update(&self, update_id: u32) -> ApiResult<()> {
self.client.remove_hosted_tables_update(update_id).await?;
Ok(())
}
#[wasm_bindgen]
pub async fn system_info(&self) -> ApiResult<JsSystemInfo> {
let mut info = self.client.system_info().await?;
#[cfg(feature = "trace-allocator")]
if let perspective_client::SystemInfo {
client_used: None,
client_heap: None,
..
} = &info
{
let (client_used, client_heap) = crate::utils::get_used();
info.client_used = Some(client_used as u64);
info.client_heap = Some(client_heap as u64);
};
let timestamp = web_sys::window()
.and_then(|x| x.performance())
.map(|x| x.now() as u64);
info.timestamp = timestamp;
let record = JsValue::from_serde_ext(&info.cast::<f64>())?;
Ok(record.unchecked_into())
}
}
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(typescript_type = "SystemInfo")]
pub type JsSystemInfo;
}