use std::{collections::HashMap, num::NonZeroU64, str::FromStr};
use dbn::{Encoding, SType, Schema};
use reqwest::RequestBuilder;
use serde::{Deserialize, Deserializer};
use tracing::instrument;
use crate::{
deserialize::deserialize_date_time,
historical::{AddToForm, Limit, ReqwestForm},
Symbols,
};
use super::{handle_response, AddToQuery, DateRange, DateTimeRange};
/// A client for the historical `metadata.*` API endpoints.
///
/// Created from a [`super::Client`], which it borrows mutably while in use.
#[derive(Debug)]
pub struct MetadataClient<'a> {
    // The underlying historical client used to build and send HTTP requests.
    pub(crate) inner: &'a mut super::Client,
}
impl MetadataClient<'_> {
#[instrument(name = "metadata.list_publishers")]
pub async fn list_publishers(&mut self) -> crate::Result<Vec<PublisherDetail>> {
let resp = self.get("list_publishers")?.send().await?;
handle_response(resp).await
}
#[instrument(name = "metadata.list_datasets")]
pub async fn list_datasets(
&mut self,
date_range: Option<DateRange>,
) -> crate::Result<Vec<String>> {
let mut builder = self.get("list_datasets")?;
if let Some(date_range) = date_range {
builder = builder.add_to_query(&date_range);
}
let resp = builder.send().await?;
handle_response(resp).await
}
#[instrument(name = "metadata.list_schemas", skip(dataset), fields(dataset = %dataset.as_ref()))]
pub async fn list_schemas(&mut self, dataset: impl AsRef<str>) -> crate::Result<Vec<Schema>> {
let resp = self
.get("list_schemas")?
.query(&[("dataset", dataset.as_ref())])
.send()
.await?;
handle_response(resp).await
}
#[instrument(name = "metadata.list_fields")]
pub async fn list_fields(
&mut self,
params: &ListFieldsParams,
) -> crate::Result<Vec<FieldDetail>> {
let builder = self.get("list_fields")?.query(&[
("encoding", params.encoding.as_str()),
("schema", params.schema.as_str()),
]);
let resp = builder.send().await?;
handle_response(resp).await
}
#[instrument(name = "metadata.list_unit_prices", skip(dataset), fields(dataset = %dataset.as_ref()))]
pub async fn list_unit_prices(
&mut self,
dataset: impl AsRef<str>,
) -> crate::Result<Vec<UnitPricesForMode>> {
let builder = self
.get("list_unit_prices")?
.query(&[("dataset", &dataset.as_ref())]);
let resp = builder.send().await?;
handle_response(resp).await
}
#[instrument(name = "metadata.get_dataset_condition")]
pub async fn get_dataset_condition(
&mut self,
params: &GetDatasetConditionParams,
) -> crate::Result<Vec<DatasetConditionDetail>> {
let mut builder = self
.get("get_dataset_condition")?
.query(&[("dataset", ¶ms.dataset)]);
if let Some(ref date_range) = params.date_range {
builder = builder.add_to_query(date_range);
}
let resp = builder.send().await?;
handle_response(resp).await
}
#[instrument(name = "metadata.get_dataset_range", skip(dataset), fields(dataset = %dataset.as_ref()))]
pub async fn get_dataset_range(
&mut self,
dataset: impl AsRef<str>,
) -> crate::Result<DatasetRange> {
let resp = self
.get("get_dataset_range")?
.query(&[("dataset", dataset.as_ref())])
.send()
.await?;
handle_response(resp).await
}
#[instrument(name = "metadata.get_record_count")]
pub async fn get_record_count(&mut self, params: &GetRecordCountParams) -> crate::Result<u64> {
let form = ReqwestForm::new().add_to_form(params);
let resp = self.post("get_record_count")?.form(&form).send().await?;
handle_response(resp).await
}
#[instrument(name = "metadata.get_billable_size")]
pub async fn get_billable_size(
&mut self,
params: &GetBillableSizeParams,
) -> crate::Result<u64> {
let form = ReqwestForm::new().add_to_form(params);
let resp = self.post("get_billable_size")?.form(&form).send().await?;
handle_response(resp).await
}
#[instrument(name = "metadata.get_cost")]
pub async fn get_cost(&mut self, params: &GetCostParams) -> crate::Result<f64> {
let form = ReqwestForm::new().add_to_form(params);
let resp = self.post("get_cost")?.form(&form).send().await?;
handle_response(resp).await
}
fn get(&mut self, slug: &str) -> crate::Result<RequestBuilder> {
self.inner.get(&format!("metadata.{slug}"))
}
fn post(&mut self, slug: &str) -> crate::Result<RequestBuilder> {
self.inner.post(&format!("metadata.{slug}"))
}
}
/// The data feed modes a unit price can apply to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FeedMode {
    /// Historical data batch/file access.
    Historical,
    /// Historical data streamed over the network.
    HistoricalStreaming,
    /// Live (real-time) data.
    Live,
}
/// The condition of a dataset on a particular date.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DatasetCondition {
    /// Data is available.
    Available,
    /// Data is available but in a degraded state.
    Degraded,
    /// Data is pending.
    Pending,
    /// Data is missing.
    Missing,
}
/// Details about a data publisher.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct PublisherDetail {
    /// The publisher's numeric ID.
    pub publisher_id: u16,
    /// The dataset code associated with the publisher.
    pub dataset: String,
    /// The venue associated with the publisher.
    pub venue: String,
    /// A human-readable description of the publisher.
    pub description: String,
}
/// Parameters for [`MetadataClient::list_fields()`].
#[derive(Debug, Clone, bon::Builder, PartialEq, Eq)]
pub struct ListFieldsParams {
    /// The encoding to list fields for.
    pub encoding: Encoding,
    /// The schema to list fields for.
    pub schema: Schema,
}
/// The name and type of a field in a schema.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct FieldDetail {
    /// The field name.
    pub name: String,
    /// The field's type name (the API field is named `type`, a Rust keyword).
    #[serde(rename = "type")]
    pub type_name: String,
}
/// The per-schema unit prices for a particular feed mode.
// No `Eq` because the prices are `f64`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
pub struct UnitPricesForMode {
    /// The feed mode these prices apply to.
    pub mode: FeedMode,
    /// The unit price keyed by schema.
    pub unit_prices: HashMap<Schema, f64>,
}
/// Parameters for [`MetadataClient::get_dataset_condition()`].
#[derive(Debug, Clone, bon::Builder, PartialEq, Eq)]
pub struct GetDatasetConditionParams {
    /// The dataset code.
    #[builder(with = |d: impl ToString| d.to_string())]
    pub dataset: String,
    /// Optional date range to restrict the result to.
    #[builder(into)]
    pub date_range: Option<DateRange>,
}
/// The condition of a dataset on a single date.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct DatasetConditionDetail {
    /// The date this condition applies to.
    #[serde(deserialize_with = "deserialize_date")]
    pub date: time::Date,
    /// The condition of the dataset on `date`.
    pub condition: DatasetCondition,
    /// When the data was last modified, if known; the API may send `null`.
    #[serde(deserialize_with = "deserialize_opt_date")]
    pub last_modified_date: Option<time::Date>,
}
/// The available range of a dataset, overall and broken down by schema.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
pub struct DatasetRange {
    /// The start of the available range.
    #[serde(deserialize_with = "deserialize_date_time")]
    pub start: time::OffsetDateTime,
    /// The end of the available range.
    #[serde(deserialize_with = "deserialize_date_time")]
    pub end: time::OffsetDateTime,
    /// The available range for each schema (the API field is named `schema`).
    #[serde(rename = "schema")]
    pub range_by_schema: HashMap<Schema, DateTimeRange>,
}
impl From<DatasetRange> for DateTimeRange {
fn from(DatasetRange { start, end, .. }: DatasetRange) -> Self {
Self { start, end }
}
}
/// Parameters for the metadata endpoints that describe a data request:
/// record count, billable size, and cost (see the aliases below).
#[derive(Debug, Clone, bon::Builder, PartialEq, Eq)]
pub struct GetQueryParams {
    /// The dataset code.
    #[builder(with = |d: impl ToString| d.to_string())]
    pub dataset: String,
    /// The symbols to query.
    #[builder(into)]
    pub symbols: Symbols,
    /// The data record schema.
    pub schema: Schema,
    /// The request time range.
    #[builder(into)]
    pub date_time_range: DateTimeRange,
    /// The symbology type of `symbols`; defaults to raw symbols.
    #[builder(default = SType::RawSymbol)]
    pub stype_in: SType,
    /// Optional maximum number of records.
    pub limit: Option<NonZeroU64>,
}
/// Parameters for [`MetadataClient::get_record_count()`].
pub type GetRecordCountParams = GetQueryParams;
/// Parameters for [`MetadataClient::get_billable_size()`].
pub type GetBillableSizeParams = GetQueryParams;
/// Parameters for [`MetadataClient::get_cost()`].
pub type GetCostParams = GetQueryParams;
impl AsRef<str> for FeedMode {
fn as_ref(&self) -> &str {
self.as_str()
}
}
impl FeedMode {
pub const fn as_str(&self) -> &'static str {
match self {
FeedMode::Historical => "historical",
FeedMode::HistoricalStreaming => "historical-streaming",
FeedMode::Live => "live",
}
}
}
impl FromStr for FeedMode {
type Err = crate::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"historical" => Ok(Self::Historical),
"historical-streaming" => Ok(Self::HistoricalStreaming),
"live" => Ok(Self::Live),
_ => Err(crate::Error::internal(format_args!(
"Unable to convert {s} to FeedMode"
))),
}
}
}
impl<'de> Deserialize<'de> for FeedMode {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let str = String::deserialize(deserializer)?;
FromStr::from_str(&str).map_err(serde::de::Error::custom)
}
}
impl AsRef<str> for DatasetCondition {
fn as_ref(&self) -> &str {
self.as_str()
}
}
impl DatasetCondition {
pub const fn as_str(&self) -> &'static str {
match self {
DatasetCondition::Available => "available",
DatasetCondition::Degraded => "degraded",
DatasetCondition::Pending => "pending",
DatasetCondition::Missing => "missing",
}
}
}
impl FromStr for DatasetCondition {
    type Err = crate::Error;

    /// Parses a dataset condition from its API string form.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "available" => Ok(Self::Available),
            "degraded" => Ok(Self::Degraded),
            "pending" => Ok(Self::Pending),
            "missing" => Ok(Self::Missing),
            _ => Err(crate::Error::internal(format_args!(
                "Unable to convert {s} to DatasetCondition"
            ))),
        }
    }
}
impl<'de> Deserialize<'de> for DatasetCondition {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let str = String::deserialize(deserializer)?;
FromStr::from_str(&str).map_err(serde::de::Error::custom)
}
}
fn deserialize_date<'de, D: serde::Deserializer<'de>>(
deserializer: D,
) -> Result<time::Date, D::Error> {
let date_str = String::deserialize(deserializer)?;
time::Date::parse(&date_str, super::DATE_FORMAT).map_err(serde::de::Error::custom)
}
fn deserialize_opt_date<'de, D: serde::Deserializer<'de>>(
deserializer: D,
) -> Result<Option<time::Date>, D::Error> {
let opt_date_str: Option<String> = Option::deserialize(deserializer)?;
match opt_date_str {
Some(date_str) => {
let date = time::Date::parse(&date_str, super::DATE_FORMAT)
.map_err(serde::de::Error::custom)?;
Ok(Some(date))
}
None => Ok(None),
}
}
impl AddToForm<GetQueryParams> for ReqwestForm {
    /// Adds the query parameters to the form, including the nested date-time
    /// range and the optional limit.
    fn add_to_form(mut self, param: &GetQueryParams) -> Self {
        self.push(("dataset", param.dataset.to_string()));
        self.push(("schema", param.schema.to_string()));
        self.push(("stype_in", param.stype_in.to_string()));
        self.push(("symbols", param.symbols.to_api_string()));
        // Fixed: `&param.date_time_range` had been corrupted into the
        // mojibake `¶m.date_time_range` (HTML-entity mangling of `&para`),
        // which doesn't compile.
        self.add_to_form(&param.date_time_range)
            .add_to_form(&Limit(param.limit))
    }
}
#[cfg(test)]
mod tests {
    //! Tests for the metadata client against a wiremock HTTP server, checking
    //! both the request parameters sent and the deserialization of responses.
    use reqwest::StatusCode;
    use serde_json::json;
    use time::macros::{date, datetime};
    use wiremock::{
        matchers::{basic_auth, method, path, query_param},
        Mock, MockServer, ResponseTemplate,
    };

    use super::*;
    use crate::{
        historical::test_infra::{client, API_KEY},
        historical::API_VERSION,
    };

    // `list_fields` sends the encoding and schema as query parameters and
    // deserializes the response into `FieldDetail`s.
    #[tokio::test]
    async fn test_list_fields() {
        const ENC: Encoding = Encoding::Csv;
        const SCHEMA: Schema = Schema::Ohlcv1S;

        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(basic_auth(API_KEY, ""))
            .and(path(format!("/v{API_VERSION}/metadata.list_fields")))
            .and(query_param("encoding", ENC.as_str()))
            .and(query_param("schema", SCHEMA.as_str()))
            .respond_with(
                ResponseTemplate::new(StatusCode::OK.as_u16()).set_body_json(json!([
                    {"name":"ts_event", "type": "uint64_t"},
                    {"name":"rtype", "type": "uint8_t"},
                    {"name":"open", "type": "int64_t"},
                    {"name":"high", "type": "int64_t"},
                    {"name":"low", "type": "int64_t"},
                    {"name":"close", "type": "int64_t"},
                    {"name":"volume", "type": "uint64_t"},
                ])),
            )
            .mount(&mock_server)
            .await;
        let mut target = client(&mock_server);
        let fields = target
            .metadata()
            .list_fields(
                &ListFieldsParams::builder()
                    .encoding(ENC)
                    .schema(SCHEMA)
                    .build(),
            )
            .await
            .unwrap();
        let exp = vec![
            FieldDetail {
                name: "ts_event".to_owned(),
                type_name: "uint64_t".to_owned(),
            },
            FieldDetail {
                name: "rtype".to_owned(),
                type_name: "uint8_t".to_owned(),
            },
            FieldDetail {
                name: "open".to_owned(),
                type_name: "int64_t".to_owned(),
            },
            FieldDetail {
                name: "high".to_owned(),
                type_name: "int64_t".to_owned(),
            },
            FieldDetail {
                name: "low".to_owned(),
                type_name: "int64_t".to_owned(),
            },
            FieldDetail {
                name: "close".to_owned(),
                type_name: "int64_t".to_owned(),
            },
            FieldDetail {
                name: "volume".to_owned(),
                type_name: "uint64_t".to_owned(),
            },
        ];
        assert_eq!(*fields, exp);
    }

    // `list_unit_prices` maps each JSON object to a `UnitPricesForMode` with
    // the mode parsed from its string form.
    #[tokio::test]
    async fn test_list_unit_prices() {
        const SCHEMA: Schema = Schema::Tbbo;
        const DATASET: &str = "GLBX.MDP3";

        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(basic_auth(API_KEY, ""))
            .and(path(format!("/v{API_VERSION}/metadata.list_unit_prices")))
            .and(query_param("dataset", DATASET))
            .respond_with(
                ResponseTemplate::new(StatusCode::OK.as_u16()).set_body_json(json!([
                    {
                        "mode": "historical",
                        "unit_prices": {
                            SCHEMA.as_str(): 17.89
                        }
                    },
                    {
                        "mode": "live",
                        "unit_prices": {
                            SCHEMA.as_str(): 34.22
                        }
                    }
                ])),
            )
            .mount(&mock_server)
            .await;
        let mut target = client(&mock_server);
        let prices = target.metadata().list_unit_prices(DATASET).await.unwrap();
        assert_eq!(
            prices,
            vec![
                UnitPricesForMode {
                    mode: FeedMode::Historical,
                    unit_prices: HashMap::from([(SCHEMA, 17.89)])
                },
                UnitPricesForMode {
                    mode: FeedMode::Live,
                    unit_prices: HashMap::from([(SCHEMA, 34.22)])
                }
            ]
        );
    }

    // `get_dataset_condition` sends the dataset and date range as query
    // parameters; `last_modified_date` may be `null` and maps to `None`.
    #[tokio::test]
    async fn test_get_dataset_condition() {
        const DATASET: &str = "GLBX.MDP3";

        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(basic_auth(API_KEY, ""))
            .and(path(format!(
                "/v{API_VERSION}/metadata.get_dataset_condition"
            )))
            .and(query_param("dataset", DATASET))
            .and(query_param("start_date", "2022-05-17"))
            .and(query_param("end_date", "2022-05-19"))
            .respond_with(
                ResponseTemplate::new(StatusCode::OK.as_u16()).set_body_json(json!([
                    {
                        "date": "2022-05-17",
                        "condition": "available",
                        "last_modified_date": "2023-07-11",
                    },
                    {
                        "date": "2022-05-18",
                        "condition": "degraded",
                        "last_modified_date": "2022-05-19",
                    },
                    {
                        "date": "2022-05-19",
                        "condition": "missing",
                        "last_modified_date": null,
                    },
                ])),
            )
            .mount(&mock_server)
            .await;
        let mut target = client(&mock_server);
        let condition = target
            .metadata()
            .get_dataset_condition(
                &GetDatasetConditionParams::builder()
                    .dataset(DATASET.to_owned())
                    // A (start, duration) pair converts into a `DateRange`.
                    .date_range((date!(2022 - 05 - 17), time::Duration::days(2)))
                    .build(),
            )
            .await
            .unwrap();
        assert_eq!(condition.len(), 3);
        assert_eq!(
            condition[0],
            DatasetConditionDetail {
                date: date!(2022 - 05 - 17),
                condition: DatasetCondition::Available,
                last_modified_date: Some(date!(2023 - 07 - 11))
            }
        );
        assert_eq!(
            condition[1],
            DatasetConditionDetail {
                date: date!(2022 - 05 - 18),
                condition: DatasetCondition::Degraded,
                last_modified_date: Some(date!(2022 - 05 - 19))
            }
        );
        assert_eq!(
            condition[2],
            DatasetConditionDetail {
                date: date!(2022 - 05 - 19),
                condition: DatasetCondition::Missing,
                last_modified_date: None
            }
        );
    }

    // `get_dataset_range` parses the overall start/end timestamps; the
    // per-schema breakdown is deserialized from the `schema` field.
    #[tokio::test]
    async fn test_get_dataset_range() {
        const DATASET: &str = "XNAS.ITCH";

        let mock_server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(basic_auth(API_KEY, ""))
            .and(path(format!("/v{API_VERSION}/metadata.get_dataset_range")))
            .and(query_param("dataset", DATASET))
            .respond_with(
                ResponseTemplate::new(StatusCode::OK.as_u16()).set_body_json(json!({
                    "start": "2019-07-07T00:00:00.000000000Z",
                    "end": "2023-07-20T00:00:00.000000000Z",
                    "schema": {
                        "bbo-1m": {
                            "start": "2020-08-02T00:00:00.000000000Z",
                            "end": "2023-03-23T00:00:00.000000000Z"
                        },
                        "ohlcv-1s": {
                            "start": "2020-08-02T00:00:00.000000000Z",
                            "end": "2023-03-23T00:00:00.000000000Z"
                        },
                        "ohlcv-1m": {
                            "start": "2020-08-02T00:00:00.000000000Z",
                            "end": "2023-03-23T00:00:00.000000000Z"
                        },
                    }
                })),
            )
            .mount(&mock_server)
            .await;
        let mut target = client(&mock_server);
        let range = target.metadata().get_dataset_range(DATASET).await.unwrap();
        assert_eq!(range.start, datetime!(2019 - 07 - 07 00:00:00+00:00));
        assert_eq!(range.end, datetime!(2023 - 07 - 20 00:00:00.000000+00:00));
    }
}