use std::collections::HashMap;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::Arc;
use futures::Future;
use prost::bytes::Bytes;
use serde::{Deserialize, Serialize};
use ts_rs::TS;
use self::view_on_update_req::Mode;
use crate::assert_view_api;
use crate::client::Client;
use crate::proto::request::ClientReq;
use crate::proto::response::ClientResp;
use crate::proto::*;
#[cfg(doc)]
use crate::table::Table;
pub use crate::utils::*;
/// Options accepted by [`View::on_update`].
///
/// Derives `Default`, so a default-constructed value leaves `mode` unset.
#[derive(Default, Debug, Deserialize, TS)]
pub struct OnUpdateOptions {
/// Optional update mode. When set, [`View::on_update`] forwards it as the
/// `mode` field of the `ViewOnUpdateReq` it sends; when `None`, the request
/// carries no mode.
pub mode: Option<OnUpdateMode>,
}
/// Update modes that can be requested through [`OnUpdateOptions`].
#[derive(Default, Debug, Deserialize, TS)]
pub enum OnUpdateMode {
/// The only mode defined here, and the `Default`. Deserialized from the
/// string `"row"`.
///
/// NOTE(review): what the server adds to update payloads in this mode is not
/// visible in this file — confirm against the protocol definition.
#[default]
#[serde(rename = "row")]
Row,
}
/// Parses an [`OnUpdateMode`] from its string name.
///
/// Only the exact string `"row"` is accepted; any other input is rejected
/// with `ClientError::Option`.
impl FromStr for OnUpdateMode {
    type Err = ClientError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "row" => Ok(Self::Row),
            _ => Err(ClientError::Option),
        }
    }
}
/// Optional column range passed to [`View::column_paths`].
///
/// Both bounds are optional and are omitted from serialized output when
/// `None`.
#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
pub struct ColumnWindow {
/// Start of the column range. [`View::column_paths`] casts this to `u32`
/// with `as` before sending it.
#[serde(skip_serializing_if = "Option::is_none")]
pub start_col: Option<f32>,
/// End of the column range. [`View::column_paths`] casts this to `u32`
/// with `as` before sending it.
///
/// NOTE(review): whether this bound is inclusive or exclusive is decided by
/// the server and is not visible here.
#[serde(skip_serializing_if = "Option::is_none")]
pub end_col: Option<f32>,
}
/// Options describing which part of a [`View`] to serialize and how.
///
/// The four `start_*` / `end_*` fields are converted into a `ViewPort` (see
/// the `From<ViewWindow> for ViewPort` impl); the remaining fields are
/// forwarded individually by the `View::to_*` methods that use them. Every
/// field is optional and is omitted from serialized output when `None`.
#[derive(Clone, Debug, Default, Deserialize, Serialize, TS, PartialEq)]
pub struct ViewWindow {
/// First row of the window; rounded down (`floor`) when converted to a
/// `ViewPort`.
#[ts(optional)]
#[serde(skip_serializing_if = "Option::is_none")]
pub start_row: Option<f64>,
/// First column of the window; rounded down (`floor`) when converted to a
/// `ViewPort`.
#[ts(optional)]
#[serde(skip_serializing_if = "Option::is_none")]
pub start_col: Option<f64>,
/// Last row of the window; rounded up (`ceil`) when converted to a
/// `ViewPort`.
#[ts(optional)]
#[serde(skip_serializing_if = "Option::is_none")]
pub end_row: Option<f64>,
/// Last column of the window; rounded up (`ceil`) when converted to a
/// `ViewPort`.
#[ts(optional)]
#[serde(skip_serializing_if = "Option::is_none")]
pub end_col: Option<f64>,
/// Forwarded as the `id` field of the columns / rows / NDJSON string
/// requests.
#[ts(optional)]
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<bool>,
/// Forwarded as the `index` field of the columns / rows / NDJSON string
/// requests.
#[ts(optional)]
#[serde(skip_serializing_if = "Option::is_none")]
pub index: Option<bool>,
/// Forwarded as the `formatted` field of the columns / rows / NDJSON string
/// requests.
#[ts(optional)]
#[serde(skip_serializing_if = "Option::is_none")]
pub formatted: Option<bool>,
/// Forwarded as the `compression` field of the Arrow request sent by
/// [`View::to_arrow`].
///
/// NOTE(review): the set of accepted compression names is defined by the
/// server and is not visible here.
#[ts(optional)]
#[serde(skip_serializing_if = "Option::is_none")]
pub compression: Option<String>,
}
impl From<ViewWindow> for ViewPort {
fn from(window: ViewWindow) -> Self {
ViewPort {
start_row: window.start_row.map(|x| x.floor() as u32),
start_col: window.start_col.map(|x| x.floor() as u32),
end_row: window.end_row.map(|x| x.ceil() as u32),
end_col: window.end_col.map(|x| x.ceil() as u32),
}
}
}
impl From<ViewPort> for ViewWindow {
fn from(window: ViewPort) -> Self {
ViewWindow {
start_row: window.start_row.map(|x| x as f64),
start_col: window.start_col.map(|x| x as f64),
end_row: window.end_row.map(|x| x as f64),
end_col: window.end_col.map(|x| x as f64),
..ViewWindow::default()
}
}
}
/// Newtype wrapper around the `ViewOnUpdateResp` protocol message.
///
/// This is the value handed to callbacks registered with
/// [`View::on_update`]; it dereferences to the wrapped message.
#[derive(TS)]
pub struct OnUpdateData(crate::proto::ViewOnUpdateResp);
impl Deref for OnUpdateData {
type Target = crate::proto::ViewOnUpdateResp;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// A named handle that builds requests addressed to `name` and sends them
/// through a [`Client`].
///
/// Cloning a `View` clones the name and the client handle.
#[derive(Clone, Debug)]
pub struct View {
/// Name of the entity this handle refers to; used as the `entity_id` of
/// every request built by this `View`.
pub name: String,
/// Client used to generate message ids and to send requests.
client: Client,
}
// NOTE(review): `assert_view_api!` is defined elsewhere in the crate;
// presumably a compile-time check that `View` exposes the expected set of
// methods — confirm against the macro definition.
assert_view_api!(View);
impl View {
pub fn new(name: String, client: Client) -> Self {
View { name, client }
}
fn client_message(&self, req: ClientReq) -> Request {
crate::proto::Request {
msg_id: self.client.gen_id(),
entity_id: self.name.clone(),
client_req: Some(req),
}
}
pub async fn column_paths(&self, window: ColumnWindow) -> ClientResult<Vec<String>> {
let msg = self.client_message(ClientReq::ViewColumnPathsReq(ViewColumnPathsReq {
start_col: window.start_col.map(|x| x as u32),
end_col: window.end_col.map(|x| x as u32),
}));
match self.client.oneshot(&msg).await? {
ClientResp::ViewColumnPathsResp(ViewColumnPathsResp { paths }) => {
Ok(paths)
},
resp => Err(resp.into()),
}
}
/// Requests the dimensions of this view and returns the raw
/// `ViewDimensionsResp` message.
///
/// # Errors
///
/// Propagates any error from sending the request, and converts any response
/// other than `ViewDimensionsResp` into an error.
pub async fn dimensions(&self) -> ClientResult<ViewDimensionsResp> {
    let request = self.client_message(ClientReq::ViewDimensionsReq(ViewDimensionsReq {}));
    let response = self.client.oneshot(&request).await?;
    match response {
        ClientResp::ViewDimensionsResp(dimensions) => Ok(dimensions),
        unexpected => Err(unexpected.into()),
    }
}
/// Requests the expression schema of this view as a map from column name to
/// [`ColumnType`].
///
/// The client's feature set is queried first; when its `expressions` flag is
/// off, no schema request is sent and an empty map is returned.
///
/// # Errors
///
/// Propagates any error from querying the features or sending the request,
/// and converts any response other than `ViewExpressionSchemaResp` into an
/// error.
///
/// # Panics
///
/// Panics if a type value in the response cannot be converted into a
/// [`ColumnType`].
pub async fn expression_schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
    // Only the flag is kept, so the features value is dropped before the
    // next await point.
    let expressions_enabled = self.client.get_features().await?.expressions;
    if !expressions_enabled {
        return Ok(HashMap::new());
    }

    let request = self.client_message(ClientReq::ViewExpressionSchemaReq(
        ViewExpressionSchemaReq {},
    ));
    let response = self.client.oneshot(&request).await?;
    match response {
        ClientResp::ViewExpressionSchemaResp(inner) => Ok(inner
            .schema
            .into_iter()
            .map(|(column, raw_type)| (column, ColumnType::try_from(raw_type).unwrap()))
            .collect()),
        unexpected => Err(unexpected.into()),
    }
}
/// Requests the configuration of this view and converts it into a
/// `crate::config::ViewConfig`.
///
/// # Errors
///
/// Propagates any error from sending the request. Any other response,
/// including a `ViewGetConfigResp` whose `config` is `None`, is converted
/// into an error.
pub async fn get_config(&self) -> ClientResult<crate::config::ViewConfig> {
    let request = self.client_message(ClientReq::ViewGetConfigReq(ViewGetConfigReq {}));
    let response = self.client.oneshot(&request).await?;
    match response {
        ClientResp::ViewGetConfigResp(ViewGetConfigResp {
            config: Some(proto_config),
        }) => Ok(proto_config.into()),
        unexpected => Err(unexpected.into()),
    }
}
/// Returns the `num_view_rows` field of this view's dimensions.
///
/// This is a convenience wrapper around [`View::dimensions`] and sends the
/// same request.
///
/// # Errors
///
/// Returns whatever error [`View::dimensions`] returns.
pub async fn num_rows(&self) -> ClientResult<u32> {
    let dimensions = self.dimensions().await?;
    Ok(dimensions.num_view_rows)
}
/// Requests the schema of this view as a map from column name to
/// [`ColumnType`].
///
/// # Errors
///
/// Propagates any error from sending the request, and converts any response
/// other than `ViewSchemaResp` into an error.
///
/// # Panics
///
/// Panics if a type value in the response cannot be converted into a
/// [`ColumnType`].
pub async fn schema(&self) -> ClientResult<HashMap<String, ColumnType>> {
    let request = self.client_message(ClientReq::ViewSchemaReq(ViewSchemaReq {}));
    let response = self.client.oneshot(&request).await?;
    match response {
        ClientResp::ViewSchemaResp(inner) => Ok(inner
            .schema
            .into_iter()
            .map(|(column, raw_type)| (column, ColumnType::try_from(raw_type).unwrap()))
            .collect()),
        unexpected => Err(unexpected.into()),
    }
}
/// Requests this view serialized as Arrow data and returns the raw bytes.
///
/// The bounds of `window` are sent as the request's viewport and
/// `window.compression` is sent as its `compression` field; the remaining
/// fields of `window` are not used by this request.
///
/// # Errors
///
/// Propagates any error from sending the request, and converts any response
/// other than `ViewToArrowResp` into an error.
pub async fn to_arrow(&self, window: ViewWindow) -> ClientResult<Bytes> {
    // Move the compression setting out first so the rest of the window can be
    // consumed by the viewport conversion without cloning the whole struct.
    let mut window = window;
    let compression = window.compression.take();
    let msg = self.client_message(ClientReq::ViewToArrowReq(ViewToArrowReq {
        viewport: Some(window.into()),
        compression,
    }));
    match self.client.oneshot(&msg).await? {
        ClientResp::ViewToArrowResp(ViewToArrowResp { arrow }) => Ok(arrow.into()),
        resp => Err(resp.into()),
    }
}
/// Requests this view serialized as a column-oriented JSON string.
///
/// The bounds of `window` are sent as the request's viewport, and its `id`,
/// `index` and `formatted` flags are forwarded unchanged.
///
/// # Errors
///
/// Propagates any error from sending the request, and converts any response
/// other than `ViewToColumnsStringResp` into an error.
pub async fn to_columns_string(&self, window: ViewWindow) -> ClientResult<String> {
    // The flags are `Copy`, so read them before `window` is consumed by the
    // viewport conversion; this avoids cloning the whole window.
    let (id, index, formatted) = (window.id, window.index, window.formatted);
    let msg = self.client_message(ClientReq::ViewToColumnsStringReq(ViewToColumnsStringReq {
        viewport: Some(window.into()),
        id,
        index,
        formatted,
    }));
    match self.client.oneshot(&msg).await? {
        ClientResp::ViewToColumnsStringResp(ViewToColumnsStringResp { json_string }) => {
            Ok(json_string)
        },
        resp => Err(resp.into()),
    }
}
/// Requests this view serialized as a row-oriented JSON string.
///
/// The bounds of `window` are sent as the request's viewport, and its `id`,
/// `index` and `formatted` flags are forwarded unchanged.
///
/// # Errors
///
/// Propagates any error from sending the request, and converts any response
/// other than `ViewToRowsStringResp` into an error.
pub async fn to_json_string(&self, window: ViewWindow) -> ClientResult<String> {
    // The flags are `Copy`, so read them before `window` is consumed by the
    // viewport conversion; this avoids cloning the whole window.
    let (id, index, formatted) = (window.id, window.index, window.formatted);
    let viewport = ViewPort::from(window);
    let msg = self.client_message(ClientReq::ViewToRowsStringReq(ViewToRowsStringReq {
        viewport: Some(viewport),
        id,
        index,
        formatted,
    }));
    match self.client.oneshot(&msg).await? {
        ClientResp::ViewToRowsStringResp(ViewToRowsStringResp { json_string }) => {
            Ok(json_string)
        },
        resp => Err(resp.into()),
    }
}
/// Requests this view serialized as an NDJSON string.
///
/// The bounds of `window` are sent as the request's viewport, and its `id`,
/// `index` and `formatted` flags are forwarded unchanged.
///
/// # Errors
///
/// Propagates any error from sending the request, and converts any response
/// other than `ViewToNdjsonStringResp` into an error.
pub async fn to_ndjson(&self, window: ViewWindow) -> ClientResult<String> {
    // The flags are `Copy`, so read them before `window` is consumed by the
    // viewport conversion; this avoids cloning the whole window.
    let (id, index, formatted) = (window.id, window.index, window.formatted);
    let viewport = ViewPort::from(window);
    let msg = self.client_message(ClientReq::ViewToNdjsonStringReq(ViewToNdjsonStringReq {
        viewport: Some(viewport),
        id,
        index,
        formatted,
    }));
    match self.client.oneshot(&msg).await? {
        ClientResp::ViewToNdjsonStringResp(ViewToNdjsonStringResp { ndjson_string }) => {
            Ok(ndjson_string)
        },
        resp => Err(resp.into()),
    }
}
pub async fn to_csv(&self, window: ViewWindow) -> ClientResult<String> {
let msg = self.client_message(ClientReq::ViewToCsvReq(ViewToCsvReq {
viewport: Some(window.into()),
}));
match self.client.oneshot(&msg).await? {
ClientResp::ViewToCsvResp(ViewToCsvResp { csv }) => Ok(csv),
resp => Err(resp.into()),
}
}
/// Sends a delete request for this view.
///
/// # Errors
///
/// Propagates any error from sending the request, and converts any response
/// other than `ViewDeleteResp` into an error.
pub async fn delete(&self) -> ClientResult<()> {
    let request = self.client_message(ClientReq::ViewDeleteReq(ViewDeleteReq {}));
    let response = self.client.oneshot(&request).await?;
    match response {
        ClientResp::ViewDeleteResp(_) => Ok(()),
        unexpected => Err(unexpected.into()),
    }
}
/// Requests the minimum and maximum values of the column `column_name`.
///
/// Returns `(min, max)` as `crate::config::Scalar` values. A bound that is
/// absent from the response is replaced by `Scalar::default()`.
///
/// # Errors
///
/// Propagates any error from sending the request, and converts any response
/// other than `ViewGetMinMaxResp` into an error.
pub async fn get_min_max(
    &self,
    column_name: String,
) -> ClientResult<(crate::config::Scalar, crate::config::Scalar)> {
    let request = self.client_message(ClientReq::ViewGetMinMaxReq(ViewGetMinMaxReq {
        column_name,
    }));
    let response = self.client.oneshot(&request).await?;
    match response {
        ClientResp::ViewGetMinMaxResp(inner) => {
            let lowest = inner
                .min
                .map_or_else(crate::config::Scalar::default, crate::config::Scalar::from);
            let highest = inner
                .max
                .map_or_else(crate::config::Scalar::default, crate::config::Scalar::from);
            Ok((lowest, highest))
        },
        unexpected => Err(unexpected.into()),
    }
}
/// Subscribes `on_update` to update notifications for this view.
///
/// A `ViewOnUpdateReq` carrying the optional mode from `options` is
/// registered with the client. For every `ViewOnUpdateResp` delivered to the
/// subscription, `on_update` is called with the payload wrapped in
/// [`OnUpdateData`] and its future is awaited. Any other payload, or a
/// missing one, makes the subscription callback return an error.
///
/// Returns the message id of the subscription request.
///
/// NOTE(review): presumably this is the id expected by
/// [`View::remove_update`] — confirm.
///
/// # Errors
///
/// Propagates any error from registering the subscription.
pub async fn on_update<T, U>(&self, on_update: T, options: OnUpdateOptions) -> ClientResult<u32>
where
    T: Fn(OnUpdateData) -> U + Send + Sync + 'static,
    U: Future<Output = ()> + Send + 'static,
{
    // Shared so each invocation's future can own a handle to the handler.
    let handler = Arc::new(on_update);
    let callback = move |resp: Response| {
        let handler = Arc::clone(&handler);
        async move {
            match resp.client_resp {
                Some(ClientResp::ViewOnUpdateResp(update)) => {
                    handler(OnUpdateData(update)).await;
                    Ok(())
                },
                unexpected => Err(unexpected.into()),
            }
        }
    };

    let mode = options.mode.map(|mode| match mode {
        OnUpdateMode::Row => Mode::Row as i32,
    });
    let request = self.client_message(ClientReq::ViewOnUpdateReq(ViewOnUpdateReq { mode }));
    self.client.subscribe(&request, callback).await?;
    Ok(request.msg_id)
}
/// Removes the update subscription identified by `update_id`.
///
/// The client is asked to unsubscribe `update_id` first; only if that
/// succeeds is a `ViewRemoveOnUpdateReq` carrying the same id sent.
///
/// # Errors
///
/// Propagates any error from unsubscribing or from sending the request, and
/// converts any response other than `ViewRemoveOnUpdateResp` into an error.
pub async fn remove_update(&self, update_id: u32) -> ClientResult<()> {
    let request = self.client_message(ClientReq::ViewRemoveOnUpdateReq(ViewRemoveOnUpdateReq {
        id: update_id,
    }));
    self.client.unsubscribe(update_id).await?;

    let response = self.client.oneshot(&request).await?;
    match response {
        ClientResp::ViewRemoveOnUpdateResp(_) => Ok(()),
        unexpected => Err(unexpected.into()),
    }
}
/// Registers `on_delete` to be called once when a `ViewOnDeleteResp` is
/// delivered for this view.
///
/// A `ViewOnDeleteReq` is registered with the client as a one-shot
/// subscription. When the subscription receives a `ViewOnDeleteResp`,
/// `on_delete` is invoked. Any other payload, or a response with no payload
/// at all, makes the subscription callback return an error instead of
/// panicking, matching the handling in [`View::on_update`].
///
/// Returns the message id of the subscription request.
///
/// NOTE(review): presumably this is the id expected by
/// [`View::remove_delete`] — confirm.
///
/// # Errors
///
/// Propagates any error from registering the subscription.
pub async fn on_delete(
    &self,
    on_delete: Box<dyn Fn() + Send + Sync + 'static>,
) -> ClientResult<u32> {
    // Match on the `Option` directly rather than unwrapping it, so an empty
    // response is reported as an error.
    let callback = move |resp: Response| match resp.client_resp {
        Some(ClientResp::ViewOnDeleteResp(_)) => {
            on_delete();
            Ok(())
        },
        resp => Err(resp.into()),
    };
    let msg = self.client_message(ClientReq::ViewOnDeleteReq(ViewOnDeleteReq {}));
    self.client.subscribe_once(&msg, Box::new(callback)).await?;
    Ok(msg.msg_id)
}
/// Sends a `ViewRemoveDeleteReq` for the delete callback identified by
/// `callback_id`.
///
/// # Errors
///
/// Propagates any error from sending the request, and converts any response
/// other than `ViewRemoveDeleteResp` into an error.
pub async fn remove_delete(&self, callback_id: u32) -> ClientResult<()> {
    let request = self.client_message(ClientReq::ViewRemoveDeleteReq(ViewRemoveDeleteReq {
        id: callback_id,
    }));
    let response = self.client.oneshot(&request).await?;
    match response {
        ClientResp::ViewRemoveDeleteResp(_) => Ok(()),
        unexpected => Err(unexpected.into()),
    }
}
/// Sends a collapse request for the row at `row_index` and returns the
/// `num_changed` value reported in the response.
///
/// # Errors
///
/// Propagates any error from sending the request, and converts any response
/// other than `ViewCollapseResp` into an error.
pub async fn collapse(&self, row_index: u32) -> ClientResult<u32> {
    let request = self.client_message(ClientReq::ViewCollapseReq(ViewCollapseReq { row_index }));
    let response = self.client.oneshot(&request).await?;
    match response {
        ClientResp::ViewCollapseResp(inner) => Ok(inner.num_changed),
        unexpected => Err(unexpected.into()),
    }
}
/// Sends an expand request for the row at `row_index` and returns the
/// `num_changed` value reported in the response.
///
/// # Errors
///
/// Propagates any error from sending the request, and converts any response
/// other than `ViewExpandResp` into an error.
pub async fn expand(&self, row_index: u32) -> ClientResult<u32> {
    let request = self.client_message(ClientReq::ViewExpandReq(ViewExpandReq { row_index }));
    let response = self.client.oneshot(&request).await?;
    match response {
        ClientResp::ViewExpandResp(inner) => Ok(inner.num_changed),
        unexpected => Err(unexpected.into()),
    }
}
/// Sends a set-depth request with the given `depth` for this view.
///
/// # Errors
///
/// Propagates any error from sending the request, and converts any response
/// other than `ViewSetDepthResp` into an error.
pub async fn set_depth(&self, depth: u32) -> ClientResult<()> {
    let request = self.client_message(ClientReq::ViewSetDepthReq(ViewSetDepthReq { depth }));
    let response = self.client.oneshot(&request).await?;
    match response {
        ClientResp::ViewSetDepthResp(_) => Ok(()),
        unexpected => Err(unexpected.into()),
    }
}
}