use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use perspective_client::config::Scalar;
use perspective_client::{JoinType, TableRef, assert_table_api, assert_view_api};
#[cfg(doc)]
use perspective_client::{TableInitOptions, UpdateOptions, config::ViewConfigUpdate};
use pyo3::exceptions::PyTypeError;
use pyo3::marker::Ungil;
use pyo3::prelude::*;
use pyo3::types::*;
use super::client_async::*;
use crate::py_err::ResultTClientErrorExt;
use crate::server::Server;
pub(crate) fn py_to_table_ref(val: &Bound<'_, PyAny>) -> PyResult<TableRef> {
if let Ok(t) = val.downcast::<Table>() {
let table_ref = t.borrow();
Ok(TableRef::from(&*table_ref.0.table))
} else if let Ok(name) = val.extract::<String>() {
Ok(TableRef::from(name))
} else {
Err(PyTypeError::new_err(
"Expected a Table or string table name",
))
}
}
pub(crate) fn parse_join_type(join_type: Option<&str>) -> PyResult<JoinType> {
match join_type {
Some("left") => Ok(JoinType::Left),
Some("outer") => Ok(JoinType::Outer),
None | Some("inner") => Ok(JoinType::Inner),
Some(other) => Err(pyo3::exceptions::PyValueError::new_err(format!(
"Unknown join type: \"{}\"",
other
))),
}
}
pub(crate) fn scalar_to_py(py: Python<'_>, scalar: &Scalar) -> PyObject {
match scalar {
Scalar::Float(x) => x.into_pyobject(py).unwrap().into_any().unbind(),
Scalar::String(x) => x.into_pyobject(py).unwrap().into_any().unbind(),
Scalar::Bool(x) => x.into_pyobject(py).unwrap().to_owned().into_any().unbind(),
Scalar::Null => py.None(),
}
}
pub(crate) trait PyFutureExt: Future {
fn py_block_on(self, py: Python<'_>) -> Self::Output
where
Self: Sized + Send,
Self::Output: Ungil,
{
use pollster::FutureExt;
py.allow_threads(move || self.block_on())
}
}
impl<F: Future> PyFutureExt for F {}
#[pyclass(subclass, module = "perspective")]
pub struct Client(pub(crate) AsyncClient);
#[pymethods]
impl Client {
#[new]
#[pyo3(signature = (handle_request, close_cb=None, name=None))]
pub fn new(
handle_request: Py<PyAny>,
close_cb: Option<Py<PyAny>>,
name: Option<String>,
) -> PyResult<Self> {
let client = AsyncClient::new(handle_request, close_cb, name)?;
Ok(Client(client))
}
#[staticmethod]
pub fn from_server(py: Python<'_>, server: Py<Server>) -> PyResult<Self> {
server.borrow(py).new_local_client()
}
pub fn handle_response(&self, py: Python<'_>, response: Py<PyBytes>) -> PyResult<bool> {
self.0.handle_response(response).py_block_on(py)
}
#[pyo3(signature = (input, limit=None, index=None, name=None, format=None))]
pub fn table(
&self,
py: Python<'_>,
input: Py<PyAny>,
limit: Option<u32>,
index: Option<Py<PyString>>,
name: Option<Py<PyString>>,
format: Option<Py<PyString>>,
) -> PyResult<Table> {
Ok(Table(
self.0
.table(input, limit, index, name, format)
.py_block_on(py)?,
))
}
pub fn open_table(&self, py: Python<'_>, name: String) -> PyResult<Table> {
let client = self.0.clone();
let table = client.open_table(name).py_block_on(py)?;
Ok(Table(table))
}
#[pyo3(signature = (left, right, on, join_type=None, name=None, right_on=None))]
#[allow(clippy::too_many_arguments, reason = "This is a Python API")]
pub fn join(
&self,
py: Python<'_>,
left: &Bound<'_, PyAny>,
right: &Bound<'_, PyAny>,
on: String,
join_type: Option<String>,
name: Option<String>,
right_on: Option<String>,
) -> PyResult<Table> {
let left_ref = py_to_table_ref(left)?;
let right_ref = py_to_table_ref(right)?;
let jt = parse_join_type(join_type.as_deref())?;
let options = perspective_client::JoinOptions {
join_type: Some(jt),
name,
right_on,
};
let table = self
.0
.client
.join(left_ref, right_ref, &on, options)
.py_block_on(py)
.into_pyerr()?;
Ok(Table(AsyncTable {
table: Arc::new(table),
client: self.0.clone(),
}))
}
pub fn get_hosted_table_names(&self, py: Python<'_>) -> PyResult<Vec<String>> {
self.0.get_hosted_table_names().py_block_on(py)
}
pub fn on_hosted_tables_update(&self, py: Python<'_>, callback: Py<PyAny>) -> PyResult<u32> {
self.0.on_hosted_tables_update(callback).py_block_on(py)
}
pub fn remove_hosted_tables_update(&self, py: Python<'_>, callback_id: u32) -> PyResult<()> {
self.0
.remove_hosted_tables_update(callback_id)
.py_block_on(py)
}
pub fn system_info(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
self.0.system_info().py_block_on(py)
}
pub fn terminate(&self, py: Python<'_>) -> PyResult<()> {
self.0.terminate(py)
}
}
#[pyclass(subclass, name = "Table", module = "perspective")]
pub struct Table(AsyncTable);
assert_table_api!(Table);
#[pymethods]
impl Table {
#[new]
fn new() -> PyResult<Self> {
Err(PyTypeError::new_err(
"Do not call Table's constructor directly, construct from a Client instance.",
))
}
pub fn get_index(&self) -> Option<String> {
self.0.get_index()
}
pub fn get_client(&self, py: Python<'_>) -> Client {
Client(self.0.get_client().py_block_on(py))
}
pub fn get_limit(&self) -> Option<u32> {
self.0.get_limit()
}
pub fn get_name(&self) -> String {
self.0.get_name()
}
pub fn clear(&self, py: Python<'_>) -> PyResult<()> {
self.0.clear().py_block_on(py)
}
pub fn columns(&self, py: Python<'_>) -> PyResult<Vec<String>> {
self.0.columns().py_block_on(py)
}
#[pyo3(signature=(lazy=false))]
pub fn delete(&self, py: Python<'_>, lazy: bool) -> PyResult<()> {
self.0.delete(lazy).py_block_on(py)
}
pub fn make_port(&self, py: Python<'_>) -> PyResult<i32> {
let table = self.0.clone();
table.make_port().py_block_on(py)
}
pub fn on_delete(&self, py: Python<'_>, callback: Py<PyAny>) -> PyResult<u32> {
let table = self.0.clone();
table.on_delete(callback).py_block_on(py)
}
#[pyo3(signature = (input, format=None))]
pub fn remove(&self, py: Python<'_>, input: Py<PyAny>, format: Option<String>) -> PyResult<()> {
let table = self.0.clone();
table.remove(input, format).py_block_on(py)
}
pub fn remove_delete(&self, py: Python<'_>, callback_id: u32) -> PyResult<()> {
let table = self.0.clone();
table.remove_delete(callback_id).py_block_on(py)
}
pub fn schema(&self, py: Python<'_>) -> PyResult<HashMap<String, String>> {
let table = self.0.clone();
table.schema().py_block_on(py)
}
pub fn validate_expressions(
&self,
py: Python<'_>,
expression: Py<PyAny>,
) -> PyResult<Py<PyAny>> {
let table = self.0.clone();
table.validate_expressions(expression).py_block_on(py)
}
#[pyo3(signature = (**config))]
pub fn view(&self, py: Python<'_>, config: Option<Py<PyDict>>) -> PyResult<View> {
Ok(View(self.0.view(config).py_block_on(py)?))
}
pub fn size(&self, py: Python<'_>) -> PyResult<usize> {
self.0.size().py_block_on(py)
}
#[pyo3(signature = (input, format=None))]
pub fn replace(
&self,
py: Python<'_>,
input: Py<PyAny>,
format: Option<String>,
) -> PyResult<()> {
self.0.replace(input, format).py_block_on(py)
}
#[pyo3(signature = (input, port_id=None, format=None))]
pub fn update(
&self,
py: Python<'_>,
input: Py<PyAny>,
port_id: Option<u32>,
format: Option<String>,
) -> PyResult<()> {
self.0.update(input, port_id, format).py_block_on(py)
}
}
#[pyclass(subclass, name = "View", module = "perspective")]
pub struct View(pub(crate) AsyncView);
assert_view_api!(View);
#[pymethods]
impl View {
#[new]
fn new() -> PyResult<Self> {
Err(PyTypeError::new_err(
"Do not call View's constructor directly, construct from a Table instance.",
))
}
#[pyo3(signature = (**window))]
pub fn column_paths(
&self,
py: Python<'_>,
window: Option<Py<PyDict>>,
) -> PyResult<Vec<String>> {
self.0.column_paths(window).py_block_on(py)
}
#[pyo3(signature = (**window))]
pub fn to_columns_string(
&self,
py: Python<'_>,
window: Option<Py<PyDict>>,
) -> PyResult<String> {
self.0.to_columns_string(window).py_block_on(py)
}
#[pyo3(signature = (**window))]
pub fn to_json_string(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<String> {
self.0.to_json_string(window).py_block_on(py)
}
#[pyo3(signature = (**window))]
pub fn to_ndjson(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<String> {
self.0.to_ndjson(window).py_block_on(py)
}
#[pyo3(signature = (**window))]
pub fn to_records<'a>(
&self,
py: Python<'a>,
window: Option<Py<PyDict>>,
) -> PyResult<Bound<'a, PyAny>> {
let json = self.0.to_json_string(window).py_block_on(py)?;
let json_module = PyModule::import(py, "json")?;
json_module.call_method1("loads", (json,))
}
#[pyo3(signature = (**window))]
pub fn to_json<'a>(
&self,
py: Python<'a>,
window: Option<Py<PyDict>>,
) -> PyResult<Bound<'a, PyAny>> {
self.to_records(py, window)
}
#[pyo3(signature = (**window))]
pub fn to_columns<'a>(
&self,
py: Python<'a>,
window: Option<Py<PyDict>>,
) -> PyResult<Bound<'a, PyAny>> {
let json = self.0.to_columns_string(window).py_block_on(py)?;
let json_module = PyModule::import(py, "json")?;
json_module.call_method1("loads", (json,))
}
#[pyo3(signature = (**window))]
pub fn to_csv(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<String> {
self.0.to_csv(window).py_block_on(py)
}
#[pyo3(signature = (**window))]
pub fn to_dataframe(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<Py<PyAny>> {
self.0.to_dataframe(window).py_block_on(py)
}
#[pyo3(signature = (**window))]
pub fn to_pandas(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<Py<PyAny>> {
self.0.to_dataframe(window).py_block_on(py)
}
#[pyo3(signature = (**window))]
pub fn to_polars(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<Py<PyAny>> {
self.0.to_polars(window).py_block_on(py)
}
#[pyo3(signature = (**window))]
pub fn to_arrow(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<Py<PyBytes>> {
self.0.to_arrow(window).py_block_on(py)
}
pub fn delete(&self, py: Python<'_>) -> PyResult<()> {
self.0.delete().py_block_on(py)
}
pub fn expand(&self, py: Python<'_>, index: u32) -> PyResult<u32> {
self.0.expand(index).py_block_on(py)
}
pub fn collapse(&self, py: Python<'_>, index: u32) -> PyResult<u32> {
self.0.collapse(index).py_block_on(py)
}
pub fn dimensions(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
self.0.dimensions().py_block_on(py)
}
pub fn expression_schema(&self, py: Python<'_>) -> PyResult<HashMap<String, String>> {
self.0.expression_schema().py_block_on(py)
}
pub fn get_config(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
self.0.get_config().py_block_on(py)
}
pub fn get_min_max(
&self,
py: Python<'_>,
column_name: String,
) -> PyResult<(PyObject, PyObject)> {
self.0.get_min_max(column_name).py_block_on(py)
}
pub fn num_rows(&self, py: Python<'_>) -> PyResult<u32> {
self.0.num_rows().py_block_on(py)
}
pub fn num_columns(&self, py: Python<'_>) -> PyResult<u32> {
self.0.num_columns().py_block_on(py)
}
pub fn schema(&self, py: Python<'_>) -> PyResult<HashMap<String, String>> {
self.0.schema().py_block_on(py)
}
pub fn on_delete(&self, py: Python<'_>, callback: Py<PyAny>) -> PyResult<u32> {
self.0.on_delete(callback).py_block_on(py)
}
pub fn remove_delete(&self, py: Python<'_>, callback_id: u32) -> PyResult<()> {
self.0.remove_delete(callback_id).py_block_on(py)
}
#[pyo3(signature = (callback, mode=None))]
pub fn on_update(
&self,
py: Python<'_>,
callback: Py<PyAny>,
mode: Option<String>,
) -> PyResult<u32> {
self.0.on_update(callback, mode).py_block_on(py)
}
pub fn remove_update(&self, py: Python<'_>, callback_id: u32) -> PyResult<()> {
self.0.remove_update(callback_id).py_block_on(py)
}
}