use super::data_sources::{DataSource, DataSourceStatus};
use super::names::{InvalidName, Name};
use super::providers::Error;
use crate::blobs::Blob;
use crate::notebooks::Cell;
use crate::providers::{ConfigSchema, ProviderConfig, SupportedQueryType};
use crate::timestamps::Timestamp;
use base64uuid::{Base64Uuid, InvalidId};
#[cfg(feature = "fp-bindgen")]
use fp_bindgen::prelude::Serializable;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug, Formatter};
use std::{convert::TryFrom, str::FromStr};
use strum_macros::Display;
use typed_builder::TypedBuilder;
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, TypedBuilder)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct Proxy {
#[builder(setter(into))]
pub id: Base64Uuid,
pub name: Name,
pub status: ProxyStatus,
#[builder(default)]
pub data_sources: Vec<DataSource>,
#[builder(default, setter(into, strip_option))]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub token: Option<ProxyToken>,
#[builder(default, setter(into, strip_option))]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[builder(setter(into))]
pub created_at: Timestamp,
#[builder(setter(into))]
pub updated_at: Timestamp,
}
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct ProxySummary {
#[builder(setter(into))]
pub id: Base64Uuid,
pub name: Name,
pub status: ProxyStatus,
}
impl From<Proxy> for ProxySummary {
fn from(proxy: Proxy) -> Self {
Self {
id: proxy.id,
name: proxy.name,
status: proxy.status,
}
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy, Display)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "snake_case")]
pub enum ProxyStatus {
Connected,
Disconnected,
}
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct NewProxy {
pub name: Name,
#[builder(default, setter(into, strip_option))]
pub description: Option<String>,
}
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
pub enum InvalidProxyToken {
#[error("Invalid workspace ID")]
InvalidWorkspaceId(#[from] InvalidId),
#[error("Invalid proxy name")]
InvalidProxyName(#[from] InvalidName),
#[error("Missing token")]
MissingToken,
}
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(try_from = "&str", into = "String")]
pub struct ProxyToken {
#[builder(setter(into))]
pub workspace_id: Base64Uuid,
pub proxy_name: Name,
#[builder(default, setter(into))]
pub token: String,
}
impl Debug for ProxyToken {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("ProxyToken")
.field("workspace_id", &self.workspace_id)
.field("proxy_name", &self.proxy_name)
.field("token", &"[REDACTED]")
.finish()
}
}
impl From<ProxyToken> for String {
fn from(token: ProxyToken) -> Self {
format!(
"{}:{}:{}",
token.workspace_id, token.proxy_name, token.token
)
}
}
impl FromStr for ProxyToken {
type Err = InvalidProxyToken;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut parts = s.split(':');
let workspace_id = parts.next().unwrap_or_default().parse::<Base64Uuid>()?;
let proxy_name = Name::new(parts.next().unwrap_or_default())?;
let token = parts
.next()
.ok_or(InvalidProxyToken::MissingToken)?
.to_string();
Ok(ProxyToken {
workspace_id,
proxy_name,
token,
})
}
}
impl TryFrom<&str> for ProxyToken {
type Error = InvalidProxyToken;
fn try_from(s: &str) -> Result<Self, Self::Error> {
Self::from_str(s)
}
}
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct CreateCellsApiRequest {
pub response: Blob,
#[builder(setter(into))]
pub query_type: String,
}
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct ExtractDataApiRequest {
pub response: Blob,
#[builder(setter(into))]
pub mime_type: String,
#[builder(default, setter(into, strip_option))]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub query: Option<String>,
}
#[derive(Debug, Deserialize, Serialize, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct ServerMessage {
#[builder(setter(into))]
pub op_id: Base64Uuid,
pub data_source_name: Name,
pub protocol_version: u8,
#[serde(flatten)]
pub payload: ServerMessagePayload,
}
impl ServerMessage {
pub fn deserialize_msgpack(
input: impl AsRef<[u8]>,
) -> Result<ServerMessage, rmp_serde::decode::Error> {
rmp_serde::from_slice(input.as_ref())
}
pub fn serialize_msgpack(&self) -> Vec<u8> {
rmp_serde::to_vec(&self).expect("MessgePack serialization error")
}
pub fn op_id(&self) -> Option<Base64Uuid> {
Some(self.op_id)
}
fn payload_with_header(
payload: ServerMessagePayload,
data_source_name: Name,
protocol_version: u8,
op_id: Base64Uuid,
) -> Self {
Self {
op_id,
data_source_name,
protocol_version,
payload,
}
}
pub fn new_invoke_proxy_request(
data: Vec<u8>,
data_source_name: Name,
protocol_version: u8,
op_id: Base64Uuid,
) -> Self {
Self::payload_with_header(
ServerMessagePayload::Invoke(InvokeRequest { data }),
data_source_name,
protocol_version,
op_id,
)
}
pub fn new_create_cells_request(
data: Blob,
query_type: String,
data_source_name: Name,
protocol_version: u8,
op_id: Base64Uuid,
) -> Self {
Self::payload_with_header(
ServerMessagePayload::CreateCells(CreateCellsRequest {
response: data,
query_type,
}),
data_source_name,
protocol_version,
op_id,
)
}
pub fn new_extract_data_request(
data: Blob,
mime_type: String,
query: Option<String>,
data_source_name: Name,
protocol_version: u8,
op_id: Base64Uuid,
) -> Self {
Self::payload_with_header(
ServerMessagePayload::ExtractData(ExtractDataRequest {
response: data,
mime_type,
query,
}),
data_source_name,
protocol_version,
op_id,
)
}
pub fn new_get_config_schema_request(
data_source_name: Name,
protocol_version: u8,
op_id: Base64Uuid,
) -> Self {
Self::payload_with_header(
ServerMessagePayload::GetConfigSchema(GetConfigSchemaRequest {}),
data_source_name,
protocol_version,
op_id,
)
}
pub fn new_get_supported_query_types_request(
config: ProviderConfig,
data_source_name: Name,
protocol_version: u8,
op_id: Base64Uuid,
) -> Self {
Self::payload_with_header(
ServerMessagePayload::GetSupportedQueryTypes(GetSupportedQueryTypesRequest { config }),
data_source_name,
protocol_version,
op_id,
)
}
}
#[derive(Debug, Deserialize, Serialize)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ServerMessagePayload {
#[serde(rename = "invokeProxy")] Invoke(InvokeRequest),
CreateCells(CreateCellsRequest),
ExtractData(ExtractDataRequest),
GetConfigSchema(GetConfigSchemaRequest),
GetSupportedQueryTypes(GetSupportedQueryTypesRequest),
}
#[derive(Deserialize, Serialize, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct InvokeRequest {
#[serde(with = "serde_bytes")]
pub data: Vec<u8>,
}
impl Debug for InvokeRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InvokeRequest")
.field("data", &format!("[{} bytes]", self.data.len()))
.finish()
}
}
#[derive(Deserialize, Serialize, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct CreateCellsRequest {
pub response: Blob,
#[builder(setter(into))]
pub query_type: String,
}
impl Debug for CreateCellsRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CreateCellsRequest")
.field("query_type", &self.query_type)
.field("response", &format!("[{} bytes]", self.response.data.len()))
.finish()
}
}
#[derive(Deserialize, Serialize, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct ExtractDataRequest {
pub response: Blob,
#[builder(setter(into))]
pub mime_type: String,
#[builder(default, setter(into, strip_option))]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub query: Option<String>,
}
impl Debug for ExtractDataRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExtractDataRequest")
.field("mime_type", &self.mime_type)
.field("query", &self.query)
.field("response", &format!("[{} bytes]", self.response.data.len()))
.finish()
}
}
#[derive(Deserialize, Serialize)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct GetConfigSchemaRequest {}
impl Debug for GetConfigSchemaRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConfigSchemaRequest").finish()
}
}
#[derive(Deserialize, Serialize, Debug, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct GetSupportedQueryTypesRequest {
pub config: ProviderConfig,
}
#[derive(Debug, Deserialize, Serialize, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct ProxyMessage {
#[builder(default, setter(into, strip_option))]
pub op_id: Option<Base64Uuid>,
#[serde(flatten)]
pub payload: ProxyMessagePayload,
}
impl ProxyMessage {
fn response(payload: ProxyMessagePayload, op_id: Base64Uuid) -> Self {
Self {
op_id: Some(op_id),
payload,
}
}
fn notification(payload: ProxyMessagePayload) -> Self {
Self {
op_id: None,
payload,
}
}
pub fn new_error_response(error: Error, op_id: Base64Uuid) -> Self {
Self::response(ProxyMessagePayload::Error(ErrorMessage { error }), op_id)
}
pub fn new_invoke_proxy_response(data: Vec<u8>, op_id: Base64Uuid) -> Self {
Self::response(
ProxyMessagePayload::InvokeProxyResponse(InvokeProxyResponseMessage { data }),
op_id,
)
}
pub fn new_create_cells_response(cells: Result<Vec<Cell>, Error>, op_id: Base64Uuid) -> Self {
Self::response(
ProxyMessagePayload::CreateCellsResponse(CreateCellsResponseMessage { cells }),
op_id,
)
}
pub fn new_extract_data_response(data: Result<Blob, Error>, op_id: Base64Uuid) -> Self {
Self::response(
ProxyMessagePayload::ExtractDataResponse(ExtractDataResponseMessage { data }),
op_id,
)
}
pub fn new_config_schema_response(schema: ConfigSchema, op_id: Base64Uuid) -> Self {
Self::response(
ProxyMessagePayload::GetConfigSchemaResponse(GetConfigSchemaResponseMessage { schema }),
op_id,
)
}
pub fn new_supported_query_types_response(
queries: Vec<SupportedQueryType>,
op_id: Base64Uuid,
) -> Self {
Self::response(
ProxyMessagePayload::GetSupportedQueryTypesResponse(
GetSupportedQueryTypesResponseMessage { queries },
),
op_id,
)
}
pub fn new_set_data_sources_notification(data_sources: Vec<UpsertProxyDataSource>) -> Self {
Self::notification(ProxyMessagePayload::SetDataSources(SetDataSourcesMessage {
data_sources,
}))
}
}
#[derive(Debug, Deserialize, Serialize)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ProxyMessagePayload {
SetDataSources(SetDataSourcesMessage),
InvokeProxyResponse(InvokeProxyResponseMessage),
CreateCellsResponse(CreateCellsResponseMessage),
ExtractDataResponse(ExtractDataResponseMessage),
GetConfigSchemaResponse(GetConfigSchemaResponseMessage),
GetSupportedQueryTypesResponse(GetSupportedQueryTypesResponseMessage),
Error(ErrorMessage),
}
impl From<(ErrorMessage, Base64Uuid)> for ProxyMessage {
fn from((message, op_id): (ErrorMessage, Base64Uuid)) -> Self {
Self::response(ProxyMessagePayload::Error(message), op_id)
}
}
#[derive(Deserialize, Serialize, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct InvokeProxyResponseMessage {
#[serde(with = "serde_bytes")]
pub data: Vec<u8>,
}
impl Debug for InvokeProxyResponseMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InvokeProxyResponseMessage")
.field("data", &format!("[{} bytes]", self.data.len()))
.finish()
}
}
#[derive(Deserialize, Serialize, Debug, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct ExtractDataResponseMessage {
pub data: Result<Blob, Error>,
}
#[derive(Deserialize, Serialize, Debug, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct CreateCellsResponseMessage {
pub cells: Result<Vec<Cell>, Error>,
}
#[derive(Deserialize, Serialize, Debug, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct GetConfigSchemaResponseMessage {
pub schema: ConfigSchema,
}
#[derive(Deserialize, Serialize, Debug, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct GetSupportedQueryTypesResponseMessage {
pub queries: Vec<SupportedQueryType>,
}
#[derive(Debug, Deserialize, Serialize, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct ErrorMessage {
pub error: Error,
}
impl ProxyMessage {
pub fn deserialize_msgpack(
input: impl AsRef<[u8]>,
) -> Result<ProxyMessage, rmp_serde::decode::Error> {
rmp_serde::from_slice(input.as_ref())
}
pub fn serialize_msgpack(&self) -> Vec<u8> {
rmp_serde::to_vec_named(&self).expect("MessgePack serialization error")
}
pub fn op_id(&self) -> Option<Base64Uuid> {
self.op_id
}
}
#[derive(Debug, Deserialize, Serialize, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(rename_all = "camelCase")]
pub struct SetDataSourcesMessage {
pub data_sources: Vec<UpsertProxyDataSource>,
}
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone, TypedBuilder)]
#[cfg_attr(
feature = "fp-bindgen",
derive(Serializable),
fp(rust_module = "fiberplane_models::proxies")
)]
#[non_exhaustive]
#[serde(tag = "type", rename_all = "camelCase")]
pub struct UpsertProxyDataSource {
pub name: Name,
#[builder(default, setter(into, strip_option))]
pub description: Option<String>,
#[builder(setter(into))]
pub provider_type: String,
#[builder(default)]
#[serde(default)]
pub protocol_version: u8,
#[serde(flatten)]
pub status: DataSourceStatus,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::providers::Error;
#[test]
fn serialization_deserialization() {
let data_sources = vec![
UpsertProxyDataSource {
name: Name::from_static("prometheus-prod"),
provider_type: "prometheus".to_string(),
protocol_version: 2,
description: Some("Production Prometheus".to_string()),
status: DataSourceStatus::Connected,
},
UpsertProxyDataSource {
name: Name::from_static("elasticsearch-prod"),
provider_type: "elasticsearch".to_string(),
protocol_version: 1,
description: None,
status: DataSourceStatus::Error(Error::NotFound),
},
];
let message = ProxyMessage::new_set_data_sources_notification(data_sources.clone());
let serialized = message.serialize_msgpack();
let deserialized = ProxyMessage::deserialize_msgpack(serialized).unwrap();
if let ProxyMessage {
op_id: None,
payload: ProxyMessagePayload::SetDataSources(set_data_sources),
} = deserialized
{
assert_eq!(set_data_sources.data_sources, data_sources)
} else {
panic!("Unexpected message type");
}
}
#[test]
fn backwards_compatibility() {
mod old {
use crate::names::Name;
use base64uuid::Base64Uuid;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ServerMessage {
InvokeProxy(InvokeProxyMessage),
}
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct InvokeProxyMessage {
pub op_id: Base64Uuid,
pub data_source_name: Name,
#[serde(with = "serde_bytes")]
pub data: Vec<u8>,
pub protocol_version: u8,
}
}
let op_id = Base64Uuid::parse_str("34edc58d-f8ec-4c95-bce0-c2ae8800e6ef").unwrap();
let data_source_name = Name::from_static("test-name");
let data = b"aieu".to_vec();
let old_message = old::InvokeProxyMessage {
op_id,
data_source_name,
protocol_version: 12,
data,
};
let new_message: ServerMessage = rmp_serde::from_slice(
&rmp_serde::to_vec_named(&old::ServerMessage::InvokeProxy(old_message.clone()))
.unwrap(),
)
.unwrap();
assert_eq!(new_message.op_id, old_message.op_id);
assert_eq!(new_message.data_source_name, old_message.data_source_name);
assert_eq!(new_message.protocol_version, old_message.protocol_version);
if let ServerMessagePayload::Invoke(response) = new_message.payload {
assert_eq!(response.data, old_message.data)
} else {
panic!("Wrong variant of ServerMessage deserialized. Expecting Invoke")
}
}
}