use std::{collections::HashMap, num::NonZeroU64, str::FromStr};
use dbn::{Encoding, SType, Schema};
use reqwest::RequestBuilder;
use serde::{Deserialize, Deserializer};
use typed_builder::TypedBuilder;
use crate::Symbols;
use super::{AddToQuery, DateRange, DateTimeRange};
/// A client for the metadata group of Historical API endpoints.
///
/// Holds a mutable borrow of the parent [`super::Client`], so all requests
/// go through the parent's connection and credentials.
pub struct MetadataClient<'a> {
    // Mutable borrow of the owning client used to build and send requests.
    pub(crate) inner: &'a mut super::Client,
}
impl MetadataClient<'_> {
    /// Lists all publishers along with their details.
    ///
    /// # Errors
    /// Returns an error if the request fails, the server responds with an
    /// error status, or the response body cannot be deserialized.
    pub async fn list_publishers(&mut self) -> crate::Result<Vec<PublisherDetail>> {
        Ok(self
            .get("list_publishers")?
            .send()
            .await?
            .error_for_status()?
            .json()
            .await?)
    }

    /// Lists dataset codes, optionally restricted to `date_range`.
    ///
    /// # Errors
    /// Returns an error if the request fails, the server responds with an
    /// error status, or the response body cannot be deserialized.
    pub async fn list_datasets(
        &mut self,
        date_range: Option<DateRange>,
    ) -> crate::Result<Vec<String>> {
        let mut builder = self.get("list_datasets")?;
        // The date range is optional: omit its query parameters when unset.
        if let Some(date_range) = date_range {
            builder = builder.add_to_query(&date_range);
        }
        Ok(builder.send().await?.error_for_status()?.json().await?)
    }

    /// Lists the schemas available for `dataset`.
    ///
    /// # Errors
    /// Returns an error if the request fails, the server responds with an
    /// error status, or the response body cannot be deserialized.
    pub async fn list_schemas(&mut self, dataset: &str) -> crate::Result<Vec<Schema>> {
        Ok(self
            .get("list_schemas")?
            .query(&[("dataset", dataset)])
            .send()
            .await?
            .error_for_status()?
            .json()
            .await?)
    }

    /// Lists the field name and type of each field for the encoding and
    /// schema in `params`.
    ///
    /// # Errors
    /// Returns an error if the request fails, the server responds with an
    /// error status, or the response body cannot be deserialized.
    pub async fn list_fields(
        &mut self,
        params: &ListFieldsParams,
    ) -> crate::Result<Vec<FieldDetail>> {
        let builder = self.get("list_fields")?.query(&[
            ("encoding", params.encoding.as_str()),
            ("schema", params.schema.as_str()),
        ]);
        Ok(builder.send().await?.error_for_status()?.json().await?)
    }

    /// Lists the unit prices for each feed mode and schema of `dataset`.
    ///
    /// # Errors
    /// Returns an error if the request fails, the server responds with an
    /// error status, or the response body cannot be deserialized.
    pub async fn list_unit_prices(
        &mut self,
        dataset: &str,
    ) -> crate::Result<Vec<UnitPricesForMode>> {
        // Pass `dataset` directly: the previous `&dataset` produced a
        // redundant `&&str` (serialized identically, but inconsistent with
        // the other endpoints in this impl).
        let builder = self
            .get("list_unit_prices")?
            .query(&[("dataset", dataset)]);
        Ok(builder.send().await?.error_for_status()?.json().await?)
    }

    /// Gets the per-date condition details for `params.dataset`, optionally
    /// restricted to `params.date_range`.
    ///
    /// # Errors
    /// Returns an error if the request fails, the server responds with an
    /// error status, or the response body cannot be deserialized.
    pub async fn get_dataset_condition(
        &mut self,
        params: &GetDatasetConditionParams,
    ) -> crate::Result<Vec<DatasetConditionDetail>> {
        let mut builder = self
            .get("get_dataset_condition")?
            // Fixed mis-encoded source: `¶ms.dataset` -> `&params.dataset`.
            .query(&[("dataset", &params.dataset)]);
        if let Some(ref date_range) = params.date_range {
            builder = builder.add_to_query(date_range);
        }
        Ok(builder.send().await?.error_for_status()?.json().await?)
    }

    /// Gets the available start and end dates of `dataset`.
    ///
    /// # Errors
    /// Returns an error if the request fails, the server responds with an
    /// error status, or the response body cannot be deserialized.
    pub async fn get_dataset_range(&mut self, dataset: &str) -> crate::Result<DatasetRange> {
        Ok(self
            .get("get_dataset_range")?
            .query(&[("dataset", dataset)])
            .send()
            .await?
            .error_for_status()?
            .json()
            .await?)
    }

    /// Gets the record count of the query described by `params`.
    ///
    /// # Errors
    /// Returns an error if the request fails, the server responds with an
    /// error status, or the response body cannot be deserialized.
    pub async fn get_record_count(&mut self, params: &GetRecordCountParams) -> crate::Result<u64> {
        Ok(self
            .get("get_record_count")?
            .add_to_query(params)
            .send()
            .await?
            .error_for_status()?
            .json()
            .await?)
    }

    /// Gets the billable size of the query described by `params`.
    ///
    /// # Errors
    /// Returns an error if the request fails, the server responds with an
    /// error status, or the response body cannot be deserialized.
    pub async fn get_billable_size(
        &mut self,
        params: &GetBillableSizeParams,
    ) -> crate::Result<u64> {
        Ok(self
            .get("get_billable_size")?
            .add_to_query(params)
            .send()
            .await?
            .error_for_status()?
            .json()
            .await?)
    }

    /// Gets the cost of the query described by `params`.
    ///
    /// # Errors
    /// Returns an error if the request fails, the server responds with an
    /// error status, or the response body cannot be deserialized.
    pub async fn get_cost(&mut self, params: &GetCostParams) -> crate::Result<f64> {
        Ok(self
            .get("get_cost")?
            .add_to_query(params)
            .send()
            .await?
            .error_for_status()?
            .json()
            .await?)
    }

    /// Builds a GET request for the metadata endpoint named `slug`.
    fn get(&mut self, slug: &str) -> crate::Result<RequestBuilder> {
        self.inner.get(&format!("metadata.{slug}"))
    }
}
/// A data feed mode, as reported by the unit-price endpoint.
///
/// String forms used on the wire are defined in [`FeedMode::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FeedMode {
    // Serialized as "historical".
    Historical,
    // Serialized as "historical-streaming".
    HistoricalStreaming,
    // Serialized as "live".
    Live,
}
/// The condition of a dataset on a given date.
///
/// String forms used on the wire are defined in [`DatasetCondition::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DatasetCondition {
    // Serialized as "available".
    Available,
    // Serialized as "degraded".
    Degraded,
    // Serialized as "pending".
    Pending,
    // Serialized as "missing".
    Missing,
}
/// Details about a publisher, as returned by
/// [`MetadataClient::list_publishers`].
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct PublisherDetail {
    // Numeric publisher identifier.
    pub publisher_id: u16,
    // Dataset code, e.g. "GLBX.MDP3".
    pub dataset: String,
    // Venue code for this publisher.
    pub venue: String,
    // Human-readable description.
    pub description: String,
}
/// Parameters for [`MetadataClient::list_fields`]. Use the builder to
/// construct an instance.
#[derive(Debug, Clone, TypedBuilder)]
pub struct ListFieldsParams {
    // The encoding whose fields will be listed.
    pub encoding: Encoding,
    // The schema whose fields will be listed.
    pub schema: Schema,
}
/// The name and type of a single field, as returned by
/// [`MetadataClient::list_fields`].
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct FieldDetail {
    // The field's name, e.g. "ts_event".
    pub name: String,
    // The field's type name; renamed because `type` is a Rust keyword.
    #[serde(rename = "type")]
    pub type_name: String,
}
/// The unit prices for each schema under one feed mode, as returned by
/// [`MetadataClient::list_unit_prices`].
#[derive(Debug, Clone, Deserialize, PartialEq)]
pub struct UnitPricesForMode {
    // The feed mode these prices apply to.
    pub mode: FeedMode,
    // Unit price keyed by schema. NOTE(review): currency/unit is not
    // visible from this file — confirm against the API documentation.
    pub unit_prices: HashMap<Schema, f64>,
}
/// Parameters for [`MetadataClient::get_dataset_condition`]. Use the builder
/// to construct an instance.
#[derive(Debug, Clone, TypedBuilder)]
pub struct GetDatasetConditionParams {
    // The dataset code, e.g. "GLBX.MDP3". The setter accepts anything
    // convertible to a `String`.
    #[builder(setter(transform = |dataset: impl ToString| dataset.to_string()))]
    pub dataset: String,
    // Optional date filter; when `None`, no date-range query parameters are
    // sent (see `get_dataset_condition`).
    #[builder(default, setter(transform = |dr: impl Into<DateRange>| Some(dr.into())))]
    pub date_range: Option<DateRange>,
}
/// The condition of a dataset on one date, as returned by
/// [`MetadataClient::get_dataset_condition`].
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct DatasetConditionDetail {
    // The date this condition applies to; parsed with the module's
    // `DATE_FORMAT` (see `deserialize_date`).
    #[serde(deserialize_with = "deserialize_date")]
    pub date: time::Date,
    // The dataset's condition on `date`.
    pub condition: DatasetCondition,
    // When the data for `date` was last modified.
    #[serde(deserialize_with = "deserialize_date")]
    pub last_modified_date: time::Date,
}
/// The available date range of a dataset, as returned by
/// [`MetadataClient::get_dataset_range`].
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct DatasetRange {
    // First available date; parsed with the module's `DATE_FORMAT`.
    #[serde(deserialize_with = "deserialize_date")]
    pub start_date: time::Date,
    // Last available date; parsed with the module's `DATE_FORMAT`.
    #[serde(deserialize_with = "deserialize_date")]
    pub end_date: time::Date,
}
/// Parameters common to the record-count, billable-size, and cost endpoints
/// (see [`GetRecordCountParams`] and friends). Use the builder to construct
/// an instance.
#[derive(Debug, Clone, TypedBuilder)]
pub struct GetQueryParams {
    // The dataset code, e.g. "GLBX.MDP3". The setter accepts anything
    // convertible to a `String`.
    #[builder(setter(transform = |dataset: impl ToString| dataset.to_string()))]
    pub dataset: String,
    // The symbols to query for.
    #[builder(setter(into))]
    pub symbols: Symbols,
    // The schema of the queried data.
    pub schema: Schema,
    // The date-time range of the query.
    #[builder(setter(into))]
    pub date_time_range: DateTimeRange,
    // Input symbology type; defaults to raw symbols.
    #[builder(default = SType::RawSymbol)]
    pub stype_in: SType,
    // Optional cap on the number of records; omitted from the query when
    // `None` (see the `AddToQuery` impl below).
    #[builder(default)]
    pub limit: Option<NonZeroU64>,
}
/// Parameters for [`MetadataClient::get_record_count`].
pub type GetRecordCountParams = GetQueryParams;
/// Parameters for [`MetadataClient::get_billable_size`].
pub type GetBillableSizeParams = GetQueryParams;
/// Parameters for [`MetadataClient::get_cost`].
pub type GetCostParams = GetQueryParams;
// Delegates to `as_str` so `FeedMode` can be used anywhere a `&str` is
// accepted generically.
impl AsRef<str> for FeedMode {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
impl FeedMode {
pub const fn as_str(&self) -> &'static str {
match self {
FeedMode::Historical => "historical",
FeedMode::HistoricalStreaming => "historical-streaming",
FeedMode::Live => "live",
}
}
}
impl FromStr for FeedMode {
    type Err = crate::Error;

    /// Parses the string forms produced by [`FeedMode::as_str`].
    ///
    /// # Errors
    /// Returns an internal error for any unrecognized string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "historical" => Ok(Self::Historical),
            "historical-streaming" => Ok(Self::HistoricalStreaming),
            "live" => Ok(Self::Live),
            // Fixed typo in the error message ("Unabled" -> "Unable").
            _ => Err(crate::Error::internal(format_args!(
                "Unable to convert {s} to FeedMode"
            ))),
        }
    }
}
impl<'de> Deserialize<'de> for FeedMode {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let str = String::deserialize(deserializer)?;
FromStr::from_str(&str).map_err(serde::de::Error::custom)
}
}
// Delegates to `as_str` so `DatasetCondition` can be used anywhere a `&str`
// is accepted generically.
impl AsRef<str> for DatasetCondition {
    fn as_ref(&self) -> &str {
        self.as_str()
    }
}
impl DatasetCondition {
pub const fn as_str(&self) -> &'static str {
match self {
DatasetCondition::Available => "available",
DatasetCondition::Degraded => "degraded",
DatasetCondition::Pending => "pending",
DatasetCondition::Missing => "missing",
}
}
}
impl FromStr for DatasetCondition {
    type Err = crate::Error;

    /// Parses the string forms produced by [`DatasetCondition::as_str`].
    ///
    /// # Errors
    /// Returns an internal error for any unrecognized string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "available" => Ok(DatasetCondition::Available),
            "degraded" => Ok(DatasetCondition::Degraded),
            "pending" => Ok(DatasetCondition::Pending),
            "missing" => Ok(DatasetCondition::Missing),
            // Fixed typo in the error message ("Unabled" -> "Unable").
            _ => Err(crate::Error::internal(format_args!(
                "Unable to convert {s} to DatasetCondition"
            ))),
        }
    }
}
impl<'de> Deserialize<'de> for DatasetCondition {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let str = String::deserialize(deserializer)?;
FromStr::from_str(&str).map_err(serde::de::Error::custom)
}
}
fn deserialize_date<'de, D: serde::Deserializer<'de>>(
deserializer: D,
) -> Result<time::Date, D::Error> {
let dt_str = String::deserialize(deserializer)?;
time::Date::parse(&dt_str, super::DATE_FORMAT).map_err(serde::de::Error::custom)
}
impl AddToQuery<GetQueryParams> for reqwest::RequestBuilder {
    /// Appends all fields of `params` as query parameters.
    fn add_to_query(mut self, params: &GetQueryParams) -> Self {
        self = self
            .query(&[
                ("dataset", params.dataset.as_str()),
                ("schema", params.schema.as_str()),
                ("stype_in", params.stype_in.as_str()),
            ])
            // Fixed mis-encoded source: `¶ms.…` -> `&params.…`.
            .add_to_query(&params.symbols)
            .add_to_query(&params.date_time_range);
        // `limit` is optional: omit the parameter entirely when unset.
        if let Some(limit) = params.limit {
            self = self.query(&[("limit", &limit.to_string())]);
        }
        self
    }
}
#[cfg(test)]
mod tests {
    use reqwest::StatusCode;
    use serde_json::json;
    use time::macros::date;
    use wiremock::{
        matchers::{basic_auth, method, path, query_param, query_param_is_missing},
        Mock, MockServer, ResponseTemplate,
    };

    use super::*;
    use crate::{
        historical::{HistoricalGateway, API_VERSION},
        HistoricalClient,
    };

    // API key used by every mock's basic-auth matcher.
    const API_KEY: &str = "test-metadata";

    // Verifies `list_fields` sends the encoding and schema as query
    // parameters and deserializes the field list.
    #[tokio::test]
    async fn test_list_fields() {
        const ENC: Encoding = Encoding::Csv;
        const SCHEMA: Schema = Schema::Ohlcv1S;

        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(basic_auth(API_KEY, ""))
            .and(path(format!("/v{API_VERSION}/metadata.list_fields")))
            .and(query_param("encoding", ENC.as_str()))
            .and(query_param("schema", SCHEMA.as_str()))
            .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_json(json!([
                {"name":"ts_event", "type": "uint64_t"},
                {"name":"rtype", "type": "uint8_t"},
                {"name":"open", "type": "int64_t"},
                {"name":"high", "type": "int64_t"},
                {"name":"low", "type": "int64_t"},
                {"name":"close", "type": "int64_t"},
                {"name":"volume", "type": "uint64_t"},
            ])))
            .mount(&mock_server)
            .await;
        let mut target = HistoricalClient::with_url(
            mock_server.uri(),
            API_KEY.to_owned(),
            HistoricalGateway::Bo1,
        )
        .unwrap();
        let fields = target
            .metadata()
            .list_fields(
                &ListFieldsParams::builder()
                    .encoding(ENC)
                    .schema(SCHEMA)
                    .build(),
            )
            .await
            .unwrap();
        // Expected fields mirror the mocked JSON body above.
        let exp = vec![
            FieldDetail {
                name: "ts_event".to_owned(),
                type_name: "uint64_t".to_owned(),
            },
            FieldDetail {
                name: "rtype".to_owned(),
                type_name: "uint8_t".to_owned(),
            },
            FieldDetail {
                name: "open".to_owned(),
                type_name: "int64_t".to_owned(),
            },
            FieldDetail {
                name: "high".to_owned(),
                type_name: "int64_t".to_owned(),
            },
            FieldDetail {
                name: "low".to_owned(),
                type_name: "int64_t".to_owned(),
            },
            FieldDetail {
                name: "close".to_owned(),
                type_name: "int64_t".to_owned(),
            },
            FieldDetail {
                name: "volume".to_owned(),
                type_name: "uint64_t".to_owned(),
            },
        ];
        assert_eq!(*fields, exp);
    }

    // Verifies `list_unit_prices` deserializes per-mode price maps,
    // including the string-keyed `FeedMode` and `Schema` values.
    #[tokio::test]
    async fn test_list_unit_prices() {
        const SCHEMA: Schema = Schema::Tbbo;
        const DATASET: &str = "GLBX.MDP3";

        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(basic_auth(API_KEY, ""))
            .and(path(format!("/v{API_VERSION}/metadata.list_unit_prices")))
            .and(query_param("dataset", DATASET))
            .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_json(json!([
                {
                    "mode": "historical",
                    "unit_prices": {
                        SCHEMA.as_str(): 17.89
                    }
                },
                {
                    "mode": "live",
                    "unit_prices": {
                        SCHEMA.as_str(): 34.22
                    }
                }
            ])))
            .mount(&mock_server)
            .await;
        let mut target = HistoricalClient::with_url(
            mock_server.uri(),
            API_KEY.to_owned(),
            HistoricalGateway::Bo1,
        )
        .unwrap();
        let prices = target.metadata().list_unit_prices(DATASET).await.unwrap();
        assert_eq!(
            prices,
            vec![
                UnitPricesForMode {
                    mode: FeedMode::Historical,
                    unit_prices: HashMap::from([(SCHEMA, 17.89)])
                },
                UnitPricesForMode {
                    mode: FeedMode::Live,
                    unit_prices: HashMap::from([(SCHEMA, 34.22)])
                }
            ]
        );
    }

    // Verifies `get_dataset_condition` sends only `start_date` when the
    // date range has no end, and deserializes the condition details.
    #[tokio::test]
    async fn test_get_dataset_condition() {
        const DATASET: &str = "GLBX.MDP3";

        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(basic_auth(API_KEY, ""))
            .and(path(format!(
                "/v{API_VERSION}/metadata.get_dataset_condition"
            )))
            .and(query_param("dataset", DATASET))
            .and(query_param("start_date", "2022-05-17"))
            // A single-date range must not produce an `end_date` parameter.
            .and(query_param_is_missing("end_date"))
            .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_json(json!([
                {
                    "date": "2022-05-17",
                    "condition": "available",
                    "last_modified_date": "2023-07-11",
                },
                {
                    "date": "2022-05-18",
                    "condition": "degraded",
                    "last_modified_date": "2022-05-19",
                }
            ])))
            .mount(&mock_server)
            .await;
        let mut target = HistoricalClient::with_url(
            mock_server.uri(),
            API_KEY.to_owned(),
            HistoricalGateway::Bo1,
        )
        .unwrap();
        let condition = target
            .metadata()
            .get_dataset_condition(
                &GetDatasetConditionParams::builder()
                    .dataset(DATASET.to_owned())
                    .date_range(date!(2022 - 05 - 17))
                    .build(),
            )
            .await
            .unwrap();
        assert_eq!(condition.len(), 2);
        assert_eq!(
            condition[0],
            DatasetConditionDetail {
                date: date!(2022 - 05 - 17),
                condition: DatasetCondition::Available,
                last_modified_date: date!(2023 - 07 - 11)
            }
        );
        assert_eq!(
            condition[1],
            DatasetConditionDetail {
                date: date!(2022 - 05 - 18),
                condition: DatasetCondition::Degraded,
                last_modified_date: date!(2022 - 05 - 19)
            }
        );
    }

    // Verifies `get_dataset_range` deserializes the start and end dates.
    #[tokio::test]
    async fn test_get_dataset_range() {
        const DATASET: &str = "XNAS.ITCH";

        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(basic_auth(API_KEY, ""))
            .and(path(format!("/v{API_VERSION}/metadata.get_dataset_range")))
            .and(query_param("dataset", DATASET))
            .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_json(json!({
                "start_date": "2019-07-07",
                "end_date": "2023-07-19",
            })))
            .mount(&mock_server)
            .await;
        let mut target = HistoricalClient::with_url(
            mock_server.uri(),
            API_KEY.to_owned(),
            HistoricalGateway::Bo1,
        )
        .unwrap();
        let range = target.metadata().get_dataset_range(DATASET).await.unwrap();
        assert_eq!(range.start_date, date!(2019 - 07 - 07));
        assert_eq!(range.end_date, date!(2023 - 07 - 19));
    }
}