use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use futures_util::{SinkExt, Stream, StreamExt};
use prost::Message;
use tokio::net::TcpStream;
use tokio::sync::{Mutex, oneshot};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_stream::wrappers::ReceiverStream;
use tokio_tungstenite::{
MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message as WsMessage,
};
use uuid::Uuid;
use crate::connection::{ConnectionOptions, build_ws_request};
use crate::models::{BooleanVar, FloatVar, IntegerVar, TextVar, VariableMetadataPatch};
use crate::namespace::{
self, Command, FolderInfo, ItemMeta, ItemType, NamespaceSchema, Response, VarDataType,
VarIdValue, VarInfo, command, response, value,
};
use crate::protocol::{
build_pairs, ensure_ok, extract_cmd_id_from_command, extract_cmd_id_from_response,
};
use crate::schema::namespace_schema_from_json;
use crate::types::errors::ClientError;
/// Write half of a split client WebSocket connection: a `SplitSink` over
/// `WebSocketStream<MaybeTlsStream<TcpStream>>` that accepts `WsMessage` frames.
pub type WsSink =
futures_util::stream::SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, WsMessage>;
/// Request/response client over a single WebSocket connection.
///
/// Commands are written through `sink`. A background task spawned by
/// `connect_with_options` reads binary frames, decodes them as `Response`, and hands
/// each one to the waiter registered in `pending` under the response's command id.
pub struct Client {
/// Write half of the command socket, shared behind an async mutex.
sink: Arc<Mutex<WsSink>>,
/// In-flight commands keyed by command id; the value is the one-shot channel on
/// which the reader task delivers the matching `Response`.
pending: Arc<Mutex<HashMap<String, oneshot::Sender<Response>>>>,
/// Handle of the background reader task; `disconnect` aborts it.
reader: JoinHandle<()>,
/// Options the connection was opened with; `subscribe_var_values` reuses them to
/// open an additional socket.
options: ConnectionOptions,
}
impl Client {
/// Connects to `host:port` without a PAT token.
///
/// Builds `ConnectionOptions` with `None` as the token argument and delegates to
/// [`Client::connect_with_options`].
///
/// # Errors
///
/// Propagates any error returned by `connect_with_options`.
pub async fn connect(host: &str, port: i64, tls: bool) -> Result<Self, ClientError> {
    let options = ConnectionOptions::new(host, port, tls, None);
    Self::connect_with_options(options).await
}
/// Connects to `host:port`, supplying `pat_token` in the connection options.
///
/// Builds `ConnectionOptions` with `Some(token)` as the token argument and delegates
/// to [`Client::connect_with_options`].
///
/// # Errors
///
/// Propagates any error returned by `connect_with_options`.
pub async fn connect_with_pat(
    host: &str,
    port: i64,
    tls: bool,
    pat_token: impl Into<String>,
) -> Result<Self, ClientError> {
    let token: String = pat_token.into();
    let options = ConnectionOptions::new(host, port, tls, Some(token));
    Self::connect_with_options(options).await
}
/// Opens the WebSocket described by `options` and starts the background reader.
///
/// The socket is split into a write half (kept in the client) and a read half that
/// is moved into a spawned task. That task decodes every binary frame as a
/// `Response`, looks up its command id in the pending map and delivers the response
/// to the waiting caller. Frames that fail to decode, carry no command id, or match
/// no pending command are dropped. Text and other non-binary frames are ignored.
///
/// The reader stops on a close frame, a stream error, or end of stream. When it
/// stops it clears the pending map so callers still waiting in `send_command` are
/// woken with `ClientError::ConnectionClosed` instead of waiting out their timeout.
///
/// # Errors
///
/// Returns an error if the request cannot be built from `options` or the WebSocket
/// handshake fails.
pub async fn connect_with_options(options: ConnectionOptions) -> Result<Self, ClientError> {
    let request = build_ws_request(&options)?;
    let (ws, _) = connect_async(request).await?;
    let (sink, mut stream) = ws.split();
    let pending: Arc<Mutex<HashMap<String, oneshot::Sender<Response>>>> =
        Arc::new(Mutex::new(HashMap::new()));
    let pending_for_reader = Arc::clone(&pending);
    let reader = tokio::spawn(async move {
        while let Some(msg) = stream.next().await {
            match msg {
                Ok(WsMessage::Binary(data)) => {
                    if let Ok(resp) = Response::decode(&*data)
                        && let Some(cmd_id) = extract_cmd_id_from_response(&resp)
                        && let Some(tx) = pending_for_reader.lock().await.remove(cmd_id)
                    {
                        // The waiter may already have timed out; a failed send is fine.
                        let _ = tx.send(resp);
                    }
                }
                Ok(WsMessage::Close(_)) | Err(_) => break,
                _ => continue,
            }
        }
        // No further responses can arrive. Dropping the senders makes every
        // outstanding receiver resolve with an error right away; otherwise the
        // client's own `Arc` would keep them alive until each caller's timeout.
        pending_for_reader.lock().await.clear();
    });
    Ok(Self {
        sink: Arc::new(Mutex::new(sink)),
        pending,
        reader,
        options,
    })
}
pub fn connection_options(&self) -> &ConnectionOptions {
&self.options
}
/// Closes the write half of the socket and stops the background reader.
///
/// The reader task is aborted and all pending commands are released even when
/// closing the sink fails, so a failed close no longer leaves the reader running.
/// Callers still waiting in `send_command` are woken with
/// `ClientError::ConnectionClosed`.
///
/// # Errors
///
/// Returns the error produced by closing the sink, if any, after cleanup has run.
pub async fn disconnect(&self) -> Result<(), ClientError> {
    let close_result = {
        let mut sink = self.sink.lock().await;
        sink.close().await
    };
    // Cleanup must not depend on the close succeeding.
    self.reader.abort();
    // An aborted reader cannot answer anything; drop the senders so waiters return
    // immediately rather than timing out.
    self.pending.lock().await.clear();
    close_result?;
    Ok(())
}
/// Sends `command` and waits up to `timeout_ms` milliseconds for its response.
///
/// The command id is registered in the pending map before the frame is written so
/// the reader task can route the reply. The reply is passed to `on_resp`, whose
/// result is returned.
///
/// # Errors
///
/// * `ClientError::UnexpectedFrame` if no command id can be extracted from `command`.
/// * The converted sink error if writing the frame fails; the pending entry is
///   removed first so failed sends do not accumulate stale entries.
/// * `ClientError::ConnectionClosed` if the response channel is dropped before a
///   reply arrives.
/// * `ClientError::Timeout` if no reply arrives in time; the pending entry is removed.
/// * Whatever `on_resp` returns.
async fn send_command<F, R>(
    &self,
    command: Command,
    timeout_ms: u64,
    on_resp: F,
) -> Result<R, ClientError>
where
    F: FnOnce(Response) -> Result<R, ClientError> + Send + 'static,
    R: Send + 'static,
{
    let cmd_id = extract_cmd_id_from_command(&command)
        .ok_or(ClientError::UnexpectedFrame)?
        .to_string();
    let (tx, rx) = oneshot::channel();
    {
        let mut pending = self.pending.lock().await;
        pending.insert(cmd_id.clone(), tx);
    }
    let msg = WsMessage::Binary(command.encode_to_vec().into());
    let sent = {
        let mut sink = self.sink.lock().await;
        sink.send(msg).await
    };
    if let Err(err) = sent {
        // Nothing was sent, so no reply will ever claim this entry.
        self.pending.lock().await.remove(&cmd_id);
        return Err(err.into());
    }
    match timeout(Duration::from_millis(timeout_ms), rx).await {
        Ok(Ok(resp)) => on_resp(resp),
        Ok(Err(_)) => Err(ClientError::ConnectionClosed),
        Err(_) => {
            let mut pending = self.pending.lock().await;
            pending.remove(&cmd_id);
            Err(ClientError::Timeout)
        }
    }
}
pub async fn list(
&self,
folder_id: Option<String>,
timeout_ms: u64,
) -> Result<(Vec<FolderInfo>, Vec<VarInfo>), ClientError> {
let cmd = Command {
command_type: Some(command::CommandType::List(namespace::ListCommand {
cmd_id: Uuid::new_v4().to_string(),
folder_id,
})),
};
self.send_command(cmd, timeout_ms, |resp| match resp.response_type {
Some(response::ResponseType::List(list_resp)) => {
Ok((list_resp.folders, list_resp.variables))
}
_ => Err(ClientError::UnexpectedFrame),
})
.await
}
/// Sends an add request containing one folder item per entry of `names`.
///
/// Each item has type `ItemType::Folder` and no variable metadata. `parent_id` is
/// forwarded to `send_add` unchanged.
///
/// # Errors
///
/// Propagates any error returned by `send_add`.
pub async fn create_folders(
    &self,
    names: Vec<String>,
    parent_id: Option<String>,
    timeout_ms: u64,
) -> Result<(), ClientError> {
    let mut items: Vec<ItemMeta> = Vec::with_capacity(names.len());
    for name in names {
        items.push(ItemMeta {
            name,
            i_type: ItemType::Folder as i32,
            var_d_type: None,
            unit: None,
            min: None,
            max: None,
            options: Vec::new(),
            max_len: None,
        });
    }
    self.send_add(items, parent_id, timeout_ms).await
}
pub async fn create_integer_variables(
&self,
vars: Vec<IntegerVar>,
parent_id: Option<String>,
timeout_ms: u64,
) -> Result<(), ClientError> {
let items = vars
.into_iter()
.map(|v| ItemMeta {
name: v.name,
i_type: ItemType::Variable as i32,
var_d_type: Some(VarDataType::Integer as i32),
unit: v.unit,
min: v.min,
max: v.max,
options: vec![],
max_len: None,
})
.collect();
self.send_add(items, parent_id, timeout_ms).await
}
pub async fn create_float_variables(
&self,
vars: Vec<FloatVar>,
parent_id: Option<String>,
timeout_ms: u64,
) -> Result<(), ClientError> {
let items = vars
.into_iter()
.map(|v| ItemMeta {
name: v.name,
i_type: ItemType::Variable as i32,
var_d_type: Some(VarDataType::Float as i32),
unit: v.unit,
min: v.min,
max: v.max,
options: vec![],
max_len: None,
})
.collect();
self.send_add(items, parent_id, timeout_ms).await
}
pub async fn create_text_variables(
&self,
vars: Vec<TextVar>,
parent_id: Option<String>,
timeout_ms: u64,
) -> Result<(), ClientError> {
let items = vars
.into_iter()
.map(|v| ItemMeta {
name: v.name,
i_type: ItemType::Variable as i32,
var_d_type: Some(VarDataType::Text as i32),
unit: v.unit,
min: None,
max: None,
options: v.options,
max_len: v.max_len,
})
.collect();
self.send_add(items, parent_id, timeout_ms).await
}
pub async fn create_boolean_variables(
&self,
vars: Vec<BooleanVar>,
parent_id: Option<String>,
timeout_ms: u64,
) -> Result<(), ClientError> {
let items = vars
.into_iter()
.map(|v| ItemMeta {
name: v.name,
i_type: ItemType::Variable as i32,
var_d_type: Some(VarDataType::Boolean as i32),
unit: v.unit,
min: None,
max: None,
options: vec![],
max_len: None,
})
.collect();
self.send_add(items, parent_id, timeout_ms).await
}
/// Sends a delete command for `item_ids` and waits for its acknowledgement.
///
/// # Errors
///
/// * Any error returned by `ensure_ok` for the response.
/// * `ClientError::UnexpectedFrame` if the response is not a delete response.
/// * Any error produced by `send_command`.
pub async fn delete_items(
    &self,
    item_ids: Vec<String>,
    timeout_ms: u64,
) -> Result<(), ClientError> {
    let payload = namespace::DelCommand {
        cmd_id: Uuid::new_v4().to_string(),
        item_ids,
    };
    let cmd = Command {
        command_type: Some(command::CommandType::Del(payload)),
    };
    self.send_command(cmd, timeout_ms, |resp| {
        ensure_ok(&resp)?;
        if matches!(resp.response_type, Some(response::ResponseType::Del(_))) {
            Ok(())
        } else {
            Err(ClientError::UnexpectedFrame)
        }
    })
    .await
}
/// Sends a get command for `var_ids` and returns the typed values from the reply.
///
/// The result has one element per entry of the response's `var_values`, in the
/// same order. An element is `None` when that entry has no value or the value has
/// no typed payload.
///
/// # Errors
///
/// * Any error returned by `ensure_ok` for the response.
/// * `ClientError::UnexpectedFrame` if the response is not a get response.
/// * Any error produced by `send_command`.
pub async fn get_values(
    &self,
    var_ids: Vec<String>,
    timeout_ms: u64,
) -> Result<Vec<Option<value::Typed>>, ClientError> {
    let payload = namespace::GetCommand {
        cmd_id: Uuid::new_v4().to_string(),
        var_ids,
    };
    let cmd = Command {
        command_type: Some(command::CommandType::Get(payload)),
    };
    self.send_command(cmd, timeout_ms, |resp| {
        ensure_ok(&resp)?;
        let Some(response::ResponseType::Get(get_resp)) = resp.response_type else {
            return Err(ClientError::UnexpectedFrame);
        };
        let mut typed_values = Vec::with_capacity(get_resp.var_values.len());
        for entry in get_resp.var_values {
            typed_values.push(match entry.value {
                Some(inner) => inner.typed,
                None => None,
            });
        }
        Ok(typed_values)
    })
    .await
}
pub async fn set_integer_variables(
&self,
var_ids: Vec<String>,
values: Vec<i64>,
timeout_ms: u64,
) -> Result<(), ClientError> {
let pairs = build_pairs(var_ids, values, value::Typed::IntegerValue)?;
self.send_set(pairs, timeout_ms).await
}
pub async fn set_float_variables(
&self,
var_ids: Vec<String>,
values: Vec<f64>,
timeout_ms: u64,
) -> Result<(), ClientError> {
let pairs = build_pairs(var_ids, values, value::Typed::FloatValue)?;
self.send_set(pairs, timeout_ms).await
}
pub async fn set_text_variables(
&self,
var_ids: Vec<String>,
values: Vec<String>,
timeout_ms: u64,
) -> Result<(), ClientError> {
let pairs = build_pairs(var_ids, values, value::Typed::TextValue)?;
self.send_set(pairs, timeout_ms).await
}
pub async fn set_boolean_variables(
&self,
var_ids: Vec<String>,
values: Vec<bool>,
timeout_ms: u64,
) -> Result<(), ClientError> {
let pairs = build_pairs(var_ids, values, value::Typed::BooleanValue)?;
self.send_set(pairs, timeout_ms).await
}
/// Sends an edit-metadata command for `var_id` carrying the fields of `patch`.
///
/// The patch's `unit`, `min`, `max`, `options` and `max_len` are copied into the
/// command unchanged.
///
/// # Errors
///
/// * Any error returned by `ensure_ok` for the response.
/// * `ClientError::UnexpectedFrame` if the response is not an edit-metadata response.
/// * Any error produced by `send_command`.
pub async fn edit_variable_metadata(
    &self,
    var_id: String,
    patch: VariableMetadataPatch,
    timeout_ms: u64,
) -> Result<(), ClientError> {
    let payload = namespace::EditMetaCommand {
        cmd_id: Uuid::new_v4().to_string(),
        var_id,
        unit: patch.unit,
        min: patch.min,
        max: patch.max,
        options: patch.options,
        max_len: patch.max_len,
    };
    let cmd = Command {
        command_type: Some(command::CommandType::EditMeta(payload)),
    };
    self.send_command(cmd, timeout_ms, |resp| {
        ensure_ok(&resp)?;
        if matches!(
            resp.response_type,
            Some(response::ResponseType::EditMeta(_))
        ) {
            Ok(())
        } else {
            Err(ClientError::UnexpectedFrame)
        }
    })
    .await
}
/// Parses `json` into a `NamespaceSchema` and sends it in a bulk-add command.
///
/// When `parent_id` is `None`, the command's parent id is `"/"`.
///
/// # Errors
///
/// * Any error returned by `namespace_schema_from_json`.
/// * Any error returned by `ensure_ok` for the response.
/// * `ClientError::UnexpectedFrame` if the response is not a bulk-add response.
/// * Any error produced by `send_command`.
pub async fn create_bulk_from_json(
    &self,
    json: &str,
    parent_id: Option<String>,
    timeout_ms: u64,
) -> Result<(), ClientError> {
    let schema: NamespaceSchema = namespace_schema_from_json(json)?;
    let parent_id = match parent_id {
        Some(id) => id,
        None => "/".to_string(),
    };
    let payload = namespace::AddBulkCommand {
        cmd_id: Uuid::new_v4().to_string(),
        parent_id,
        schema: Some(schema),
    };
    let cmd = Command {
        command_type: Some(command::CommandType::AddBulk(payload)),
    };
    self.send_command(cmd, timeout_ms, |resp| {
        ensure_ok(&resp)?;
        if matches!(
            resp.response_type,
            Some(response::ResponseType::AddBulk(_))
        ) {
            Ok(())
        } else {
            Err(ClientError::UnexpectedFrame)
        }
    })
    .await
}
/// Opens a separate WebSocket, subscribes to value events for `var_ids`, and
/// returns a stream of `(variable id, typed value)` pairs.
///
/// The subscription uses its own connection, built from this client's options. The
/// first frame received within `timeout_ms` milliseconds must be a binary
/// `Response` that passes `ensure_ok`. A spawned task then decodes each binary
/// frame as an `Event` and forwards `VarValueEv` payloads through a bounded channel
/// of capacity 256; other frames are ignored.
///
/// The task stops on a close frame, a stream error, end of stream, or once the
/// returned stream has been dropped and the next event can no longer be delivered.
/// Without that last check a dropped subscription would keep its task and socket
/// alive until the server closed the connection.
///
/// # Errors
///
/// * Request-building, handshake or send errors from the new connection.
/// * `ClientError::Timeout` if no frame arrives within `timeout_ms`.
/// * `ClientError::ConnectionClosed` if the stream ends before the first frame.
/// * `ClientError::UnexpectedFrame` if the first frame is not binary.
/// * Decode errors, or any error returned by `ensure_ok`, for the first frame.
pub async fn subscribe_var_values(
    &self,
    var_ids: Vec<String>,
    timeout_ms: u64,
) -> Result<impl Stream<Item = (String, Option<value::Typed>)>, ClientError> {
    let request = build_ws_request(&self.options)?;
    let (ws, _) = connect_async(request).await?;
    let (mut sink, mut stream) = ws.split();
    let cmd = Command {
        command_type: Some(command::CommandType::Sub(namespace::SubscribeCommand {
            cmd_id: Uuid::new_v4().to_string(),
            var_ids,
            events: vec![namespace::EventType::VarValues as i32],
        })),
    };
    sink.send(WsMessage::Binary(cmd.encode_to_vec().into()))
        .await?;
    let resp = timeout(Duration::from_millis(timeout_ms), stream.next())
        .await
        .map_err(|_| ClientError::Timeout)?
        .ok_or(ClientError::ConnectionClosed)??;
    match resp {
        WsMessage::Binary(bin) => {
            let resp = Response::decode(&*bin)?;
            ensure_ok(&resp)?;
        }
        _ => return Err(ClientError::UnexpectedFrame),
    }
    let (tx, rx) = tokio::sync::mpsc::channel(256);
    tokio::spawn(async move {
        while let Some(msg) = stream.next().await {
            match msg {
                Ok(WsMessage::Binary(data)) => {
                    if let Ok(ev) = namespace::Event::decode(&*data)
                        && let Some(namespace::event::Ev::VarValueEv(v)) = ev.ev
                    {
                        let item = (v.var_id, v.value.and_then(|val| val.typed));
                        // A failed send means the receiver is gone: nobody is
                        // listening, so stop reading and release the socket.
                        if tx.send(item).await.is_err() {
                            break;
                        }
                    }
                }
                Ok(WsMessage::Close(_)) | Err(_) => break,
                _ => {}
            }
        }
    });
    Ok(ReceiverStream::new(rx))
}
/// Sends an add command carrying `items_meta` under `parent_id` and waits for its
/// acknowledgement.
///
/// # Errors
///
/// * Any error returned by `ensure_ok` for the response.
/// * `ClientError::UnexpectedFrame` if the response is not an add response.
/// * Any error produced by `send_command`.
async fn send_add(
    &self,
    items_meta: Vec<ItemMeta>,
    parent_id: Option<String>,
    timeout_ms: u64,
) -> Result<(), ClientError> {
    let payload = namespace::AddCommand {
        cmd_id: Uuid::new_v4().to_string(),
        parent_id,
        items_meta,
    };
    let cmd = Command {
        command_type: Some(command::CommandType::Add(payload)),
    };
    self.send_command(cmd, timeout_ms, |resp| {
        ensure_ok(&resp)?;
        if matches!(resp.response_type, Some(response::ResponseType::Add(_))) {
            Ok(())
        } else {
            Err(ClientError::UnexpectedFrame)
        }
    })
    .await
}
/// Sends a set command carrying `pairs` and waits for its acknowledgement.
///
/// # Errors
///
/// * Any error returned by `ensure_ok` for the response.
/// * `ClientError::UnexpectedFrame` if the response is not a set response.
/// * Any error produced by `send_command`.
async fn send_set(&self, pairs: Vec<VarIdValue>, timeout_ms: u64) -> Result<(), ClientError> {
    let payload = namespace::SetCommand {
        cmd_id: Uuid::new_v4().to_string(),
        var_ids_values: pairs,
    };
    let cmd = Command {
        command_type: Some(command::CommandType::Set(payload)),
    };
    self.send_command(cmd, timeout_ms, |resp| {
        ensure_ok(&resp)?;
        if matches!(resp.response_type, Some(response::ResponseType::Set(_))) {
            Ok(())
        } else {
            Err(ClientError::UnexpectedFrame)
        }
    })
    .await
}
}