use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::client::Client;
use crate::error::Result;
use crate::http::{send, send_empty, RequestOpts};
use crate::timestamp::Timestamp;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum StreamCategory {
Trade,
Orderbook,
BookUpdate,
Quote,
Kline,
Ticker,
Liquidation,
Other(String),
}
impl StreamCategory {
pub fn as_str(&self) -> &str {
match self {
Self::Trade => "trade",
Self::Orderbook => "orderbook",
Self::BookUpdate => "book_update",
Self::Quote => "quote",
Self::Kline => "kline",
Self::Ticker => "ticker",
Self::Liquidation => "liquidation",
Self::Other(s) => s,
}
}
}
impl fmt::Display for StreamCategory {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
impl Serialize for StreamCategory {
fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
s.serialize_str(self.as_str())
}
}
impl<'de> Deserialize<'de> for StreamCategory {
fn deserialize<D: Deserializer<'de>>(d: D) -> std::result::Result<Self, D::Error> {
let s = String::deserialize(d)?;
Ok(match s.as_str() {
"trade" => Self::Trade,
"orderbook" => Self::Orderbook,
"book_update" => Self::BookUpdate,
"quote" => Self::Quote,
"kline" => Self::Kline,
"ticker" => Self::Ticker,
"liquidation" => Self::Liquidation,
_ => Self::Other(s),
})
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum MetaValue {
CollectionTimestampNs,
Other(String),
}
impl MetaValue {
pub fn as_str(&self) -> &str {
match self {
Self::CollectionTimestampNs => "collection_timestamp_ns",
Self::Other(s) => s,
}
}
}
impl Serialize for MetaValue {
fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
s.serialize_str(self.as_str())
}
}
impl<'de> Deserialize<'de> for MetaValue {
fn deserialize<D: Deserializer<'de>>(d: D) -> std::result::Result<Self, D::Error> {
let s = String::deserialize(d)?;
Ok(match s.as_str() {
"collection_timestamp_ns" => Self::CollectionTimestampNs,
_ => Self::Other(s),
})
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum TimestampFormat {
Ns,
Us,
Ms,
S,
Iso8601,
Other(String),
}
impl TimestampFormat {
pub fn as_str(&self) -> &str {
match self {
Self::Ns => "ns",
Self::Us => "us",
Self::Ms => "ms",
Self::S => "s",
Self::Iso8601 => "iso8601",
Self::Other(s) => s,
}
}
}
impl Serialize for TimestampFormat {
fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
s.serialize_str(self.as_str())
}
}
impl<'de> Deserialize<'de> for TimestampFormat {
fn deserialize<D: Deserializer<'de>>(d: D) -> std::result::Result<Self, D::Error> {
let s = String::deserialize(d)?;
Ok(match s.as_str() {
"ns" => Self::Ns,
"us" => Self::Us,
"ms" => Self::Ms,
"s" => Self::S,
"iso8601" => Self::Iso8601,
_ => Self::Other(s),
})
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum DataType {
Decimal(u8),
I64,
F64,
Bool,
Str,
Other(String),
}
impl DataType {
pub fn as_str(&self) -> Cow<'_, str> {
match self {
Self::Decimal(n) => Cow::Owned(format!("decimal({n})")),
Self::I64 => Cow::Borrowed("i64"),
Self::F64 => Cow::Borrowed("f64"),
Self::Bool => Cow::Borrowed("bool"),
Self::Str => Cow::Borrowed("string"),
Self::Other(s) => Cow::Borrowed(s),
}
}
}
impl Serialize for DataType {
fn serialize<S: Serializer>(&self, s: S) -> std::result::Result<S::Ok, S::Error> {
s.serialize_str(&self.as_str())
}
}
impl<'de> Deserialize<'de> for DataType {
fn deserialize<D: Deserializer<'de>>(d: D) -> std::result::Result<Self, D::Error> {
let s = String::deserialize(d)?;
if let Some(rest) = s.strip_prefix("decimal(").and_then(|r| r.strip_suffix(')')) {
if let Ok(n) = rest.parse::<u8>() {
return Ok(Self::Decimal(n));
}
}
Ok(match s.as_str() {
"i64" => Self::I64,
"f64" => Self::F64,
"bool" => Self::Bool,
"string" => Self::Str,
_ => Self::Other(s),
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetaExtraction {
pub value: MetaValue,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub format: Option<TimestampFormat>,
}
impl MetaExtraction {
pub fn new(value: MetaValue) -> Self {
Self {
value,
format: None,
}
}
#[must_use]
pub fn format(mut self, format: TimestampFormat) -> Self {
self.format = Some(format);
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonExtraction {
pub path: String,
#[serde(rename = "type")]
pub data_type: DataType,
}
impl JsonExtraction {
pub fn new(path: impl Into<String>, data_type: DataType) -> Self {
Self {
path: path.into(),
data_type,
}
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ExchangeExtractor {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub json: Option<JsonExtraction>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub transform: Option<String>,
}
impl ExchangeExtractor {
pub fn json(path: impl Into<String>, data_type: DataType) -> Self {
Self {
json: Some(JsonExtraction::new(path, data_type)),
transform: None,
}
}
#[must_use]
pub fn transform(mut self, expr: impl Into<String>) -> Self {
self.transform = Some(expr.into());
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ColumnSource {
Meta(MetaExtraction),
Data(HashMap<String, ExchangeExtractor>),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaColumn {
pub output_column: String,
#[serde(flatten)]
pub source: ColumnSource,
}
impl SchemaColumn {
pub fn meta(output_column: impl Into<String>, extraction: MetaExtraction) -> Self {
Self {
output_column: output_column.into(),
source: ColumnSource::Meta(extraction),
}
}
pub fn data(output_column: impl Into<String>) -> DataColumn {
DataColumn {
output_column: output_column.into(),
extractors: HashMap::new(),
}
}
}
pub struct DataColumn {
output_column: String,
extractors: HashMap<String, ExchangeExtractor>,
}
impl DataColumn {
#[must_use]
pub fn exchange(mut self, exchange: impl Into<String>, extractor: ExchangeExtractor) -> Self {
self.extractors.insert(exchange.into(), extractor);
self
}
pub fn build(self) -> SchemaColumn {
SchemaColumn {
output_column: self.output_column,
source: ColumnSource::Data(self.extractors),
}
}
}
impl From<DataColumn> for SchemaColumn {
fn from(d: DataColumn) -> Self {
d.build()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UnfoldConfig {
pub path: String,
}
impl UnfoldConfig {
pub fn new(path: impl Into<String>) -> Self {
Self { path: path.into() }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaggedArray {
pub path: String,
pub tag: String,
}
impl TaggedArray {
pub fn new(path: impl Into<String>, tag: impl Into<String>) -> Self {
Self {
path: path.into(),
tag: tag.into(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "snake_case")]
#[non_exhaustive]
pub enum DeriveOp {
TaggedConcat {
arrays: Vec<TaggedArray>,
tag_field: String,
value_field: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeriveField {
pub field: String,
#[serde(flatten)]
pub op: DeriveOp,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaContent {
pub columns: Vec<SchemaColumn>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub unfold: Option<HashMap<String, UnfoldConfig>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub derive: Option<HashMap<String, Vec<DeriveField>>>,
}
impl SchemaContent {
pub fn builder() -> SchemaContentBuilder {
SchemaContentBuilder::default()
}
}
#[derive(Debug, Clone, Default)]
pub struct SchemaContentBuilder {
columns: Vec<SchemaColumn>,
unfold: HashMap<String, UnfoldConfig>,
derive: HashMap<String, Vec<DeriveField>>,
}
impl SchemaContentBuilder {
#[must_use]
pub fn column(mut self, column: impl Into<SchemaColumn>) -> Self {
self.columns.push(column.into());
self
}
#[must_use]
pub fn columns<I, C>(mut self, columns: I) -> Self
where
I: IntoIterator<Item = C>,
C: Into<SchemaColumn>,
{
self.columns.extend(columns.into_iter().map(Into::into));
self
}
#[must_use]
pub fn unfold(mut self, exchange: impl Into<String>, config: UnfoldConfig) -> Self {
self.unfold.insert(exchange.into(), config);
self
}
#[must_use]
pub fn derive(
mut self,
exchange: impl Into<String>,
fields: impl IntoIterator<Item = DeriveField>,
) -> Self {
self.derive
.insert(exchange.into(), fields.into_iter().collect());
self
}
pub fn build(self) -> SchemaContent {
SchemaContent {
columns: self.columns,
unfold: if self.unfold.is_empty() {
None
} else {
Some(self.unfold)
},
derive: if self.derive.is_empty() {
None
} else {
Some(self.derive)
},
}
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct ExportSchemaListItem {
pub id: String,
pub name: String,
pub stream_category: StreamCategory,
pub is_built_in: bool,
pub created_at: Timestamp,
pub version: i32,
pub has_draft: bool,
}
#[derive(Debug, Clone, Deserialize)]
pub struct ExportSchemaWithVersion {
pub id: String,
pub name: String,
pub stream_category: StreamCategory,
pub is_built_in: bool,
pub created_at: Timestamp,
pub version: i32,
pub has_draft: bool,
pub columns: Vec<SchemaColumn>,
#[serde(default)]
pub unfold: Option<HashMap<String, UnfoldConfig>>,
#[serde(default)]
pub derive: Option<HashMap<String, Vec<DeriveField>>>,
}
pub struct ExportSchemasResource<'a> {
pub(crate) client: &'a Client,
}
impl<'a> ExportSchemasResource<'a> {
pub fn for_id(&self, id: impl Into<String>) -> SchemaHandle<'a> {
SchemaHandle {
client: self.client,
id: id.into(),
}
}
pub async fn list(&self) -> Result<Vec<ExportSchemaListItem>> {
send::<_, ()>(
self.client,
reqwest::Method::GET,
"/export-schemas",
None,
None,
RequestOpts::default(),
)
.await
}
pub async fn get(&self, id: &str) -> Result<ExportSchemaWithVersion> {
let path = format!("/export-schemas/{id}");
send::<_, ()>(
self.client,
reqwest::Method::GET,
&path,
None,
None,
RequestOpts::default(),
)
.await
}
pub fn create(
&self,
name: impl Into<String>,
stream_category: StreamCategory,
content: SchemaContent,
) -> CreateExportSchemaRequest<'a> {
CreateExportSchemaRequest {
client: self.client,
name: name.into(),
stream_category,
content,
idempotency_key: None,
}
}
pub fn update(
&self,
id: impl Into<String>,
content: SchemaContent,
) -> UpdateExportSchemaRequest<'a> {
UpdateExportSchemaRequest {
client: self.client,
id: id.into(),
content,
idempotency_key: None,
}
}
pub fn delete(&self, id: impl Into<String>) -> DeleteExportSchemaRequest<'a> {
DeleteExportSchemaRequest {
client: self.client,
id: id.into(),
idempotency_key: None,
}
}
pub fn create_draft(&self, id: impl Into<String>) -> CreateDraftRequest<'a> {
CreateDraftRequest {
client: self.client,
id: id.into(),
content: None,
idempotency_key: None,
}
}
pub async fn get_draft(&self, id: &str) -> Result<ExportSchemaWithVersion> {
let path = format!("/export-schemas/{id}/draft");
send::<_, ()>(
self.client,
reqwest::Method::GET,
&path,
None,
None,
RequestOpts::default(),
)
.await
}
pub fn update_draft(
&self,
id: impl Into<String>,
content: SchemaContent,
) -> UpdateDraftRequest<'a> {
UpdateDraftRequest {
client: self.client,
id: id.into(),
content,
}
}
pub fn discard_draft(&self, id: impl Into<String>) -> DiscardDraftRequest<'a> {
DiscardDraftRequest {
client: self.client,
id: id.into(),
idempotency_key: None,
}
}
pub fn publish_draft(&self, id: impl Into<String>) -> PublishDraftRequest<'a> {
PublishDraftRequest {
client: self.client,
id: id.into(),
idempotency_key: None,
}
}
}
pub struct SchemaHandle<'a> {
client: &'a Client,
id: String,
}
impl<'a> SchemaHandle<'a> {
pub fn id(&self) -> &str {
&self.id
}
pub async fn get(&self) -> Result<ExportSchemaWithVersion> {
ExportSchemasResource {
client: self.client,
}
.get(&self.id)
.await
}
pub fn update(&self, content: SchemaContent) -> UpdateExportSchemaRequest<'a> {
UpdateExportSchemaRequest {
client: self.client,
id: self.id.clone(),
content,
idempotency_key: None,
}
}
pub fn delete(&self) -> DeleteExportSchemaRequest<'a> {
DeleteExportSchemaRequest {
client: self.client,
id: self.id.clone(),
idempotency_key: None,
}
}
pub async fn get_draft(&self) -> Result<ExportSchemaWithVersion> {
ExportSchemasResource {
client: self.client,
}
.get_draft(&self.id)
.await
}
pub fn create_draft(&self) -> CreateDraftRequest<'a> {
CreateDraftRequest {
client: self.client,
id: self.id.clone(),
content: None,
idempotency_key: None,
}
}
pub fn update_draft(&self, content: SchemaContent) -> UpdateDraftRequest<'a> {
UpdateDraftRequest {
client: self.client,
id: self.id.clone(),
content,
}
}
pub fn discard_draft(&self) -> DiscardDraftRequest<'a> {
DiscardDraftRequest {
client: self.client,
id: self.id.clone(),
idempotency_key: None,
}
}
pub fn publish_draft(&self) -> PublishDraftRequest<'a> {
PublishDraftRequest {
client: self.client,
id: self.id.clone(),
idempotency_key: None,
}
}
}
#[derive(Serialize)]
struct CreateBody<'b> {
name: &'b str,
stream_category: &'b StreamCategory,
#[serde(flatten)]
content: &'b SchemaContent,
}
pub struct CreateExportSchemaRequest<'a> {
client: &'a Client,
name: String,
stream_category: StreamCategory,
content: SchemaContent,
idempotency_key: Option<String>,
}
impl<'a> CreateExportSchemaRequest<'a> {
#[must_use]
pub fn idempotency_key(mut self, k: impl Into<String>) -> Self {
self.idempotency_key = Some(k.into());
self
}
pub async fn send(self) -> Result<ExportSchemaWithVersion> {
let body = CreateBody {
name: &self.name,
stream_category: &self.stream_category,
content: &self.content,
};
send(
self.client,
reqwest::Method::POST,
"/export-schemas",
None,
Some(&body),
RequestOpts {
idempotency_key: self.idempotency_key.as_deref(),
},
)
.await
}
}
pub struct UpdateExportSchemaRequest<'a> {
client: &'a Client,
id: String,
content: SchemaContent,
idempotency_key: Option<String>,
}
impl<'a> UpdateExportSchemaRequest<'a> {
#[must_use]
pub fn idempotency_key(mut self, k: impl Into<String>) -> Self {
self.idempotency_key = Some(k.into());
self
}
pub async fn send(self) -> Result<ExportSchemaWithVersion> {
let path = format!("/export-schemas/{}", self.id);
send(
self.client,
reqwest::Method::PUT,
&path,
None,
Some(&self.content),
RequestOpts {
idempotency_key: self.idempotency_key.as_deref(),
},
)
.await
}
}
pub struct DeleteExportSchemaRequest<'a> {
client: &'a Client,
id: String,
idempotency_key: Option<String>,
}
impl<'a> DeleteExportSchemaRequest<'a> {
#[must_use]
pub fn idempotency_key(mut self, k: impl Into<String>) -> Self {
self.idempotency_key = Some(k.into());
self
}
pub async fn send(self) -> Result<()> {
let path = format!("/export-schemas/{}", self.id);
send_empty::<()>(
self.client,
reqwest::Method::DELETE,
&path,
None,
None,
RequestOpts {
idempotency_key: self.idempotency_key.as_deref(),
},
)
.await
}
}
pub struct CreateDraftRequest<'a> {
client: &'a Client,
id: String,
content: Option<SchemaContent>,
idempotency_key: Option<String>,
}
impl<'a> CreateDraftRequest<'a> {
#[must_use]
pub fn content(mut self, content: SchemaContent) -> Self {
self.content = Some(content);
self
}
#[must_use]
pub fn idempotency_key(mut self, k: impl Into<String>) -> Self {
self.idempotency_key = Some(k.into());
self
}
pub async fn send(self) -> Result<ExportSchemaWithVersion> {
let path = format!("/export-schemas/{}/draft", self.id);
match self.content {
Some(c) => {
send(
self.client,
reqwest::Method::POST,
&path,
None,
Some(&c),
RequestOpts {
idempotency_key: self.idempotency_key.as_deref(),
},
)
.await
}
None => {
send::<_, ()>(
self.client,
reqwest::Method::POST,
&path,
None,
None,
RequestOpts {
idempotency_key: self.idempotency_key.as_deref(),
},
)
.await
}
}
}
}
pub struct UpdateDraftRequest<'a> {
client: &'a Client,
id: String,
content: SchemaContent,
}
impl<'a> UpdateDraftRequest<'a> {
pub async fn send(self) -> Result<ExportSchemaWithVersion> {
let path = format!("/export-schemas/{}/draft", self.id);
send(
self.client,
reqwest::Method::PUT,
&path,
None,
Some(&self.content),
RequestOpts::default(),
)
.await
}
}
pub struct DiscardDraftRequest<'a> {
client: &'a Client,
id: String,
idempotency_key: Option<String>,
}
impl<'a> DiscardDraftRequest<'a> {
#[must_use]
pub fn idempotency_key(mut self, k: impl Into<String>) -> Self {
self.idempotency_key = Some(k.into());
self
}
pub async fn send(self) -> Result<()> {
let path = format!("/export-schemas/{}/draft", self.id);
send_empty::<()>(
self.client,
reqwest::Method::DELETE,
&path,
None,
None,
RequestOpts {
idempotency_key: self.idempotency_key.as_deref(),
},
)
.await
}
}
pub struct PublishDraftRequest<'a> {
client: &'a Client,
id: String,
idempotency_key: Option<String>,
}
impl<'a> PublishDraftRequest<'a> {
#[must_use]
pub fn idempotency_key(mut self, k: impl Into<String>) -> Self {
self.idempotency_key = Some(k.into());
self
}
pub async fn send(self) -> Result<ExportSchemaWithVersion> {
let path = format!("/export-schemas/{}/publish", self.id);
send::<_, ()>(
self.client,
reqwest::Method::POST,
&path,
None,
None,
RequestOpts {
idempotency_key: self.idempotency_key.as_deref(),
},
)
.await
}
}