perspective_python/client/
client_sync.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::collections::HashMap;
14use std::future::Future;
15
16#[cfg(doc)]
17use perspective_client::{Schema, TableInitOptions, UpdateOptions, config::ViewConfigUpdate};
18use perspective_client::{assert_table_api, assert_view_api};
19use pyo3::exceptions::PyTypeError;
20use pyo3::marker::Ungil;
21use pyo3::prelude::*;
22use pyo3::types::*;
23
24use super::client_async::*;
25use crate::server::PyServer;
26
27pub(crate) trait PyFutureExt: Future {
28    fn py_block_on(self, py: Python<'_>) -> Self::Output
29    where
30        Self: Sized + Send,
31        Self::Output: Ungil,
32    {
33        use pollster::FutureExt;
34        py.allow_threads(move || self.block_on())
35    }
36}
37
38impl<F: Future> PyFutureExt for F {}
39
40/// An instance of a [`Client`] is a unique connection to a single
41/// `perspective_server::Server`, whether locally in-memory or remote over some
42/// transport like a WebSocket.
43///
44/// [`Client`] and Perspective objects derived from it have _synchronous_ APIs,
45/// suitable for use in a repl or script context where this is the _only_
46/// [`Client`] connected to its [`perspective_server::Server`]. If you want to
47/// integrate with a Web framework or otherwise connect multiple clients,
48/// use [`AsyncClient`].
49#[pyclass(subclass, module = "perspective")]
50pub struct Client(pub(crate) AsyncClient);
51
52#[pymethods]
53impl Client {
54    #[new]
55    #[pyo3(signature = (handle_request, close_cb=None, name=None))]
56    pub fn new(
57        handle_request: Py<PyAny>,
58        close_cb: Option<Py<PyAny>>,
59        name: Option<String>,
60    ) -> PyResult<Self> {
61        let client = AsyncClient::new(handle_request, close_cb, name)?;
62        Ok(Client(client))
63    }
64
65    #[staticmethod]
66    pub fn from_server(py: Python<'_>, server: Py<PyServer>) -> PyResult<Self> {
67        server.borrow(py).new_local_client()
68    }
69
70    /// Handle a message from the external message queue.
71    /// [`Client::handle_response`] is part of the low-level message-handling
72    /// API necessary to implement new transports for a [`Client`]
73    /// connection to a local-or-remote `perspective_server::Server`, and
74    /// doesn't generally need to be called directly by "users" of a
75    /// [`Client`] once connected.
76    pub fn handle_response(&self, py: Python<'_>, response: Py<PyBytes>) -> PyResult<bool> {
77        self.0.handle_response(response).py_block_on(py)
78    }
79
80    /// Creates a new [`Table`] from either a _schema_ or _data_.
81    ///
82    /// The [`Client::table`] factory function can be initialized with either a
83    /// _schema_ (see [`Table::schema`]), or data in one of these formats:
84    ///
85    /// - Apache Arrow
86    /// - CSV
87    /// - JSON row-oriented
88    /// - JSON column-oriented
89    ///
90    /// When instantiated with _data_, the schema is inferred from this data.
91    /// While this is convenient, inferrence is sometimes imperfect e.g.
92    /// when the input is empty, null or ambiguous. For these cases,
93    /// [`Client::table`] can first be instantiated with a explicit schema.
94    ///
95    /// When instantiated with a _schema_, the resulting [`Table`] is empty but
96    /// with known column names and column types. When subsqeuently
97    /// populated with [`Table::update`], these columns will be _coerced_ to
98    /// the schema's type. This behavior can be useful when
99    /// [`Client::table`]'s column type inferences doesn't work.
100    ///
101    /// The resulting [`Table`] is _virtual_, and invoking its methods
102    /// dispatches events to the `perspective_server::Server` this
103    /// [`Client`] connects to, where the data is stored and all calculation
104    /// occurs.
105    ///
106    /// # Arguments
107    ///
108    /// - `arg` - Either _schema_ or initialization _data_.
109    /// - `options` - Optional configuration which provides one of:
110    ///     - `limit` - The max number of rows the resulting [`Table`] can
111    ///       store.
112    ///     - `index` - The column name to use as an _index_ column. If this
113    ///       `Table` is being instantiated by _data_, this column name must be
114    ///       present in the data.
115    ///     - `name` - The name of the table. This will be generated if it is
116    ///       not provided.
117    ///     - `format` - The explicit format of the input data, can be one of
118    ///       `"json"`, `"columns"`, `"csv"` or `"arrow"`. This overrides
119    ///       language-specific type dispatch behavior, which allows stringified
120    ///       and byte array alternative inputs.
121    ///
122    /// # Python Examples
123    ///
124    /// Load a CSV from a `str`:
125    ///
126    /// ```python
127    /// table = client.table("x,y\n1,2\n3,4")
128    /// ```
129    #[pyo3(signature = (input, limit=None, index=None, name=None, format=None))]
130    pub fn table(
131        &self,
132        py: Python<'_>,
133        input: Py<PyAny>,
134        limit: Option<u32>,
135        index: Option<Py<PyString>>,
136        name: Option<Py<PyString>>,
137        format: Option<Py<PyString>>,
138    ) -> PyResult<Table> {
139        Ok(Table(
140            self.0
141                .table(input, limit, index, name, format)
142                .py_block_on(py)?,
143        ))
144    }
145
146    /// Opens a [`Table`] that is hosted on the `perspective_server::Server`
147    /// that is connected to this [`Client`].
148    ///
149    /// The `name` property of [`TableInitOptions`] is used to identify each
150    /// [`Table`]. [`Table`] `name`s can be looked up for each [`Client`]
151    /// via [`Client::get_hosted_table_names`].
152    ///
153    /// # Python Examples
154    ///
155    /// ```python
156    /// table =  client.open_table("table_one");
157    /// ```
158    pub fn open_table(&self, py: Python<'_>, name: String) -> PyResult<Table> {
159        let client = self.0.clone();
160        let table = client.open_table(name).py_block_on(py)?;
161        Ok(Table(table))
162    }
163
164    /// Retrieves the names of all tables that this client has access to.
165    ///
166    /// `name` is a string identifier unique to the [`Table`] (per [`Client`]),
167    /// which can be used in conjunction with [`Client::open_table`] to get
168    /// a [`Table`] instance without the use of [`Client::table`]
169    /// constructor directly (e.g., one created by another [`Client`]).
170    ///
171    /// # Python Examples
172    ///
173    /// ```python
174    /// tables = client.get_hosted_table_names();
175    /// ```
176    pub fn get_hosted_table_names(&self, py: Python<'_>) -> PyResult<Vec<String>> {
177        self.0.get_hosted_table_names().py_block_on(py)
178    }
179
180    /// Register a callback which is invoked whenever [`Client::table`] (on this
181    /// [`Client`]) or [`Table::delete`] (on a [`Table`] belinging to this
182    /// [`Client`]) are called.
183    pub fn on_hosted_tables_update(&self, py: Python<'_>, callback: Py<PyAny>) -> PyResult<u32> {
184        self.0.on_hosted_tables_update(callback).py_block_on(py)
185    }
186
187    /// Remove a callback previously registered via
188    /// [`Client::on_hosted_tables_update`].
189    pub fn remove_hosted_tables_update(&self, py: Python<'_>, callback_id: u32) -> PyResult<()> {
190        self.0
191            .remove_hosted_tables_update(callback_id)
192            .py_block_on(py)
193    }
194
195    /// Terminates this [`Client`], cleaning up any [`crate::View`] handles the
196    /// [`Client`] has open as well as its callbacks.
197    pub fn terminate(&self, py: Python<'_>) -> PyResult<()> {
198        self.0.terminate(py)
199    }
200}
201
202#[pyclass(subclass, name = "Table", module = "perspective")]
203pub struct Table(AsyncTable);
204
205assert_table_api!(Table);
206
207#[pymethods]
208impl Table {
209    #[new]
210    fn new() -> PyResult<Self> {
211        Err(PyTypeError::new_err(
212            "Do not call Table's constructor directly, construct from a Client instance.",
213        ))
214    }
215
216    pub fn get_index(&self) -> Option<String> {
217        self.0.get_index()
218    }
219
220    pub fn get_client(&self, py: Python<'_>) -> Client {
221        Client(self.0.get_client().py_block_on(py))
222    }
223
224    /// Returns the user-specified row limit for this table.
225    pub fn get_limit(&self) -> Option<u32> {
226        self.0.get_limit()
227    }
228
229    pub fn get_name(&self) -> String {
230        self.0.get_name()
231    }
232
233    /// Removes all the rows in the [`Table`], but preserves everything else
234    /// including the schema, index, and any callbacks or registered
235    /// [`View`] instances.
236    ///
237    /// Calling [`Table::clear`], like [`Table::update`] and [`Table::remove`],
238    /// will trigger an update event to any registered listeners via
239    /// [`View::on_update`].
240    pub fn clear(&self, py: Python<'_>) -> PyResult<()> {
241        self.0.clear().py_block_on(py)
242    }
243
244    /// Returns the column names of this [`Table`] in "natural" order (the
245    /// ordering implied by the input format).
246    ///  
247    ///  # Python Examples
248    ///
249    /// ```python
250    /// columns = table.columns()
251    /// ```
252    pub fn columns(&self, py: Python<'_>) -> PyResult<Vec<String>> {
253        self.0.columns().py_block_on(py)
254    }
255
256    /// Delete this [`Table`] and cleans up associated resources.
257    ///
258    /// [`Table`]s do not stop consuming resources or processing updates when
259    /// they are garbage collected in their host language - you must call
260    /// this method to reclaim these.
261    ///
262    /// # Arguments
263    ///
264    /// - `options` An options dictionary.
265    ///     - `lazy` Whether to delete this [`Table`] _lazily_. When false (the
266    ///       default), the delete will occur immediately, assuming it has no
267    ///       [`View`] instances registered to it (which must be deleted first,
268    ///       otherwise this method will throw an error). When true, the
269    ///       [`Table`] will only be marked for deltion once its [`View`]
270    ///       dependency count reaches 0.
271    ///
272    /// # Python Examples
273    ///
274    /// ```python
275    /// table = client.table("x,y\n1,2\n3,4")
276    ///
277    /// # ...
278    ///
279    /// table.delete(lazy=True)
280    /// ```
281    #[pyo3(signature=(lazy=false))]
282    pub fn delete(&self, py: Python<'_>, lazy: bool) -> PyResult<()> {
283        self.0.delete(lazy).py_block_on(py)
284    }
285
286    pub fn make_port(&self, py: Python<'_>) -> PyResult<i32> {
287        let table = self.0.clone();
288        table.make_port().py_block_on(py)
289    }
290
291    pub fn on_delete(&self, py: Python<'_>, callback: Py<PyAny>) -> PyResult<u32> {
292        let table = self.0.clone();
293        table.on_delete(callback).py_block_on(py)
294    }
295
296    #[pyo3(signature = (input, format=None))]
297    pub fn remove(&self, py: Python<'_>, input: Py<PyAny>, format: Option<String>) -> PyResult<()> {
298        let table = self.0.clone();
299        table.remove(input, format).py_block_on(py)
300    }
301
302    pub fn remove_delete(&self, py: Python<'_>, callback_id: u32) -> PyResult<()> {
303        let table = self.0.clone();
304        table.remove_delete(callback_id).py_block_on(py)
305    }
306
307    pub fn schema(&self, py: Python<'_>) -> PyResult<HashMap<String, String>> {
308        let table = self.0.clone();
309        table.schema().py_block_on(py)
310    }
311
312    pub fn validate_expressions(
313        &self,
314        py: Python<'_>,
315        expression: Py<PyAny>,
316    ) -> PyResult<Py<PyAny>> {
317        let table = self.0.clone();
318        table.validate_expressions(expression).py_block_on(py)
319    }
320
321    #[pyo3(signature = (**config))]
322    pub fn view(&self, py: Python<'_>, config: Option<Py<PyDict>>) -> PyResult<View> {
323        Ok(View(self.0.view(config).py_block_on(py)?))
324    }
325
326    pub fn size(&self, py: Python<'_>) -> PyResult<usize> {
327        self.0.size().py_block_on(py)
328    }
329
330    #[pyo3(signature = (input, format=None))]
331    pub fn replace(
332        &self,
333        py: Python<'_>,
334        input: Py<PyAny>,
335        format: Option<String>,
336    ) -> PyResult<()> {
337        self.0.replace(input, format).py_block_on(py)
338    }
339
340    #[pyo3(signature = (input, port_id=None, format=None))]
341    pub fn update(
342        &self,
343        py: Python<'_>,
344        input: Py<PyAny>,
345        port_id: Option<u32>,
346        format: Option<String>,
347    ) -> PyResult<()> {
348        self.0.update(input, port_id, format).py_block_on(py)
349    }
350}
351
352#[pyclass(subclass, name = "View", module = "perspective")]
353pub struct View(AsyncView);
354
355assert_view_api!(View);
356
357#[pymethods]
358impl View {
359    #[new]
360    fn new() -> PyResult<Self> {
361        Err(PyTypeError::new_err(
362            "Do not call View's constructor directly, construct from a Table instance.",
363        ))
364    }
365
366    pub fn column_paths(&self, py: Python<'_>) -> PyResult<Vec<String>> {
367        self.0.column_paths().py_block_on(py)
368    }
369
370    #[pyo3(signature = (**window))]
371    pub fn to_columns_string(
372        &self,
373        py: Python<'_>,
374        window: Option<Py<PyDict>>,
375    ) -> PyResult<String> {
376        self.0.to_columns_string(window).py_block_on(py)
377    }
378
379    #[pyo3(signature = (**window))]
380    pub fn to_json_string(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<String> {
381        self.0.to_json_string(window).py_block_on(py)
382    }
383
384    #[pyo3(signature = (**window))]
385    pub fn to_ndjson(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<String> {
386        self.0.to_ndjson(window).py_block_on(py)
387    }
388
389    #[pyo3(signature = (**window))]
390    pub fn to_records<'a>(
391        &self,
392        py: Python<'a>,
393        window: Option<Py<PyDict>>,
394    ) -> PyResult<Bound<'a, PyAny>> {
395        let json = self.0.to_json_string(window).py_block_on(py)?;
396        let json_module = PyModule::import(py, "json")?;
397        json_module.call_method1("loads", (json,))
398    }
399
400    #[pyo3(signature = (**window))]
401    pub fn to_json<'a>(
402        &self,
403        py: Python<'a>,
404        window: Option<Py<PyDict>>,
405    ) -> PyResult<Bound<'a, PyAny>> {
406        self.to_records(py, window)
407    }
408
409    #[pyo3(signature = (**window))]
410    pub fn to_columns<'a>(
411        &self,
412        py: Python<'a>,
413        window: Option<Py<PyDict>>,
414    ) -> PyResult<Bound<'a, PyAny>> {
415        let json = self.0.to_columns_string(window).py_block_on(py)?;
416        let json_module = PyModule::import(py, "json")?;
417        json_module.call_method1("loads", (json,))
418    }
419
420    #[pyo3(signature = (**window))]
421    pub fn to_csv(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<String> {
422        self.0.to_csv(window).py_block_on(py)
423    }
424
425    /// Serialize the data to a `pandas.DataFrame`.
426    #[pyo3(signature = (**window))]
427    // #[deprecated(since="3.2.0", note="Please use `View::to_pandas`")]
428    pub fn to_dataframe(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<Py<PyAny>> {
429        self.0.to_dataframe(window).py_block_on(py)
430    }
431
432    /// Serialize the data to a `pandas.DataFrame`.
433    #[pyo3(signature = (**window))]
434    pub fn to_pandas(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<Py<PyAny>> {
435        self.0.to_dataframe(window).py_block_on(py)
436    }
437
438    /// Serialize the data to a `polars.DataFrame`.
439    #[pyo3(signature = (**window))]
440    pub fn to_polars(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<Py<PyAny>> {
441        self.0.to_polars(window).py_block_on(py)
442    }
443
444    #[pyo3(signature = (**window))]
445    pub fn to_arrow(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<Py<PyBytes>> {
446        self.0.to_arrow(window).py_block_on(py)
447    }
448
449    pub fn delete(&self, py: Python<'_>) -> PyResult<()> {
450        self.0.delete().py_block_on(py)
451    }
452
453    pub fn expand(&self, py: Python<'_>, index: u32) -> PyResult<u32> {
454        self.0.expand(index).py_block_on(py)
455    }
456
457    pub fn collapse(&self, py: Python<'_>, index: u32) -> PyResult<u32> {
458        self.0.collapse(index).py_block_on(py)
459    }
460
461    pub fn dimensions(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
462        self.0.dimensions().py_block_on(py)
463    }
464
465    pub fn expression_schema(&self, py: Python<'_>) -> PyResult<HashMap<String, String>> {
466        self.0.expression_schema().py_block_on(py)
467    }
468
469    pub fn get_config(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
470        self.0.get_config().py_block_on(py)
471    }
472
473    pub fn get_min_max(&self, py: Python<'_>, column_name: String) -> PyResult<(String, String)> {
474        self.0.get_min_max(column_name).py_block_on(py)
475    }
476
477    pub fn num_rows(&self, py: Python<'_>) -> PyResult<u32> {
478        self.0.num_rows().py_block_on(py)
479    }
480
481    pub fn schema(&self, py: Python<'_>) -> PyResult<HashMap<String, String>> {
482        self.0.schema().py_block_on(py)
483    }
484
485    pub fn on_delete(&self, py: Python<'_>, callback: Py<PyAny>) -> PyResult<u32> {
486        self.0.on_delete(callback).py_block_on(py)
487    }
488
489    pub fn remove_delete(&self, py: Python<'_>, callback_id: u32) -> PyResult<()> {
490        self.0.remove_delete(callback_id).py_block_on(py)
491    }
492
493    #[pyo3(signature = (callback, mode=None))]
494    pub fn on_update(
495        &self,
496        py: Python<'_>,
497        callback: Py<PyAny>,
498        mode: Option<String>,
499    ) -> PyResult<u32> {
500        self.0.on_update(callback, mode).py_block_on(py)
501    }
502
503    pub fn remove_update(&self, py: Python<'_>, callback_id: u32) -> PyResult<()> {
504        self.0.remove_update(callback_id).py_block_on(py)
505    }
506}