use crate::response::ApiResponse;
use crate::{error::Error, error::Result};
use futures_util::StreamExt;
use mbinary::params::RetrieveParams;
use reqwest::StatusCode;
use reqwest::{self, Client, ClientBuilder};
use std::fs::File;
use std::io::Write;
use std::time::Duration;
/// HTTP client for the API's `/historical/` endpoints.
///
/// Holds the base URL that endpoint paths are appended to and the `reqwest`
/// client used to send every request.
#[derive(Clone)]
pub struct Historical {
/// Base URL that the `/historical/<endpoint>` path is appended to.
base_url: String,
/// HTTP client used for all requests issued by this value.
client: Client,
}
impl Historical {
    /// Creates a client whose endpoint URLs are rooted at `base_url`.
    ///
    /// The underlying HTTP client is configured with a 20 000 second request
    /// timeout.
    ///
    /// # Panics
    ///
    /// Panics if the HTTP client cannot be built.
    pub fn new(base_url: &str) -> Self {
        let client = ClientBuilder::new()
            .timeout(Duration::from_secs(20000))
            .build()
            .expect("Failed to build HTTP client");

        Historical {
            base_url: base_url.to_string(),
            client,
        }
    }

    /// Returns `<base_url>/historical/<endpoint>`.
    fn url(&self, endpoint: &str) -> String {
        format!("{}/historical/{}", self.base_url, endpoint)
    }

    /// Drains a streamed response made of JSON-encoded `ApiResponse<String>`
    /// status updates.
    ///
    /// Each update's message is printed to stdout. The first update whose
    /// status is not `"success"` is returned immediately; if the stream ends
    /// without one, a synthetic `"success"` response is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if a chunk cannot be read from the connection or
    /// cannot be parsed as an `ApiResponse<String>`.
    ///
    /// NOTE(review): every chunk is parsed on its own, so this assumes the
    /// server delivers exactly one complete JSON document per chunk — confirm
    /// against the server's framing.
    async fn consume_status_stream(response: reqwest::Response) -> Result<ApiResponse<String>> {
        let mut stream = response.bytes_stream();

        while let Some(chunk) = stream.next().await {
            let bytes = match chunk {
                Ok(bytes) => bytes,
                Err(e) => {
                    eprintln!("Error while reading chunk: {:?}", e);
                    return Err(Error::from(e));
                }
            };

            let text = String::from_utf8_lossy(&bytes);
            let update = match serde_json::from_str::<ApiResponse<String>>(&text) {
                Ok(update) => update,
                Err(e) => {
                    eprintln!("Error while receiving chunk: {:?}", e);
                    return Err(Error::from(e));
                }
            };

            println!("{:?}", update.message);
            if update.status != "success" {
                return Ok(update);
            }
        }

        Ok(ApiResponse::new(
            "success",
            "",
            StatusCode::OK,
            "".to_string(),
        ))
    }

    /// Posts `data` as the JSON body of a `mbp/create/stream` request and
    /// follows the streamed status updates.
    ///
    /// A non-200 reply is converted with `ApiResponse::from_response`.
    /// Otherwise the first non-`"success"` update is returned, or a synthetic
    /// `"success"` response once the stream ends.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails or a streamed chunk cannot be
    /// read or parsed.
    pub async fn create_mbp(&self, data: &[u8]) -> Result<ApiResponse<String>> {
        let url = self.url("mbp/create/stream");
        let response = self.client.post(&url).json(data).send().await?;

        if response.status() != StatusCode::OK {
            return ApiResponse::<String>::from_response(response).await;
        }

        Self::consume_status_stream(response).await
    }

    /// Posts `file_path` as the JSON body of a `mbp/create/bulk` request and
    /// follows the streamed status updates.
    ///
    /// Only the path string is sent; this method does not read the file.
    /// Response handling is identical to [`Historical::create_mbp`].
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails or a streamed chunk cannot be
    /// read or parsed.
    pub async fn create_mbp_from_file(&self, file_path: &str) -> Result<ApiResponse<String>> {
        let url = self.url("mbp/create/bulk");
        let response = self.client.post(&url).json(&file_path).send().await?;

        if response.status() != StatusCode::OK {
            return ApiResponse::<String>::from_response(response).await;
        }

        Self::consume_status_stream(response).await
    }

    /// Sends `params` as the JSON body of a `mbp/get/stream` GET request and
    /// collects the streamed response body into memory.
    ///
    /// A non-200 reply is converted with `ApiResponse::from_response`.
    /// Otherwise the returned `"success"` response carries the concatenated
    /// body bytes as its data.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails or a chunk cannot be read.
    pub async fn get_records(&self, params: &RetrieveParams) -> Result<ApiResponse<Vec<u8>>> {
        let url = self.url("mbp/get/stream");
        let response = self.client.get(&url).json(params).send().await?;

        if response.status() != StatusCode::OK {
            return ApiResponse::<Vec<u8>>::from_response(response).await;
        }

        let mut data = Vec::new();
        let mut stream = response.bytes_stream();
        while let Some(chunk) = stream.next().await {
            match chunk {
                Ok(bytes) => data.extend_from_slice(&bytes),
                Err(e) => {
                    // Diagnostics go to stderr, matching the create methods.
                    eprintln!("Error while receiving chunk: {:?}", e);
                    return Err(Error::from(e));
                }
            }
        }

        Ok(ApiResponse::new("success", "", StatusCode::OK, data))
    }

    /// Fetches records with [`Historical::get_records`] and writes the
    /// response data to `file_path`, creating or truncating the file.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails, the file cannot be created, or
    /// the data cannot be written in full.
    pub async fn get_records_to_file(
        &self,
        params: &RetrieveParams,
        file_path: &str,
    ) -> Result<()> {
        let response = self.get_records(params).await?;
        let mut file = File::create(file_path)?;
        // Propagate write failures so the caller never receives `Ok(())` for a
        // missing or truncated file.
        file.write_all(&response.data)?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::instrument::Instruments;
use dbn;
use dotenv::dotenv;
use mbinary::decode::Decoder;
use mbinary::encode::CombinedEncoder;
use mbinary::enums::{Action, Dataset, Schema};
use mbinary::metadata::Metadata;
use mbinary::record_ref::RecordRef;
use mbinary::records::{BidAskPair, Mbp1Msg, RecordHeader};
use mbinary::symbols::{Instrument, SymbolMap};
use mbinary::vendors::Vendors;
use mbinary::vendors::{DatabentoData, VendorData};
use serial_test::serial;
use std::io::Cursor;
use std::str::FromStr;
/// Registers a throwaway instrument named `ticker` in `dataset` through the
/// `Instruments` client and returns the id reported by the server.
///
/// The server address is read from the `MIDAS_URL` environment variable
/// (after loading `.env`); the function panics if it is not set.
async fn create_dummy_instrument(ticker: &str, dataset: Dataset) -> anyhow::Result<i32> {
    dotenv().ok();
    let midas_url = std::env::var("MIDAS_URL").expect("Expected database_url.");
    let instruments = Instruments::new(&midas_url);

    // Databento vendor settings attached to the instrument.
    let databento = DatabentoData {
        schema: dbn::Schema::from_str("mbp-1")?,
        dataset: dbn::Dataset::from_str("GLBX.MDP3")?,
        stype: dbn::SType::from_str("raw_symbol")?,
    };
    let encoded_vendor_data = VendorData::Databento(databento).encode();

    let instrument = Instrument::new(
        None,
        ticker,
        "Apple tester client",
        dataset,
        Vendors::Databento,
        encoded_vendor_data,
        1,
        1,
        1,
        false,
        true,
    );

    let created = instruments.create_symbol(&instrument).await?;
    Ok(created.data as i32)
}
/// Deletes the instrument with the given `id` through the `Instruments`
/// client.
///
/// The server address is read from the `MIDAS_URL` environment variable
/// (after loading `.env`); the function panics if it is not set.
async fn delete_dummy_instrument(id: &i32) -> Result<()> {
    dotenv().ok();
    let midas_url = std::env::var("MIDAS_URL").expect("Expected database_url.");

    // The response payload is not needed; only failure is propagated.
    let _ = Instruments::new(&midas_url).delete_symbol(&id).await?;
    Ok(())
}
/// Creates a dummy instrument and uploads two MBP-1 records for it, returning
/// the instrument id.
///
/// Not called by any test visible in this module, hence the `dead_code`
/// allowance.
#[allow(dead_code)]
async fn create_dummy_records(ticker: &str, dataset: Dataset) -> anyhow::Result<i32> {
    dotenv().ok();
    let midas_url = std::env::var("MIDAS_URL").expect("Expected database_url.");
    let historical = Historical::new(&midas_url);
    let id = create_dummy_instrument(ticker, dataset).await?;

    // Both records carry the same single book level.
    let book_level = || {
        [BidAskPair {
            ask_px: 1,
            bid_px: 1,
            bid_sz: 2,
            ask_sz: 2,
            bid_ct: 10,
            ask_ct: 20,
        }]
    };

    let first = Mbp1Msg {
        hd: RecordHeader::new::<Mbp1Msg>(id as u32, 1704209103644092564, 0),
        price: 6770,
        size: 1,
        action: Action::Trade as i8,
        side: 2,
        depth: 0,
        flags: 0,
        ts_recv: 1704209103644092564,
        ts_in_delta: 17493,
        sequence: 739763,
        discriminator: 0,
        levels: book_level(),
    };
    let second = Mbp1Msg {
        hd: RecordHeader::new::<Mbp1Msg>(id as u32, 1704209103644092565, 0),
        price: 6870,
        size: 2,
        action: Action::Trade as i8,
        side: 1,
        depth: 0,
        flags: 0,
        ts_recv: 1704209103644092565,
        ts_in_delta: 17493,
        sequence: 739763,
        discriminator: 0,
        levels: book_level(),
    };
    let first_ref: RecordRef = (&first).into();
    let second_ref: RecordRef = (&second).into();

    // Metadata first, then the records, all into one in-memory buffer.
    let mut buffer = Vec::new();
    let mut encoder = CombinedEncoder::new(&mut buffer);
    encoder.encode_metadata(&Metadata::new(
        Schema::Mbp1,
        Dataset::Equities,
        1704209103644092564,
        1704209103644092566,
        SymbolMap::new(),
    ))?;
    encoder
        .encode_records(&[first_ref, second_ref])
        .expect("Encoding failed");

    let _ = historical.create_mbp(&buffer).await?;
    Ok(id)
}
/// Uploads two distinct MBP-1 records for a dummy instrument and expects a
/// `200` / `"success"` response.
#[tokio::test]
#[serial]
async fn test_create_mbp() -> anyhow::Result<()> {
    dotenv().ok();
    let base_url = std::env::var("MIDAS_URL").expect("Expected database_url.");
    let client = Historical::new(&base_url);
    let ticker = "AAPL";
    let dataset = Dataset::Equities;
    let id = create_dummy_instrument(ticker, dataset).await?;

    let mbp_1 = Mbp1Msg {
        hd: RecordHeader::new::<Mbp1Msg>(id as u32, 1704209103644092564, 0),
        price: 6770,
        size: 1,
        action: 1,
        side: 2,
        depth: 0,
        flags: 0,
        ts_recv: 1704209103644092564,
        ts_in_delta: 17493,
        sequence: 739763,
        discriminator: 0,
        levels: [BidAskPair {
            ask_px: 1,
            bid_px: 1,
            bid_sz: 2,
            ask_sz: 2,
            bid_ct: 10,
            ask_ct: 20,
        }],
    };
    let mbp_2 = Mbp1Msg {
        hd: RecordHeader::new::<Mbp1Msg>(id as u32, 1704239109644092565, 0),
        price: 6870,
        size: 2,
        action: 1,
        side: 1,
        depth: 0,
        flags: 0,
        ts_recv: 1704209103644092565,
        ts_in_delta: 17493,
        sequence: 739763,
        discriminator: 0,
        levels: [BidAskPair {
            ask_px: 1,
            bid_px: 1,
            bid_sz: 2,
            ask_sz: 2,
            bid_ct: 10,
            ask_ct: 20,
        }],
    };
    let record_ref1: RecordRef = (&mbp_1).into();
    let record_ref2: RecordRef = (&mbp_2).into();

    let metadata = Metadata::new(
        Schema::Mbp1,
        Dataset::Equities,
        1704209103644092564,
        1704209103644092566,
        SymbolMap::new(),
    );
    let mut buffer = Vec::new();
    let mut encoder = CombinedEncoder::new(&mut buffer);
    encoder.encode_metadata(&metadata)?;
    encoder
        .encode_records(&[record_ref1, record_ref2])
        .expect("Encoding failed");

    // Remove the dummy instrument before inspecting the outcome, so a failed
    // request or assertion does not leave it behind for later tests.
    let outcome = client.create_mbp(&buffer).await;
    let cleanup = delete_dummy_instrument(&id).await;
    let response = outcome?;
    cleanup?;

    assert_eq!(response.code, 200);
    assert_eq!(response.status, "success");
    Ok(())
}
/// Uploads the same MBP-1 record twice in one request and expects the server
/// to answer with a `"failed"` status.
#[tokio::test]
#[serial]
async fn test_create_mbp_duplicate_error() -> anyhow::Result<()> {
    dotenv().ok();
    let base_url = std::env::var("MIDAS_URL").expect("Expected database_url.");
    let client = Historical::new(&base_url);
    let ticker = "AAPL";
    let dataset = Dataset::Equities;
    let id = create_dummy_instrument(ticker, dataset).await?;

    // The two records are intentionally identical.
    let duplicate_record = || Mbp1Msg {
        hd: RecordHeader::new::<Mbp1Msg>(id as u32, 1704209103644092564, 0),
        price: 6770,
        size: 1,
        action: 1,
        side: 2,
        depth: 0,
        flags: 0,
        ts_recv: 1704209103644092564,
        ts_in_delta: 17493,
        sequence: 739763,
        discriminator: 0,
        levels: [BidAskPair {
            ask_px: 1,
            bid_px: 1,
            bid_sz: 2,
            ask_sz: 2,
            bid_ct: 10,
            ask_ct: 20,
        }],
    };
    let mbp_1 = duplicate_record();
    let mbp_2 = duplicate_record();
    let record_ref1: RecordRef = (&mbp_1).into();
    let record_ref2: RecordRef = (&mbp_2).into();

    let metadata = Metadata::new(
        Schema::Mbp1,
        Dataset::Equities,
        1704209103644092564,
        1704209103644092566,
        SymbolMap::new(),
    );
    let mut buffer = Vec::new();
    let mut encoder = CombinedEncoder::new(&mut buffer);
    encoder.encode_metadata(&metadata)?;
    encoder
        .encode_records(&[record_ref1, record_ref2])
        .expect("Encoding failed");

    // Remove the dummy instrument before inspecting the outcome, so a failed
    // request or assertion does not leave it behind for later tests.
    let outcome = client.create_mbp(&buffer).await;
    let cleanup = delete_dummy_instrument(&id).await;
    let response = outcome?;
    cleanup?;

    println!("{:?}", response);
    assert_eq!(response.status, "failed");
    Ok(())
}
/// Requests MBP-1 records for a dummy instrument and checks that the response
/// decodes and reports `200` / `"success"`.
#[tokio::test]
#[serial]
async fn test_get_mbp() -> anyhow::Result<()> {
    dotenv().ok();
    let base_url = std::env::var("MIDAS_URL").expect("Expected database_url.");
    let client = Historical::new(&base_url);
    let ticker = "AAPL";
    let dataset = Dataset::Equities;
    let id = create_dummy_instrument(ticker, dataset.clone()).await?;

    let query_params = RetrieveParams {
        symbols: vec!["AAPL".to_string()],
        start_ts: 1704209103644092563,
        end_ts: 1704239109644092565,
        schema: Schema::Mbp1,
        dataset,
        stype: mbinary::enums::Stype::Raw,
    };

    // Remove the dummy instrument before decoding or asserting, so a failure
    // below does not leave it behind for later tests.
    let outcome = client.get_records(&query_params).await;
    let cleanup = delete_dummy_instrument(&id).await;
    let response = outcome?;
    cleanup?;

    let data = response.data;
    let cursor = Cursor::new(data);
    let mut decoder = Decoder::new(cursor)?;
    let _decoded = decoder.decode().expect("Error decoding metadata.");

    assert_eq!(response.code, 200);
    assert_eq!(response.status, "success");
    Ok(())
}
#[tokio::test]
#[serial]
async fn test_get_records_to_file() -> anyhow::Result<()> {
dotenv().ok();
let base_url = std::env::var("MIDAS_URL").expect("Expected database_url.");
let client = Historical::new(&base_url);
let ticker = "AAPL";
let dataset = Dataset::Equities;
let id = create_dummy_instrument(ticker, dataset.clone()).await?;
let query_params = RetrieveParams {
symbols: vec!["AAPL".to_string()],
start_ts: 1704209103644092563,
end_ts: 1704239109644092565,
schema: Schema::Mbp1,
dataset,
stype: mbinary::enums::Stype::Raw,
};
let response = client
.get_records_to_file(&query_params, "tests/test_data_pull.bin")
.await?;
assert_eq!(response, ());
let _ = delete_dummy_instrument(&id).await?;
Ok(())
}
/// Requests one-second OHLCV records for a dummy instrument and checks that
/// the response decodes and reports `200` / `"success"`.
#[tokio::test]
#[serial]
async fn test_get_ohlcv() -> anyhow::Result<()> {
    dotenv().ok();
    let base_url = std::env::var("MIDAS_URL").expect("Expected database_url.");
    let client = Historical::new(&base_url);
    let ticker = "AAPL";
    let dataset = Dataset::Equities;
    let id = create_dummy_instrument(ticker, dataset.clone()).await?;

    let query_params = RetrieveParams {
        symbols: vec!["AAPL".to_string()],
        start_ts: 1704209103644092563,
        end_ts: 1704209203654092563,
        schema: Schema::Ohlcv1S,
        dataset,
        stype: mbinary::enums::Stype::Raw,
    };

    // Remove the dummy instrument before decoding or asserting, so a failure
    // below does not leave it behind for later tests.
    let outcome = client.get_records(&query_params).await;
    let cleanup = delete_dummy_instrument(&id).await;
    let response = outcome?;
    cleanup?;

    let data = response.data;
    let cursor = Cursor::new(data);
    let mut decoder = Decoder::new(cursor)?;
    let _record = decoder.decode().expect("Error decoding metadata.");

    assert_eq!(response.code, 200);
    assert_eq!(response.status, "success");
    Ok(())
}
/// Requests trade records for a dummy instrument and checks that the response
/// decodes and reports `200` / `"success"`.
#[tokio::test]
#[serial]
async fn test_get_trades() -> anyhow::Result<()> {
    dotenv().ok();
    let base_url = std::env::var("MIDAS_URL").expect("Expected database_url.");
    let client = Historical::new(&base_url);
    let ticker = "AAPL";
    let dataset = Dataset::Equities;
    let id = create_dummy_instrument(ticker, dataset.clone()).await?;

    let query_params = RetrieveParams {
        symbols: vec!["AAPL".to_string()],
        start_ts: 1704209103644092563,
        end_ts: 1704209203654092563,
        schema: Schema::Trades,
        dataset,
        stype: mbinary::enums::Stype::Raw,
    };

    // Remove the dummy instrument before decoding or asserting, so a failure
    // below does not leave it behind for later tests.
    let outcome = client.get_records(&query_params).await;
    let cleanup = delete_dummy_instrument(&id).await;
    let response = outcome?;
    cleanup?;

    let data = response.data;
    let cursor = Cursor::new(data);
    let mut decoder = Decoder::new(cursor)?;
    let _record = decoder.decode().expect("Error decoding metadata.");

    assert_eq!(response.code, 200);
    assert_eq!(response.status, "success");
    Ok(())
}
/// Requests TBBO records for a dummy instrument and checks that the response
/// decodes and reports `200` / `"success"`.
#[tokio::test]
#[serial]
async fn test_get_tbbo() -> anyhow::Result<()> {
    dotenv().ok();
    let base_url = std::env::var("MIDAS_URL").expect("Expected database_url.");
    let client = Historical::new(&base_url);
    let ticker = "AAPL";
    let dataset = Dataset::Equities;
    let id = create_dummy_instrument(ticker, dataset.clone()).await?;

    let query_params = RetrieveParams {
        symbols: vec!["AAPL".to_string()],
        start_ts: 1704209103644092563,
        end_ts: 1704209203654092563,
        schema: Schema::Tbbo,
        dataset,
        stype: mbinary::enums::Stype::Raw,
    };

    // Remove the dummy instrument before decoding or asserting, so a failure
    // below does not leave it behind for later tests.
    let outcome = client.get_records(&query_params).await;
    let cleanup = delete_dummy_instrument(&id).await;
    let response = outcome?;
    cleanup?;

    let data = response.data;
    let cursor = Cursor::new(data);
    let mut decoder = Decoder::new(cursor)?;
    let _record = decoder.decode().expect("Error decoding metadata.");

    assert_eq!(response.code, 200);
    assert_eq!(response.status, "success");
    Ok(())
}
/// Requests one-second BBO records for a dummy instrument and checks that the
/// response decodes and reports `200` / `"success"`.
#[tokio::test]
#[serial]
async fn test_get_bbo() -> anyhow::Result<()> {
    dotenv().ok();
    let base_url = std::env::var("MIDAS_URL").expect("Expected database_url.");
    let client = Historical::new(&base_url);
    let ticker = "AAPL";
    let dataset = Dataset::Equities;
    let id = create_dummy_instrument(ticker, dataset.clone()).await?;

    let query_params = RetrieveParams {
        symbols: vec!["AAPL".to_string()],
        start_ts: 1704209103644092563,
        end_ts: 1704209203654092563,
        schema: Schema::Bbo1S,
        dataset,
        stype: mbinary::enums::Stype::Raw,
    };

    // Remove the dummy instrument before decoding or asserting, so a failure
    // below does not leave it behind for later tests.
    let outcome = client.get_records(&query_params).await;
    let cleanup = delete_dummy_instrument(&id).await;
    let response = outcome?;
    cleanup?;

    let data = response.data;
    let cursor = Cursor::new(data);
    let mut decoder = Decoder::new(cursor)?;
    let _record = decoder.decode().expect("Error decoding metadata.");

    assert_eq!(response.code, 200);
    assert_eq!(response.status, "success");
    Ok(())
}
/// Ignored by default: requests one-minute BBO records for two continuous
/// symbols over a fixed date range and writes them to `bbo.bin`.
#[tokio::test]
#[serial]
#[ignore]
async fn test_get_records_to_file_server() -> Result<()> {
    dotenv().ok();
    let midas_url = std::env::var("MIDAS_URL").expect("Expected database_url.");
    let historical = Historical::new(&midas_url);

    let symbols = vec!["HE.n.0".to_string(), "ZC.n.0".to_string()];
    let params = RetrieveParams::new(
        symbols,
        "2024-01-01 00:00:00",
        "2024-01-03 23:00:00",
        Schema::Bbo1M,
        Dataset::Equities,
        mbinary::enums::Stype::Continuous,
    )?;

    // The call's own result is the test outcome.
    historical.get_records_to_file(&params, "bbo.bin").await
}
}