use std::collections::HashMap;
use std::pin::pin;
use std::str::FromStr;
use std::sync::Arc;
use futures::FutureExt;
use perspective_client::{
Client, ColumnWindow, DeleteOptions, OnUpdateData, OnUpdateMode, OnUpdateOptions, Table,
TableData, TableInitOptions, TableReadFormat, TableRef, UpdateData, UpdateOptions, View,
ViewWindow, assert_table_api, assert_view_api, asyncfn,
};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::{PyAny, PyBytes, PyDict, PyString};
use pythonize::depythonize;
use super::pandas::arrow_to_pandas;
use super::polars::arrow_to_polars;
use super::table_data::TableDataExt;
use super::update_data::UpdateDataExt;
use super::{pandas, polars, pyarrow};
use crate::py_async::{self, AllowThreads};
use crate::py_err::{PyPerspectiveError, ResultTClientErrorExt};
/// Converts a Python argument into a [`TableRef`].
///
/// Accepts either an `AsyncTable` instance or a `str` table name; anything
/// else raises `TypeError`.
fn py_to_table_ref_async(val: &Bound<'_, PyAny>) -> PyResult<TableRef> {
    if let Ok(async_table) = val.extract::<AsyncTable>() {
        return Ok(TableRef::from(async_table.table.as_ref()));
    }

    match val.extract::<String>() {
        Ok(table_name) => Ok(TableRef::from(table_name)),
        Err(_) => Err(pyo3::exceptions::PyTypeError::new_err(
            "Expected a Table or string table name",
        )),
    }
}
/// Same as [`py_to_table_ref_async`], but for an owned `Py<PyAny>` that must
/// first be bound to the GIL token `py`.
fn py_to_table_ref_from_owned(py: Python<'_>, val: &Py<PyAny>) -> PyResult<TableRef> {
    py_to_table_ref_async(val.bind(py))
}
/// Asynchronous Perspective client exposed to Python.
#[pyclass(module = "perspective")]
#[derive(Clone)]
pub struct AsyncClient {
    /// The underlying Rust client.
    pub(crate) client: Client,
    /// Optional Python callable run by `terminate()`. It is `Arc`-shared so
    /// clones of this client (and clients returned by `AsyncTable::get_client`)
    /// refer to the same callback.
    close_cb: Arc<Option<Py<PyAny>>>,
}

impl AsyncClient {
    /// Wraps an existing [`Client`] without any close callback.
    pub fn new_from_client(client: Client) -> Self {
        Self {
            close_cb: Arc::new(None),
            client,
        }
    }
}
#[pymethods]
impl AsyncClient {
#[new]
#[pyo3(signature=(handle_request, handle_close=None, name=None))]
pub fn new(
handle_request: Py<PyAny>,
handle_close: Option<Py<PyAny>>,
name: Option<String>,
) -> PyResult<Self> {
let handle_request = Arc::new(handle_request);
let client = Client::new_with_callback(
name.as_deref(),
asyncfn!(handle_request, async move |msg| {
if let Some(fut) = Python::with_gil(move |py| -> PyResult<_> {
let ret = handle_request.call1(py, (PyBytes::new(py, &msg),))?;
if isawaitable(ret.bind(py)).unwrap_or(false) {
Ok(Some(py_async::py_into_future(ret.into_bound(py))?))
} else {
Ok(None)
}
})? {
let result = fut.await;
Python::with_gil(|_| {
result
.map(|_| ())
.map_err(perspective_server::ServerError::from)
})?
}
Ok(())
}),
);
Ok(AsyncClient {
client: client.into_pyerr()?,
close_cb: handle_close.into(),
})
}
pub async fn handle_response(&self, bytes: Py<PyBytes>) -> PyResult<bool> {
self.client
.handle_response(Python::with_gil(|py| bytes.as_bytes(py)))
.await
.into_pyerr()
}
#[pyo3(signature=(input, limit=None, index=None, name=None, format=None))]
pub async fn table(
&self,
input: Py<PyAny>,
limit: Option<u32>,
index: Option<Py<PyString>>,
name: Option<Py<PyString>>,
format: Option<Py<PyString>>,
) -> PyResult<AsyncTable> {
let client = self.client.clone();
let py_client = Python::with_gil(|_| self.clone());
let table = Python::with_gil(|py| {
let mut options = TableInitOptions {
name: name.map(|x| x.extract::<String>(py)).transpose()?,
..TableInitOptions::default()
};
let format = TableReadFormat::parse(format.map(|x| x.to_string()))
.map_err(PyPerspectiveError::new_err)?;
match (limit, index) {
(None, None) => {},
(None, Some(index)) => {
options.index = Some(index.extract::<String>(py)?);
},
(Some(limit), None) => options.limit = Some(limit),
(Some(_), Some(_)) => {
Err(PyValueError::new_err("Cannot set both `limit` and `index`"))?
},
};
let input = input.into_bound(py);
let input_data = if pyarrow::is_arrow_table(py, &input)? {
pyarrow::to_arrow_bytes(py, &input)?.into_any()
} else if pandas::is_pandas_df(py, &input)? {
pandas::pandas_to_arrow_bytes(py, &input)?.into_any()
} else if polars::is_polars_df(py, &input)? || polars::is_polars_lf(py, &input)? {
polars::polars_to_arrow_bytes(py, &input)?.into_any()
} else {
input
};
let table_data = TableData::from_py(input_data, format)?;
let table = client.table(table_data, options);
Ok::<_, PyErr>(table)
})?;
let table = table.await.into_pyerr()?;
Ok(AsyncTable {
table: Arc::new(table),
client: py_client,
})
}
pub async fn open_table(&self, name: String) -> PyResult<AsyncTable> {
let client = self.client.clone();
let py_client = self.clone();
let table = client.open_table(name).await.into_pyerr()?;
Ok(AsyncTable {
table: Arc::new(table),
client: py_client,
})
}
#[pyo3(signature = (left, right, on, join_type=None, name=None, right_on=None))]
pub async fn join(
&self,
left: Py<PyAny>,
right: Py<PyAny>,
on: String,
join_type: Option<String>,
name: Option<String>,
right_on: Option<String>,
) -> PyResult<AsyncTable> {
let (left_ref, right_ref) = Python::with_gil(|py| {
let left_ref = py_to_table_ref_from_owned(py, &left)?;
let right_ref = py_to_table_ref_from_owned(py, &right)?;
Ok::<_, PyErr>((left_ref, right_ref))
})?;
let jt = super::client_sync::parse_join_type(join_type.as_deref())?;
let options = perspective_client::JoinOptions {
join_type: Some(jt),
name,
right_on,
};
let py_client = self.clone();
let table = self
.client
.join(left_ref, right_ref, &on, options)
.await
.into_pyerr()?;
Ok(AsyncTable {
table: Arc::new(table),
client: py_client,
})
}
pub async fn get_hosted_table_names(&self) -> PyResult<Vec<String>> {
self.client.get_hosted_table_names().await.into_pyerr()
}
pub async fn on_hosted_tables_update(&self, callback_py: Py<PyAny>) -> PyResult<u32> {
let callback = Box::new(move || {
let callback = Python::with_gil(|py| Py::clone_ref(&callback_py, py));
async move {
let aggregate_errors: PyResult<()> = {
let callback = Python::with_gil(|py| Py::clone_ref(&callback, py));
Python::with_gil(|py| {
callback.call0(py)?;
Ok(())
})
};
if let Err(err) = aggregate_errors {
tracing::warn!("Error in on_hosted_tables_update callback: {:?}", err);
}
}
.boxed()
});
let callback_id = self
.client
.on_hosted_tables_update(callback)
.await
.into_pyerr()?;
Ok(callback_id)
}
pub async fn remove_hosted_tables_update(&self, id: u32) -> PyResult<()> {
self.client
.remove_hosted_tables_update(id)
.await
.into_pyerr()
}
pub async fn system_info(&self) -> PyResult<Py<PyAny>> {
let sysinfo = self.client.system_info().await.into_pyerr()?;
Python::with_gil(|py| Ok(pythonize::pythonize(py, &sysinfo)?.unbind()))
}
pub fn terminate(&self, py: Python<'_>) -> PyResult<()> {
if let Some(cb) = &*self.close_cb {
cb.call0(py)?;
}
Ok(())
}
}
/// Asynchronous handle to a Perspective table, exposed to Python.
#[pyclass]
#[derive(Clone)]
pub struct AsyncTable {
    /// Shared handle to the underlying client-side `Table`.
    pub(super) table: Arc<Table>,
    /// The client this table came from; `get_client` reuses its close callback.
    pub(super) client: AsyncClient,
}
// NOTE(review): presumably checks at compile time that `AsyncTable` exposes
// the expected Table API; see `perspective_client::assert_table_api!`.
assert_table_api!(AsyncTable);
#[pymethods]
impl AsyncTable {
pub fn get_index(&self) -> Option<String> {
self.table.get_index()
}
pub async fn get_client(&self) -> AsyncClient {
AsyncClient {
client: self.table.get_client(),
close_cb: self.client.close_cb.clone(),
}
}
pub fn get_limit(&self) -> Option<u32> {
self.table.get_limit()
}
pub fn get_name(&self) -> String {
self.table.get_name().into()
}
pub async fn size(&self) -> PyResult<usize> {
self.table.size().await.into_pyerr()
}
pub async fn columns(&self) -> PyResult<Vec<String>> {
self.table.columns().await.into_pyerr()
}
pub async fn clear(&self) -> PyResult<()> {
self.table.clear().await.into_pyerr()
}
#[pyo3(signature=(lazy=false))]
pub async fn delete(&self, lazy: bool) -> PyResult<()> {
self.table.delete(DeleteOptions { lazy }).await.into_pyerr()
}
pub async fn make_port(&self) -> PyResult<i32> {
self.table.make_port().await.into_pyerr()
}
pub async fn on_delete(&self, callback_py: Py<PyAny>) -> PyResult<u32> {
let callback = {
let callback_py = Python::with_gil(|py| Py::clone_ref(&callback_py, py));
Box::new(move || {
Python::with_gil(|py| callback_py.call0(py))
.expect("`on_delete()` callback failed");
})
};
let callback_id = self.table.on_delete(callback).await.into_pyerr()?;
Ok(callback_id)
}
pub async fn remove_delete(&self, callback_id: u32) -> PyResult<()> {
self.table.remove_delete(callback_id).await.into_pyerr()
}
#[pyo3(signature=(input, format=None))]
pub async fn remove(&self, input: Py<PyAny>, format: Option<String>) -> PyResult<()> {
let table = &self.table;
let format = TableReadFormat::parse(format).map_err(PyPerspectiveError::new_err)?;
let table_data = Python::with_gil(|py| UpdateData::from_py(input.into_bound(py), format))?;
table.remove(table_data).await.into_pyerr()
}
#[pyo3(signature=(input, format=None))]
pub async fn replace(&self, input: Py<PyAny>, format: Option<String>) -> PyResult<()> {
let table = &self.table;
let format = TableReadFormat::parse(format).map_err(PyPerspectiveError::new_err)?;
let table_data = Python::with_gil(|py| UpdateData::from_py(input.into_bound(py), format))?;
table.replace(table_data).await.into_pyerr()
}
#[pyo3(signature=(input, port_id=None, format=None))]
pub async fn update(
&self,
input: Py<PyAny>,
port_id: Option<u32>,
format: Option<String>,
) -> PyResult<()> {
let input_data: Py<PyAny> = Python::with_gil(|py| {
let input = input.into_bound(py);
let data = if pyarrow::is_arrow_table(py, &input)? {
pyarrow::to_arrow_bytes(py, &input)?.into_any()
} else if pandas::is_pandas_df(py, &input)? {
pandas::pandas_to_arrow_bytes(py, &input)?.into_any()
} else if polars::is_polars_df(py, &input)? || polars::is_polars_lf(py, &input)? {
polars::polars_to_arrow_bytes(py, &input)?.into_any()
} else {
input
};
Ok(data.unbind()) as PyResult<Py<PyAny>>
})?;
let table = &self.table;
let format = TableReadFormat::parse(format).map_err(PyPerspectiveError::new_err)?;
let table_data =
Python::with_gil(|py| UpdateData::from_py(input_data.into_bound(py), format))?;
let options = UpdateOptions { port_id, format };
AllowThreads(pin!(table.update(table_data, options)))
.await
.into_pyerr()?;
Ok(())
}
pub async fn validate_expressions(&self, expressions: Py<PyAny>) -> PyResult<Py<PyAny>> {
let expressions = Python::with_gil(|py| depythonize(expressions.bind(py)))?;
let records = self
.table
.validate_expressions(expressions)
.await
.into_pyerr()?;
Python::with_gil(|py| Ok(pythonize::pythonize(py, &records)?.unbind()))
}
pub async fn schema(&self) -> PyResult<HashMap<String, String>> {
let schema = self.table.schema().await.into_pyerr()?;
Ok(schema
.into_iter()
.map(|(x, y)| (x, format!("{y}")))
.collect())
}
#[pyo3(signature = (**kwargs))]
pub async fn view(&self, kwargs: Option<Py<PyDict>>) -> PyResult<AsyncView> {
let config = kwargs
.map(|config| Python::with_gil(|py| depythonize(config.bind(py))))
.transpose()?;
let view = self.table.view(config).await.into_pyerr()?;
Ok(AsyncView {
view: Arc::new(view),
_client: self.client.clone(),
})
}
}
/// Asynchronous handle to a Perspective view, exposed to Python.
#[pyclass]
#[derive(Clone)]
pub struct AsyncView {
    /// Shared handle to the underlying client-side `View`.
    pub(crate) view: Arc<View>,
    // Not read in this file; presumably held so the originating client lives
    // at least as long as the view. TODO confirm intent.
    _client: AsyncClient,
}
// NOTE(review): presumably checks at compile time that `AsyncView` exposes
// the expected View API; see `perspective_client::assert_view_api!`.
assert_view_api!(AsyncView);
#[pymethods]
impl AsyncView {
pub async fn column_paths(&self, window: Option<Py<PyDict>>) -> PyResult<Vec<String>> {
let window: ColumnWindow = Python::with_gil(|py| window.map(|x| depythonize(x.bind(py))))
.transpose()?
.unwrap_or_default();
self.view.column_paths(window).await.into_pyerr()
}
pub async fn delete(&self) -> PyResult<()> {
self.view.delete().await.into_pyerr()
}
pub async fn dimensions(&self) -> PyResult<Py<PyAny>> {
let dim = self.view.dimensions().await.into_pyerr()?;
Python::with_gil(|py| Ok(pythonize::pythonize(py, &dim)?.unbind()))
}
pub async fn expand(&self, index: u32) -> PyResult<u32> {
self.view.expand(index).await.into_pyerr()
}
pub async fn collapse(&self, index: u32) -> PyResult<u32> {
self.view.collapse(index).await.into_pyerr()
}
pub async fn expression_schema(&self) -> PyResult<HashMap<String, String>> {
Ok(self
.view
.expression_schema()
.await
.into_pyerr()?
.into_iter()
.map(|(k, v)| (k, format!("{v}")))
.collect())
}
pub async fn get_config(&self) -> PyResult<Py<PyAny>> {
let config = self.view.get_config().await.into_pyerr()?;
Python::with_gil(|py| Ok(pythonize::pythonize(py, &config)?.unbind()))
}
pub async fn get_min_max(&self, name: String) -> PyResult<(PyObject, PyObject)> {
let (min, max) = self.view.get_min_max(name).await.into_pyerr()?;
Python::with_gil(|py| {
Ok((
super::client_sync::scalar_to_py(py, &min),
super::client_sync::scalar_to_py(py, &max),
))
})
}
pub async fn num_rows(&self) -> PyResult<u32> {
self.view.num_rows().await.into_pyerr()
}
pub async fn num_columns(&self) -> PyResult<u32> {
let dim = self.view.dimensions().await.into_pyerr()?;
Ok(dim.num_view_columns)
}
pub async fn schema(&self) -> PyResult<HashMap<String, String>> {
Ok(self
.view
.schema()
.await
.into_pyerr()?
.into_iter()
.map(|(k, v)| (k, format!("{v}")))
.collect())
}
pub async fn on_delete(&self, callback_py: Py<PyAny>) -> PyResult<u32> {
let callback = {
let callback_py = Arc::new(callback_py);
Box::new(move || {
Python::with_gil(|py| callback_py.call0(py))
.expect("`on_delete()` callback failed");
})
};
let callback_id = self.view.on_delete(callback).await.into_pyerr()?;
Ok(callback_id)
}
pub async fn remove_delete(&self, callback_id: u32) -> PyResult<()> {
self.view.remove_delete(callback_id).await.into_pyerr()
}
#[pyo3(signature=(callback, mode=None))]
pub async fn on_update(&self, callback: Py<PyAny>, mode: Option<String>) -> PyResult<u32> {
let callback = move |x: OnUpdateData| {
let callback = Python::with_gil(|py| Py::clone_ref(&callback, py));
async move {
let aggregate_errors: PyResult<()> = {
let callback = Python::with_gil(|py| Py::clone_ref(&callback, py));
Python::with_gil(|py| {
match &x.delta {
None => callback.call1(py, (x.port_id,))?,
Some(delta) => {
callback.call1(py, (x.port_id, PyBytes::new(py, delta)))?
},
};
Ok(())
})
};
if let Err(err) = aggregate_errors {
tracing::warn!("Error in on_update callback: {:?}", err);
}
}
.boxed()
};
let mode = mode
.map(|x| OnUpdateMode::from_str(x.as_str()))
.transpose()
.into_pyerr()?;
self.view
.on_update(Box::new(callback), OnUpdateOptions { mode })
.await
.into_pyerr()
}
pub async fn remove_update(&self, callback_id: u32) -> PyResult<()> {
self.view.remove_update(callback_id).await.into_pyerr()
}
#[pyo3(signature=(**window))]
pub async fn to_dataframe(&self, window: Option<Py<PyDict>>) -> PyResult<Py<PyAny>> {
let window: ViewWindow = Python::with_gil(|py| window.map(|x| depythonize(x.bind(py))))
.transpose()?
.unwrap_or_default();
let arrow = self.view.to_arrow(window).await.into_pyerr()?;
Python::with_gil(|py| arrow_to_pandas(py, &arrow))
}
#[pyo3(signature=(**window))]
pub async fn to_polars(&self, window: Option<Py<PyDict>>) -> PyResult<Py<PyAny>> {
let window: ViewWindow = Python::with_gil(|py| window.map(|x| depythonize(x.bind(py))))
.transpose()?
.unwrap_or_default();
let arrow = self.view.to_arrow(window).await.into_pyerr()?;
Python::with_gil(|py| arrow_to_polars(py, &arrow))
}
#[pyo3(signature=(**window))]
pub async fn to_arrow(&self, window: Option<Py<PyDict>>) -> PyResult<Py<PyBytes>> {
let window: ViewWindow = Python::with_gil(|py| window.map(|x| depythonize(x.bind(py))))
.transpose()?
.unwrap_or_default();
let arrow = self.view.to_arrow(window).await.into_pyerr()?;
Ok(Python::with_gil(|py| PyBytes::new(py, &arrow).into()))
}
#[pyo3(signature=(**window))]
pub async fn to_csv(&self, window: Option<Py<PyDict>>) -> PyResult<String> {
let window: ViewWindow = Python::with_gil(|py| window.map(|x| depythonize(x.bind(py))))
.transpose()?
.unwrap_or_default();
self.view.to_csv(window).await.into_pyerr()
}
#[pyo3(signature=(**window))]
pub async fn to_columns_string(&self, window: Option<Py<PyDict>>) -> PyResult<String> {
let window: ViewWindow = Python::with_gil(|py| window.map(|x| depythonize(x.bind(py))))
.transpose()?
.unwrap_or_default();
self.view.to_columns_string(window).await.into_pyerr()
}
#[pyo3(signature = (**window))]
pub async fn to_columns(&self, window: Option<Py<PyDict>>) -> PyResult<Py<PyAny>> {
let json = self.to_columns_string(window).await?;
Python::with_gil(|py| {
let json_module = PyModule::import(py, "json")?;
let records = json_module.call_method1("loads", (json,))?;
Ok(records.unbind())
})
}
#[pyo3(signature=(window=None))]
pub async fn to_json_string(&self, window: Option<Py<PyDict>>) -> PyResult<String> {
let window: ViewWindow = Python::with_gil(|py| window.map(|x| depythonize(x.bind(py))))
.transpose()?
.unwrap_or_default();
self.view.to_json_string(window).await.into_pyerr()
}
#[pyo3(signature=(window=None))]
pub async fn to_ndjson(&self, window: Option<Py<PyDict>>) -> PyResult<String> {
let window: ViewWindow = Python::with_gil(|py| window.map(|x| depythonize(x.bind(py))))
.transpose()?
.unwrap_or_default();
self.view.to_ndjson(window).await.into_pyerr()
}
#[pyo3(signature = (**window))]
pub async fn to_records(&self, window: Option<Py<PyDict>>) -> PyResult<Py<PyAny>> {
let json = self.to_json_string(window).await?;
Python::with_gil(|py| {
let json_module = PyModule::import(py, "json")?;
let records = json_module.call_method1("loads", (json,))?;
Ok(records.unbind())
})
}
#[pyo3(signature = (**window))]
pub async fn to_json(&self, window: Option<Py<PyDict>>) -> PyResult<Py<PyAny>> {
self.to_records(window).await
}
}
/// Reports whether `object` is awaitable, as decided by Python's
/// `inspect.isawaitable`. Import or call failures are returned as errors.
fn isawaitable(object: &Bound<'_, PyAny>) -> PyResult<bool> {
    let inspect_module = object.py().import("inspect")?;
    let verdict = inspect_module.call_method1("isawaitable", (object,))?;
    verdict.extract::<bool>()
}