use std::{
borrow::Cow,
collections::{BTreeMap, HashSet},
fmt::Debug,
io::Cursor,
ops::Bound as RangeBound,
path::{Path, PathBuf},
sync::Arc,
};
use ahash::AHashMap;
use datafusion::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use itertools::Itertools;
use nautilus_common::live::get_runtime;
use nautilus_core::{
UnixNanos,
datetime::{iso8601_to_unix_nanos, unix_nanos_to_iso8601},
string::to_snake_case,
};
use nautilus_model::{
data::{
Bar, CustomData, Data, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta,
OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose,
is_monotonically_increasing_by_init, to_variant,
},
instruments::InstrumentAny,
};
use nautilus_serialization::arrow::{
DecodeDataFromRecordBatch, EncodeToRecordBatch, custom::CustomDataDecoder,
};
use object_store::{ObjectStore, ObjectStoreExt, path::Path as ObjectPath};
use serde::Serialize;
use unbounded_interval_tree::interval_tree::IntervalTree;
use super::{
custom::{
custom_data_path_components, decode_batch_to_data as orchestration_decode_batch_to_data,
decode_custom_batches_to_data as orchestration_decode_custom_batches_to_data,
prepare_custom_data_batch,
},
session::{self, DataBackendSession, QueryResult, build_query},
};
use crate::parquet::{read_parquet_from_object_store, write_batches_to_object_store};
/// A catalog of Parquet files kept behind an [`ObjectStore`], queryable through
/// a DataFusion-backed [`DataBackendSession`].
pub struct ParquetDataCatalog {
/// Root prefix inside the object store (may be empty when the store root is the base).
pub base_path: String,
/// The URI the catalog was created from (e.g. `file://...`, `s3://...`).
pub original_uri: String,
/// Backend used for all listing, read, and write operations.
pub object_store: Arc<dyn ObjectStore>,
/// DataFusion session used to register and query Parquet files.
pub session: DataBackendSession,
/// Number of rows per encoded record batch.
pub batch_size: usize,
/// Parquet compression codec applied on write.
pub compression: parquet::basic::Compression,
/// Maximum Parquet row-group size applied on write.
pub max_row_group_size: usize,
}
impl Debug for ParquetDataCatalog {
    /// Compact debug output: only `base_path` is shown, since the object store
    /// and session handles have no useful textual representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("ParquetDataCatalog");
        builder.field("base_path", &self.base_path);
        builder.finish()
    }
}
impl ParquetDataCatalog {
/// Creates a catalog rooted at a local filesystem `base_path`.
///
/// All optional settings fall back to the defaults documented on
/// [`Self::from_uri`].
///
/// # Panics
///
/// Panics if the catalog cannot be created from the path.
#[must_use]
pub fn new(
    base_path: &Path,
    storage_options: Option<AHashMap<String, String>>,
    batch_size: Option<usize>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> Self {
    // A plain filesystem path is also a valid input for the URI-based constructor.
    let uri = base_path.to_string_lossy();
    Self::from_uri(
        uri.as_ref(),
        storage_options,
        batch_size,
        compression,
        max_row_group_size,
    )
    .expect("Failed to create catalog from path")
}
/// Creates a catalog from a URI (local path, `file://`, or a remote
/// object-store scheme such as `s3://`).
///
/// Defaults: batch size 5000, SNAPPY compression, max row-group size 5000.
///
/// # Errors
///
/// Returns an error if an object store cannot be constructed for `uri`.
pub fn from_uri(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
    batch_size: Option<usize>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<Self> {
    const DEFAULT_BATCH_SIZE: usize = 5000;
    const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 5000;

    let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
    let compression = compression.unwrap_or(parquet::basic::Compression::SNAPPY);
    let max_row_group_size = max_row_group_size.unwrap_or(DEFAULT_MAX_ROW_GROUP_SIZE);
    let (object_store, base_path, original_uri) =
        crate::parquet::create_object_store_from_path(uri, storage_options)?;
    let session = session::DataBackendSession::new(batch_size);
    Ok(Self {
        base_path,
        original_uri,
        object_store,
        session,
        batch_size,
        compression,
        max_row_group_size,
    })
}
/// Returns a copy of the catalog's base path within the object store.
#[must_use]
pub fn get_base_path(&self) -> String {
    self.base_path.to_owned()
}
/// Clears all tables registered on the internal DataFusion session so that
/// a fresh query can run without stale registrations.
pub fn reset_session(&mut self) {
self.session.clear_registered_tables();
}
/// Sorts a mixed slice of [`Data`] into per-type buckets and writes each
/// bucket to its own Parquet location.
///
/// `start`/`end` override the timestamp interval encoded into output
/// filenames; `skip_disjoint_check` is forwarded to the per-type writers.
///
/// # Errors
///
/// Propagates any error from the underlying typed writes.
pub fn write_data_enum(
&self,
data: &[Data],
start: Option<UnixNanos>,
end: Option<UnixNanos>,
skip_disjoint_check: Option<bool>,
) -> anyhow::Result<()> {
let mut deltas: Vec<OrderBookDelta> = Vec::new();
let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
let mut quotes: Vec<QuoteTick> = Vec::new();
let mut trades: Vec<TradeTick> = Vec::new();
let mut bars: Vec<Bar> = Vec::new();
let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
let mut closes: Vec<InstrumentClose> = Vec::new();
// Custom data is grouped by (type name, optional identifier, metadata string)
// so each distinct custom data type lands in its own file.
let custom_data_key = |c: &CustomData| {
(
c.data_type.type_name().to_string(),
c.data_type.identifier().map(String::from),
c.data_type.metadata_str(),
)
};
let mut custom_data: AHashMap<(String, Option<String>, String), Vec<CustomData>> =
AHashMap::new();
for d in data.iter().cloned() {
match d {
// NOTE(review): grouped deltas (`Data::Deltas`) are silently dropped here —
// presumably they are written elsewhere as individual deltas; confirm.
Data::Deltas(_) => {}
Data::Delta(d) => {
deltas.push(d);
}
Data::Depth10(d) => {
depth10s.push(*d);
}
Data::Quote(d) => {
quotes.push(d);
}
Data::Trade(d) => {
trades.push(d);
}
Data::Bar(d) => {
bars.push(d);
}
Data::MarkPriceUpdate(p) => {
mark_prices.push(p);
}
Data::IndexPriceUpdate(p) => {
index_prices.push(p);
}
Data::InstrumentClose(c) => {
closes.push(c);
}
Data::Custom(c) => {
custom_data.entry(custom_data_key(&c)).or_default().push(c);
}
}
}
// Empty buckets are cheap no-ops inside `write_to_parquet`.
self.write_to_parquet(deltas, start, end, skip_disjoint_check)?;
self.write_to_parquet(depth10s, start, end, skip_disjoint_check)?;
self.write_to_parquet(quotes, start, end, skip_disjoint_check)?;
self.write_to_parquet(trades, start, end, skip_disjoint_check)?;
self.write_to_parquet(bars, start, end, skip_disjoint_check)?;
self.write_to_parquet(mark_prices, start, end, skip_disjoint_check)?;
self.write_to_parquet(index_prices, start, end, skip_disjoint_check)?;
self.write_to_parquet(closes, start, end, skip_disjoint_check)?;
for (_, items) in custom_data {
self.write_custom_data_batch(items, start, end, skip_disjoint_check)?;
}
Ok(())
}
/// Writes typed data to a single Parquet file whose name encodes the covered
/// timestamp interval, skipping the write if the file already exists.
///
/// `start`/`end` override the interval derived from the first/last element;
/// `skip_disjoint_check` (default `false`) disables the overlap check against
/// existing files in the same directory. Returns the path written, or an
/// empty path when `data` is empty.
///
/// # Errors
///
/// Fails when timestamps are not ascending, when the new interval would
/// overlap an existing file's interval, or on object-store I/O errors.
pub fn write_to_parquet<T>(
    &self,
    data: Vec<T>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    skip_disjoint_check: Option<bool>,
) -> anyhow::Result<PathBuf>
where
    T: HasTsInit + EncodeToRecordBatch + CatalogPathPrefix,
{
    if data.is_empty() {
        return Ok(PathBuf::new());
    }
    let type_name = to_snake_case(std::any::type_name::<T>());
    Self::check_ascending_timestamps(&data, &type_name)?;
    let start_ts = start.unwrap_or(data.first().unwrap().ts_init());
    let end_ts = end.unwrap_or(data.last().unwrap().ts_init());
    let batches = self.data_to_record_batches(data)?;
    let schema = batches.first().expect("Batches are empty.").schema();
    // Bars are keyed by bar type; every other data type by instrument ID.
    let identifier = if T::path_prefix() == "bars" {
        schema.metadata.get("bar_type").cloned()
    } else {
        schema.metadata.get("instrument_id").cloned()
    };
    let directory = self.make_path(T::path_prefix(), identifier.as_deref())?;
    let filename = timestamps_to_filename(start_ts, end_ts);
    // Fix: embed the interval-encoded filename in the target path — it was
    // previously computed but left unused, producing a constant path.
    let path = PathBuf::from(format!("{directory}/{filename}"));
    let object_path = self.to_object_path(&path.to_string_lossy());
    let file_exists = self.execute_async(async {
        let exists: bool = self.object_store.head(&object_path).await.is_ok();
        Ok(exists)
    })?;
    if file_exists {
        log::info!("File {} already exists, skipping write", path.display());
        return Ok(path);
    }
    if !skip_disjoint_check.unwrap_or(false) {
        // Reject a write that would make file intervals in this directory overlap.
        let current_intervals = self.get_directory_intervals(&directory)?;
        let new_interval = (start_ts.as_u64(), end_ts.as_u64());
        let mut new_intervals = current_intervals.clone();
        new_intervals.push(new_interval);
        if !are_intervals_disjoint(&new_intervals) {
            anyhow::bail!(
                "Writing file {filename} with interval ({start_ts}, {end_ts}) would create \
                 non-disjoint intervals. Existing intervals: {current_intervals:?}"
            );
        }
    }
    log::info!(
        "Writing {} batches of {type_name} data to {}",
        batches.len(),
        path.display(),
    );
    self.execute_async(async {
        write_batches_to_object_store(
            &batches,
            self.object_store.clone(),
            &object_path,
            Some(self.compression),
            Some(self.max_row_group_size),
            None,
        )
        .await
    })?;
    Ok(path)
}
/// Writes a batch of [`CustomData`] (all of one data type) to a Parquet file
/// named by the covered timestamp interval.
///
/// `start`/`end` override the interval derived by `prepare_custom_data_batch`;
/// `skip_disjoint_check` (default `false`) disables the overlap check.
/// Returns the path written, or an empty path when `data` is empty.
///
/// # Errors
///
/// Fails when the batch cannot be encoded, when the new interval would
/// overlap an existing file's interval, or on object-store I/O errors.
pub fn write_custom_data_batch(
    &self,
    data: Vec<CustomData>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    skip_disjoint_check: Option<bool>,
) -> anyhow::Result<PathBuf> {
    if data.is_empty() {
        return Ok(PathBuf::new());
    }
    let (batch, type_name, identifier, start_ts, end_ts) = prepare_custom_data_batch(data)?;
    let start_ts = start.unwrap_or(start_ts);
    let end_ts = end.unwrap_or(end_ts);
    let batches = vec![batch];
    let directory = self.make_path_custom_data(&type_name, identifier.as_deref())?;
    let filename = timestamps_to_filename(start_ts, end_ts);
    // Fix: embed the interval-encoded filename in the target path — it was
    // previously computed but left unused, producing a constant path.
    let path = PathBuf::from(format!("{directory}/{filename}"));
    let object_path = self.to_object_path(&path.to_string_lossy());
    let file_exists = self.execute_async(async {
        let exists: bool = self.object_store.head(&object_path).await.is_ok();
        Ok(exists)
    })?;
    if file_exists {
        log::info!("File {} already exists, skipping write", path.display());
        return Ok(path);
    }
    if !skip_disjoint_check.unwrap_or(false) {
        // Reject a write that would make file intervals in this directory overlap.
        let current_intervals = self.get_directory_intervals(&directory)?;
        let new_interval = (start_ts.as_u64(), end_ts.as_u64());
        let mut new_intervals = current_intervals.clone();
        new_intervals.push(new_interval);
        if !are_intervals_disjoint(&new_intervals) {
            anyhow::bail!(
                "Writing file {filename} with interval ({start_ts}, {end_ts}) would create \
                 non-disjoint intervals. Existing intervals: {current_intervals:?}"
            );
        }
    }
    self.execute_async(async {
        write_batches_to_object_store(
            &batches,
            self.object_store.clone(),
            &object_path,
            Some(self.compression),
            Some(self.max_row_group_size),
            None,
        )
        .await
    })?;
    Ok(path)
}
/// Writes instruments to Parquet files, grouped by instrument type and ID,
/// one interval-named file per group.
///
/// Existing files are skipped (their path is still returned); a write that
/// would create overlapping intervals within a directory is rejected.
///
/// # Errors
///
/// Fails when timestamps are not ascending, on interval overlap, or on
/// object-store I/O errors.
pub fn write_instruments(
    &self,
    instruments: Vec<InstrumentAny>,
) -> anyhow::Result<Vec<PathBuf>> {
    use nautilus_model::instruments::Instrument;
    if instruments.is_empty() {
        return Ok(Vec::new());
    }
    // BTreeMap keeps group iteration (and therefore output order) deterministic.
    let mut by_type_and_id: BTreeMap<(String, String), Vec<InstrumentAny>> = BTreeMap::new();
    for instrument in instruments {
        let instrument_type = Self::instrument_type_name(&instrument).to_string();
        let instrument_id = Instrument::id(&instrument).to_string();
        by_type_and_id
            .entry((instrument_type, instrument_id))
            .or_default()
            .push(instrument);
    }
    let mut paths = Vec::new();
    for ((_instrument_type, instrument_id), instrument_group) in by_type_and_id {
        Self::check_ascending_timestamps(&instrument_group, "instrument")?;
        let start_ts = HasTsInit::ts_init(instrument_group.first().unwrap());
        let end_ts = HasTsInit::ts_init(instrument_group.last().unwrap());
        let batches = self.data_to_record_batches(instrument_group)?;
        if batches.is_empty() {
            continue;
        }
        let directory = self.make_path("instruments", Some(instrument_id.as_str()))?;
        let filename = timestamps_to_filename(start_ts, end_ts);
        // Fix: embed the interval-encoded filename in the target path — it was
        // previously computed but left unused, producing a constant path.
        let path = PathBuf::from(format!("{directory}/{filename}"));
        let object_path = self.to_object_path(&path.to_string_lossy());
        let file_exists = self
            .execute_async(async { Ok(self.object_store.head(&object_path).await.is_ok()) })?;
        if file_exists {
            log::info!(
                "Instrument file {} already exists, skipping write",
                path.display()
            );
            paths.push(path);
            continue;
        }
        // Reject a write that would make file intervals in this directory overlap.
        let current_intervals = self.get_directory_intervals(&directory)?;
        let new_interval = (start_ts.as_u64(), end_ts.as_u64());
        let mut new_intervals = current_intervals.clone();
        new_intervals.push(new_interval);
        if !are_intervals_disjoint(&new_intervals) {
            anyhow::bail!(
                "Writing file {filename} with interval ({start_ts}, {end_ts}) would create \
                 non-disjoint intervals. Existing intervals: {current_intervals:?}"
            );
        }
        log::info!(
            "Writing {} batches of instrument data for {instrument_id} to {}",
            batches.len(),
            path.display(),
        );
        self.execute_async(async {
            write_batches_to_object_store(
                &batches,
                self.object_store.clone(),
                &object_path,
                Some(self.compression),
                Some(self.max_row_group_size),
                None,
            )
            .await
        })?;
        paths.push(path);
    }
    Ok(paths)
}
/// Queries all instruments, optionally restricted to the given IDs.
///
/// No time-range filtering; see [`Self::query_instruments_filtered`].
pub fn query_instruments(
&self,
instrument_ids: Option<&[String]>,
) -> anyhow::Result<Vec<InstrumentAny>> {
self.query_instruments_filtered(instrument_ids, None, None)
}
/// Queries instruments with optional ID and `ts_init` time-range filters.
///
/// Interval-named files are pruned by filename before reading; files named
/// `instrument.parquet` carry no interval and are always read. Decoded rows
/// are then filtered individually by `ts_init` and returned sorted by it.
///
/// # Errors
///
/// Returns an error on object-store listing/read failures or decode errors.
pub fn query_instruments_filtered(
&self,
instrument_ids: Option<&[String]>,
start: Option<UnixNanos>,
end: Option<UnixNanos>,
) -> anyhow::Result<Vec<InstrumentAny>> {
use nautilus_serialization::arrow::instrument::decode_instrument_any_batch;
let base_dir = self.make_path("instruments", None)?;
let mut all_instruments = Vec::new();
let start_u64 = start.map(|ts| ts.as_u64());
let end_u64 = end.map(|ts| ts.as_u64());
// List every object under the instruments directory in one pass.
let list_result = self.execute_async(async {
let prefix = ObjectPath::from(format!("{base_dir}/"));
let mut stream = self.object_store.list(Some(&prefix));
let mut objects = Vec::new();
while let Some(object) = stream.next().await {
objects.push(object?);
}
Ok::<Vec<_>, anyhow::Error>(objects)
})?;
let mut instrument_files = Vec::new();
for object in list_result {
let path_str = object.location.to_string();
if !path_str.ends_with(".parquet") {
continue;
}
let path_parts: Vec<&str> = path_str.split('/').collect();
if path_parts.len() < 2 {
continue;
}
// The parent directory name is the (URI-safe) instrument ID.
let instrument_id_dir = path_parts[path_parts.len() - 2];
// Compare in URI-safe form so raw and encoded IDs match either way.
if let Some(ids) = instrument_ids
&& !ids
.iter()
.map(|id| urisafe_instrument_id(id))
.any(|x| x.as_str() == urisafe_instrument_id(instrument_id_dir))
{
continue;
}
// `instrument.parquet` has no interval in its name, so it cannot be
// pruned by filename and is always included.
let include_file = if path_str.ends_with("/instrument.parquet") {
true
} else {
query_intersects_filename(&path_str, start_u64, end_u64)
};
if include_file {
instrument_files.push(path_str);
}
}
// Deterministic read order.
instrument_files.sort();
for file_path in instrument_files {
let object_path = self.to_object_path_parsed(&file_path)?;
let (batches, builder_schema) = self.execute_async(async {
read_parquet_from_object_store(self.object_store.clone(), &object_path).await
})?;
let metadata: std::collections::HashMap<String, String> =
builder_schema.metadata().clone();
for batch in batches {
let mut instruments = decode_instrument_any_batch(&metadata, &batch)?;
// Filename pruning is coarse; filter individual rows by `ts_init`.
if start.is_some() || end.is_some() {
instruments.retain(|instrument| {
let ts = HasTsInit::ts_init(instrument).as_u64();
start_u64.is_none_or(|value| ts >= value)
&& end_u64.is_none_or(|value| ts <= value)
});
}
all_instruments.extend(instruments);
}
}
all_instruments.sort_by_key(HasTsInit::ts_init);
Ok(all_instruments)
}
/// Serializes data to a pretty-printed JSON file (plus an optional sidecar
/// metadata file) in the catalog's object store.
///
/// When `path` is `None` the default directory for `T` is used. Returns the
/// JSON path written, or an empty path when `data` is empty.
///
/// # Errors
///
/// Fails when timestamps are not ascending, when the target directory cannot
/// be derived, or on serialization / object-store errors.
pub fn write_to_json<T>(
    &self,
    data: Vec<T>,
    path: Option<PathBuf>,
    write_metadata: bool,
) -> anyhow::Result<PathBuf>
where
    T: HasTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
{
    if data.is_empty() {
        return Ok(PathBuf::new());
    }
    let type_name = to_snake_case(std::any::type_name::<T>());
    Self::check_ascending_timestamps(&data, &type_name)?;
    let start_ts = data.first().unwrap().ts_init();
    let end_ts = data.last().unwrap().ts_init();
    // Fix: propagate path-construction errors instead of panicking via `unwrap`.
    let directory = match path {
        Some(directory) => directory,
        None => PathBuf::from(self.make_path(T::path_prefix(), None)?),
    };
    let filename = timestamps_to_filename(start_ts, end_ts).replace(".parquet", ".json");
    let json_path = directory.join(&filename);
    log::info!(
        "Writing {} records of {type_name} data to {}",
        data.len(),
        json_path.display(),
    );
    if write_metadata {
        let metadata = T::chunk_metadata(&data);
        // `with_extension` turns `<stem>.json` into `<stem>.metadata.json`.
        let metadata_path = json_path.with_extension("metadata.json");
        log::info!("Writing metadata to {}", metadata_path.display());
        let metadata_object_path = ObjectPath::from(metadata_path.to_string_lossy().as_ref());
        let metadata_json = serde_json::to_vec_pretty(&metadata)?;
        self.execute_async(async {
            let _: object_store::PutResult = self
                .object_store
                .put(&metadata_object_path, metadata_json.into())
                .await?;
            Ok(())
        })?;
    }
    let json_object_path = ObjectPath::from(json_path.to_string_lossy().as_ref());
    let json_data = serde_json::to_vec_pretty(&serde_json::to_value(data)?)?;
    self.execute_async(async {
        let _: object_store::PutResult = self
            .object_store
            .put(&json_object_path, json_data.into())
            .await?;
        Ok(())
    })?;
    Ok(json_path)
}
/// Verifies that `data` is sorted non-decreasing by `ts_init`.
///
/// # Errors
///
/// Returns an error naming `type_name` when any adjacent pair is out of order.
pub fn check_ascending_timestamps<T: HasTsInit>(
    data: &[T],
    type_name: &str,
) -> anyhow::Result<()> {
    // `windows(2)` is the stable equivalent of the nightly-only `array_windows`.
    let ascending = data
        .windows(2)
        .all(|pair| pair[0].ts_init() <= pair[1].ts_init());
    if !ascending {
        anyhow::bail!("{type_name} timestamps must be in ascending order");
    }
    Ok(())
}
/// Maps an [`InstrumentAny`] variant to its canonical type-name string,
/// used as the grouping key when writing instrument files.
fn instrument_type_name(instrument: &InstrumentAny) -> &'static str {
match instrument {
InstrumentAny::Betting(_) => "BettingInstrument",
InstrumentAny::BinaryOption(_) => "BinaryOption",
InstrumentAny::Cfd(_) => "Cfd",
InstrumentAny::Commodity(_) => "Commodity",
InstrumentAny::CryptoFuture(_) => "CryptoFuture",
InstrumentAny::CryptoOption(_) => "CryptoOption",
InstrumentAny::CryptoPerpetual(_) => "CryptoPerpetual",
InstrumentAny::CurrencyPair(_) => "CurrencyPair",
InstrumentAny::Equity(_) => "Equity",
InstrumentAny::FuturesContract(_) => "FuturesContract",
InstrumentAny::FuturesSpread(_) => "FuturesSpread",
InstrumentAny::IndexInstrument(_) => "IndexInstrument",
InstrumentAny::OptionContract(_) => "OptionContract",
InstrumentAny::OptionSpread(_) => "OptionSpread",
InstrumentAny::PerpetualContract(_) => "PerpetualContract",
InstrumentAny::TokenizedAsset(_) => "TokenizedAsset",
}
}
/// Encodes `data` into record batches of at most `self.batch_size` rows,
/// attaching per-chunk metadata from the encoder.
///
/// # Errors
///
/// Returns an error if any chunk fails to encode.
pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
where
    T: HasTsInit + EncodeToRecordBatch,
{
    let chunked = data.into_iter().chunks(self.batch_size);
    let mut record_batches = Vec::new();
    for group in &chunked {
        let rows = group.collect_vec();
        let meta = EncodeToRecordBatch::chunk_metadata(&rows);
        record_batches.push(T::encode_batch(&meta, &rows)?);
    }
    Ok(record_batches)
}
/// Extends an existing file's interval to absorb the adjacent range
/// `[start, end]` by renaming the file.
///
/// An existing interval starting exactly at `end + 1` is extended backwards
/// to `start`; one ending exactly at `start - 1` is extended forwards to
/// `end`. At most one file is renamed.
///
/// # Errors
///
/// Fails on listing/rename errors, or if the directory's intervals are not
/// disjoint after the rename.
pub fn extend_file_name(
    &self,
    data_cls: &str,
    identifier: Option<&str>,
    start: UnixNanos,
    end: UnixNanos,
) -> anyhow::Result<()> {
    let directory = self.make_path(data_cls, identifier)?;
    let intervals = self.get_directory_intervals(&directory)?;
    let start = start.as_u64();
    let end = end.as_u64();
    for interval in intervals {
        // Checked arithmetic: `start - 1` used to underflow when `start == 0`,
        // and `end + 1` could overflow at `u64::MAX`.
        if end.checked_add(1) == Some(interval.0) {
            // Existing file starts right after `end`: extend it backwards.
            self.rename_parquet_file(&directory, interval.0, interval.1, start, interval.1)?;
            break;
        } else if start.checked_sub(1) == Some(interval.1) {
            // Existing file ends right before `start`: extend it forwards.
            self.rename_parquet_file(&directory, interval.0, interval.1, interval.0, end)?;
            break;
        }
    }
    // Re-list and verify the invariant still holds after the rename.
    let intervals = self.get_directory_intervals(&directory)?;
    if !are_intervals_disjoint(&intervals) {
        anyhow::bail!("Intervals are not disjoint after extending a file");
    }
    Ok(())
}
pub fn list_parquet_files(&self, directory: &str) -> anyhow::Result<Vec<String>> {
self.execute_async(async {
let prefix = ObjectPath::from(format!("{directory}/"));
let mut stream = self.object_store.list(Some(&prefix));
let mut files = Vec::new();
while let Some(object) = stream.next().await {
let object = object?;
if object.location.as_ref().ends_with(".parquet") {
files.push(object.location.to_string());
}
}
Ok::<Vec<String>, anyhow::Error>(files)
})
}
/// Lists the distinct instrument directory names found under
/// `data/{data_type}/` (unordered).
///
/// NOTE(review): the prefix is hard-coded as `data/...` and does not include
/// `self.base_path`, unlike `make_path` — confirm this is intentional for
/// stores whose root already is the base path.
pub fn list_instruments(&self, data_type: &str) -> anyhow::Result<Vec<String>> {
self.execute_async(async {
let prefix = ObjectPath::from(format!("data/{data_type}/"));
let mut stream = self.object_store.list(Some(&prefix));
let mut instruments = HashSet::new();
while let Some(object) = stream.next().await {
let object = object?;
let path = object.location.as_ref();
let parts: Vec<&str> = path.split('/').collect();
// Paths look like data/<type>/<instrument>/<file>; component 2 is the instrument.
if parts.len() >= 3 {
instruments.insert(parts[2].to_string());
}
}
Ok::<Vec<String>, anyhow::Error>(instruments.into_iter().collect())
})
}
pub fn list_parquet_files_with_criteria(
&self,
data_type: &str,
identifiers: Option<&[String]>,
start: Option<UnixNanos>,
end: Option<UnixNanos>,
) -> anyhow::Result<Vec<String>> {
let mut all_files = Vec::new();
let start_u64 = start.map(|s| s.as_u64());
let end_u64 = end.map(|e| e.as_u64());
let base_dir = self.make_path(data_type, None)?;
let list_result = self.execute_async(async {
let prefix = ObjectPath::from(format!("{base_dir}/"));
let mut stream = self.object_store.list(Some(&prefix));
let mut objects = Vec::new();
while let Some(object) = stream.next().await {
objects.push(object?);
}
Ok::<Vec<_>, anyhow::Error>(objects)
})?;
for object in list_result {
let path_str = object.location.to_string();
if let Some(ids) = identifiers {
let path_components = extract_path_components(&path_str);
let mut matches = false;
for id in ids {
if path_components.iter().any(|c| c.contains(id)) {
matches = true;
break;
}
}
if !matches {
continue;
}
}
if path_str.ends_with(".parquet")
&& query_intersects_filename(&path_str, start_u64, end_u64)
{
all_files.push(path_str);
}
}
Ok(all_files)
}
/// Reconstructs a fully qualified URI (or native path) for an object-store
/// relative `path_str`.
///
/// Resolution order: remote scheme + host, then `file://` base, then the
/// `base_path` / `original_uri` fallbacks for everything else.
#[must_use]
pub fn reconstruct_full_uri(&self, path_str: &str) -> String {
// Remote stores: re-attach scheme and bucket/host.
if self.is_remote_uri() {
if let Ok(url) = url::Url::parse(&self.original_uri)
&& let Some(host) = url.host_str()
{
return format!("{}://{}/{}", url.scheme(), host, path_str);
}
}
// file:// URIs: join onto the native filesystem base path.
if self.original_uri.starts_with("file://") {
if let Ok(url) = url::Url::parse(&self.original_uri)
&& let Ok(base_path) = url.to_file_path()
{
let base_str = base_path.to_string_lossy();
return self.join_paths(&base_str, path_str);
}
}
if self.base_path.is_empty() {
// No base path: an unparseable scheme URI is returned as-is;
// otherwise join onto the original URI.
if self.original_uri.contains("://") {
path_str.to_string()
} else {
self.join_paths(self.original_uri.trim_end_matches('/'), path_str)
}
} else {
let base = self.base_path.trim_end_matches('/');
self.join_paths(base, path_str)
}
}
/// Joins `base` and `path` using the object-store path convention.
#[must_use]
fn join_paths(&self, base: &str, path: &str) -> String {
make_object_store_path(base, &[path])
}
/// Resolves `path` into a form DataFusion can open directly.
#[must_use]
fn resolve_path_for_datafusion(&self, path: &str) -> String {
    // Already a URI, or an absolute native path: use as-is.
    if path.contains("://") || path.starts_with('/') {
        return path.to_string();
    }
    // file:// catalogs: anchor relative paths on the original URI.
    if self.original_uri.starts_with("file://") {
        return format!(
            "{}/{}",
            self.original_uri.trim_end_matches('/'),
            path.trim_end_matches('/')
        );
    }
    // Everything else goes through the general URI reconstruction.
    self.reconstruct_full_uri(path)
}
/// Resolves `directory` for DataFusion and guarantees a trailing slash,
/// which marks the registration as a directory rather than a single file.
#[must_use]
fn resolve_directory_for_datafusion(&self, directory: &str) -> String {
    let resolved = self.resolve_path_for_datafusion(directory);
    if resolved.ends_with('/') {
        resolved
    } else {
        format!("{resolved}/")
    }
}
/// Returns the path form used when listing query files: local `file://`
/// catalogs keep the relative object-store path, remote catalogs need the
/// fully qualified URI.
#[must_use]
fn path_for_query_list(&self, path: &str) -> String {
    if self.original_uri.starts_with("file://") {
        return path.to_string();
    }
    self.reconstruct_full_uri(path)
}
/// Returns the catalog base as a native path string: `file://` URIs are
/// converted to filesystem paths, anything else is returned unchanged.
#[must_use]
fn native_base_path_string(&self) -> String {
    let uri = &self.original_uri;
    if uri.starts_with("file://") {
        crate::parquet::file_uri_to_native_path(uri)
    } else {
        uri.clone()
    }
}
/// Returns `true` when the catalog URI uses a remote object-store scheme.
#[must_use]
pub fn is_remote_uri(&self) -> bool {
    // Schemes served by remote object-store backends.
    const REMOTE_SCHEMES: [&str; 7] = [
        "s3://", "gs://", "gcs://", "az://", "abfs://", "http://", "https://",
    ];
    REMOTE_SCHEMES
        .iter()
        .any(|scheme| self.original_uri.starts_with(scheme))
}
/// Registers matching Parquet files for type `T` on the DataFusion session
/// and returns the combined query result.
///
/// With `optimize_file_loading`, one table is registered per parent
/// directory (DataFusion reads all files in it); otherwise one table is
/// registered per file. `files`, when given, bypasses file discovery.
///
/// # Errors
///
/// Returns an error on URI parsing, file discovery, or table registration
/// failures.
pub fn query<T>(
&mut self,
identifiers: Option<Vec<String>>,
start: Option<UnixNanos>,
end: Option<UnixNanos>,
where_clause: Option<&str>,
files: Option<Vec<String>>,
optimize_file_loading: bool,
) -> anyhow::Result<QueryResult>
where
T: DecodeDataFromRecordBatch + CatalogPathPrefix,
{
// Remote stores must be registered on the session under their bucket URL
// before any table pointing at them can be added.
if self.is_remote_uri() {
let url = url::Url::parse(&self.original_uri)?;
let host = url
.host_str()
.ok_or_else(|| anyhow::anyhow!("Remote URI missing host/bucket name"))?;
let base_url = url::Url::parse(&format!("{}://{}", url.scheme(), host))?;
self.session
.register_object_store(&base_url, self.object_store.clone());
}
let files_list = if let Some(files) = files {
files
} else {
self.query_files(T::path_prefix(), identifiers, start, end)?
};
if optimize_file_loading {
// Register whole directories: one table per distinct parent directory.
let directories: HashSet<String> = files_list
.iter()
.filter_map(|file_uri| {
let path = Path::new(file_uri);
path.parent().map(|p| p.to_string_lossy().to_string())
})
.collect();
for directory in directories {
let path_parts: Vec<&str> = directory.split('/').collect();
// The last path component is the identifier (instrument ID / bar type).
let identifier = if path_parts.is_empty() {
"unknown".to_string()
} else {
path_parts[path_parts.len() - 1].to_string()
};
let safe_sql_identifier = make_sql_safe_identifier(&identifier);
let table_name = format!("{}_{}", T::path_prefix(), safe_sql_identifier);
let query = build_query(&table_name, start, end, where_clause);
let resolved_path = self.resolve_directory_for_datafusion(&directory);
self.session
.add_file::<T>(&table_name, &resolved_path, Some(&query), None)?;
}
} else {
// Register each file individually; table names must be unique, hence
// identifier + filename in the name.
for file_uri in &files_list {
let identifier = extract_identifier_from_path(file_uri);
let safe_sql_identifier = make_sql_safe_identifier(&identifier);
let safe_filename = extract_sql_safe_filename(file_uri);
let table_name = format!(
"{}_{}_{}",
T::path_prefix(),
safe_sql_identifier,
safe_filename
);
let query = build_query(&table_name, start, end, where_clause);
let resolved_path = self.resolve_path_for_datafusion(file_uri);
self.session
.add_file::<T>(&table_name, &resolved_path, Some(&query), None)?;
}
}
Ok(self.session.get_query_result())
}
/// Runs [`Self::query`] on a fresh session and converts the resulting mixed
/// [`Data`] stream into concrete values of type `T`.
///
/// # Errors
///
/// Propagates any error from [`Self::query`].
pub fn query_typed_data<T>(
    &mut self,
    identifiers: Option<Vec<String>>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    where_clause: Option<&str>,
    files: Option<Vec<String>>,
    optimize_file_loading: bool,
) -> anyhow::Result<Vec<T>>
where
    T: DecodeDataFromRecordBatch + CatalogPathPrefix + TryFrom<Data>,
{
    // Start clean so registrations from earlier queries don't leak in.
    self.reset_session();
    let result = self.query::<T>(
        identifiers,
        start,
        end,
        where_clause,
        files,
        optimize_file_loading,
    )?;
    let collected = result.collect();
    Ok(to_variant::<T>(collected))
}
/// Queries custom data of `type_name` by registering each matching file on
/// a single shared table and collecting the decoded [`Data`] values.
///
/// NOTE(review): `_optimize_file_loading` is accepted but ignored here —
/// confirm whether directory-level registration should also apply to custom
/// data.
///
/// # Errors
///
/// Returns an error on file discovery or table registration failures.
#[allow(clippy::too_many_arguments)]
pub fn query_custom_data_dynamic(
&mut self,
type_name: &str,
identifiers: Option<&[String]>,
start: Option<UnixNanos>,
end: Option<UnixNanos>,
where_clause: Option<&str>,
files: Option<Vec<String>>,
_optimize_file_loading: bool,
) -> anyhow::Result<Vec<Data>> {
self.reset_session();
let path_prefix = format!("custom/{type_name}");
// Explicit files are normalized to object paths; otherwise discover by criteria.
let files = if let Some(f) = files {
f.into_iter()
.map(|p| self.to_object_path(&p).to_string())
.collect::<Vec<_>>()
} else {
self.list_parquet_files_with_criteria(&path_prefix, identifiers, start, end)?
};
if files.is_empty() {
return Ok(Vec::new());
}
// All files share one table name; each add_file call appends to it.
let table_name = "custom_data_table";
for file in files {
let resolved_path = self.resolve_path_for_datafusion(&file);
let sql_query = build_query(table_name, start, end, where_clause);
self.session
.add_file::<CustomDataDecoder>(
table_name,
&resolved_path,
Some(&sql_query),
Some(type_name),
)
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
}
let query_result = self.session.get_query_result();
Ok(query_result.collect())
}
/// Discovers the catalog files for `data_cls` matching the given identifiers
/// and time range, returned in query-ready path form.
///
/// Identifier matching is exact against the parent directory name (in
/// URI-safe form). For bars only, when no exact match exists, a fallback
/// matches bar-type directories by their embedded instrument ID.
///
/// # Errors
///
/// Returns an error if path construction or the object-store listing fails.
pub fn query_files(
&self,
data_cls: &str,
identifiers: Option<Vec<String>>,
start: Option<UnixNanos>,
end: Option<UnixNanos>,
) -> anyhow::Result<Vec<String>> {
let mut files = Vec::new();
let start_u64 = start.map(|s| s.as_u64());
let end_u64 = end.map(|e| e.as_u64());
let base_dir = self.make_path(data_cls, None)?;
// Single listing pass over the data-class directory.
let list_result = self.execute_async(async {
let prefix = ObjectPath::from(format!("{base_dir}/"));
let mut stream = self.object_store.list(Some(&prefix));
let mut objects = Vec::new();
while let Some(object) = stream.next().await {
objects.push(object?);
}
Ok::<Vec<_>, anyhow::Error>(objects)
})?;
let mut file_paths: Vec<String> = list_result
.into_iter()
.filter_map(|object| {
let path_str = object.location.to_string();
if path_str.ends_with(".parquet") {
Some(path_str)
} else {
None
}
})
.collect();
if let Some(identifiers) = identifiers {
// Compare in URI-safe form, matching how directories are named on write.
let safe_identifiers: Vec<String> = identifiers
.iter()
.map(|id| urisafe_instrument_id(id))
.collect();
let exact_match_file_paths: Vec<String> = file_paths
.iter()
.filter(|file_path| {
let path_parts: Vec<&str> = file_path.split('/').collect();
if path_parts.len() >= 2 {
let dir_name = path_parts[path_parts.len() - 2];
safe_identifiers.iter().any(|safe_id| safe_id == dir_name)
} else {
false
}
})
.cloned()
.collect();
if exact_match_file_paths.is_empty() && data_cls == "bars" {
// Bars fallback: a bar-type directory embeds the instrument ID;
// match on that when no directory equals the identifier exactly.
file_paths.retain(|file_path| {
let path_parts: Vec<&str> = file_path.split('/').collect();
if path_parts.len() >= 2 {
let dir_name = path_parts[path_parts.len() - 2];
if let Some(bar_instrument_id) = extract_bar_type_instrument_id(dir_name) {
safe_identifiers.iter().any(|id| id == bar_instrument_id)
} else {
false
}
} else {
false
}
});
} else {
file_paths = exact_match_file_paths;
}
}
// Prune by the interval encoded in each filename.
file_paths.retain(|file_path| query_intersects_filename(file_path, start_u64, end_u64));
for file_path in file_paths {
files.push(self.path_for_query_list(&file_path));
}
Ok(files)
}
/// Queries [`QuoteTick`] data, optionally filtered by instrument IDs and time range.
pub fn quote_ticks(
&mut self,
instrument_ids: Option<Vec<String>>,
start: Option<UnixNanos>,
end: Option<UnixNanos>,
) -> anyhow::Result<Vec<QuoteTick>> {
self.query_typed_data::<QuoteTick>(instrument_ids, start, end, None, None, true)
}
/// Queries [`TradeTick`] data, optionally filtered by instrument IDs and time range.
pub fn trade_ticks(
&mut self,
instrument_ids: Option<Vec<String>>,
start: Option<UnixNanos>,
end: Option<UnixNanos>,
) -> anyhow::Result<Vec<TradeTick>> {
self.query_typed_data::<TradeTick>(instrument_ids, start, end, None, None, true)
}
/// Queries [`Bar`] data, optionally filtered by instrument IDs and time range.
pub fn bars(
&mut self,
instrument_ids: Option<Vec<String>>,
start: Option<UnixNanos>,
end: Option<UnixNanos>,
) -> anyhow::Result<Vec<Bar>> {
self.query_typed_data::<Bar>(instrument_ids, start, end, None, None, true)
}
/// Queries [`OrderBookDelta`] data, optionally filtered by instrument IDs and time range.
pub fn order_book_deltas(
&mut self,
instrument_ids: Option<Vec<String>>,
start: Option<UnixNanos>,
end: Option<UnixNanos>,
) -> anyhow::Result<Vec<OrderBookDelta>> {
self.query_typed_data::<OrderBookDelta>(instrument_ids, start, end, None, None, true)
}
/// Queries [`OrderBookDepth10`] data, optionally filtered by instrument IDs and time range.
pub fn order_book_depth10(
&mut self,
instrument_ids: Option<Vec<String>>,
start: Option<UnixNanos>,
end: Option<UnixNanos>,
) -> anyhow::Result<Vec<OrderBookDepth10>> {
self.query_typed_data::<OrderBookDepth10>(instrument_ids, start, end, None, None, true)
}
/// Queries [`InstrumentClose`] data, optionally filtered by instrument IDs and time range.
pub fn instrument_closes(
&mut self,
instrument_ids: Option<Vec<String>>,
start: Option<UnixNanos>,
end: Option<UnixNanos>,
) -> anyhow::Result<Vec<InstrumentClose>> {
self.query_typed_data::<InstrumentClose>(instrument_ids, start, end, None, None, true)
}
/// Queries instruments, optionally filtered by IDs and `ts_init` time range.
///
/// # Errors
///
/// Returns an error if listing or decoding instrument files fails.
pub fn instruments(
    &self,
    instrument_ids: Option<&[String]>,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
) -> anyhow::Result<Vec<InstrumentAny>> {
    // Fix: `start`/`end` were previously accepted but ignored; forward them to
    // the filtered query so time-bounded requests are actually honored.
    self.query_instruments_filtered(instrument_ids, start, end)
}
pub fn get_file_list_from_data_cls(&self, data_cls: &str) -> anyhow::Result<Vec<String>> {
let base_dir = self.make_path(data_cls, None)?;
let list_result = self.execute_async(async {
let prefix = ObjectPath::from(format!("{base_dir}/"));
let mut stream = self.object_store.list(Some(&prefix));
let mut objects = Vec::new();
while let Some(object) = stream.next().await {
objects.push(object?);
}
Ok::<Vec<_>, anyhow::Error>(objects)
})?;
let file_paths: Vec<String> = list_result
.into_iter()
.filter_map(|object| {
let path_str = object.location.to_string();
if path_str.ends_with(".parquet") {
Some(path_str)
} else {
None
}
})
.collect();
Ok(file_paths)
}
/// Filters an already-listed set of file paths by identifier and time range,
/// mirroring the matching rules of `query_files`.
///
/// Identifier matching is exact against the parent directory name (in
/// URI-safe form); for bars only, when no exact match exists, directories
/// whose name starts with `<identifier>-` are kept as a fallback.
///
/// # Errors
///
/// Currently infallible; returns `Ok` with the filtered paths.
pub fn filter_files(
&self,
data_cls: &str,
file_paths: Vec<String>,
identifiers: Option<Vec<String>>,
start: Option<UnixNanos>,
end: Option<UnixNanos>,
) -> anyhow::Result<Vec<String>> {
let mut filtered_paths = file_paths;
if let Some(identifiers) = identifiers {
// Compare in URI-safe form, matching how directories are named on write.
let safe_identifiers: Vec<String> = identifiers
.iter()
.map(|id| urisafe_instrument_id(id))
.collect();
// Parent directory name of each file (empty when the path is too short).
let file_safe_identifiers: Vec<String> = filtered_paths
.iter()
.map(|file_path| {
let path_parts: Vec<&str> = file_path.split('/').collect();
if path_parts.len() >= 2 {
path_parts[path_parts.len() - 2].to_string()
} else {
String::new()
}
})
.collect();
let exact_match_file_paths: Vec<String> = filtered_paths
.iter()
.enumerate()
.filter_map(|(i, file_path)| {
let dir_name = &file_safe_identifiers[i];
if safe_identifiers.iter().any(|safe_id| safe_id == dir_name) {
Some(file_path.clone())
} else {
None
}
})
.collect();
if exact_match_file_paths.is_empty() && data_cls == "bars" {
// Bars fallback: a bar-type directory starts with `<instrument-id>-`.
filtered_paths.retain(|file_path| {
let path_parts: Vec<&str> = file_path.split('/').collect();
if path_parts.len() >= 2 {
let dir_name = path_parts[path_parts.len() - 2];
safe_identifiers
.iter()
.any(|safe_id| dir_name.starts_with(&format!("{safe_id}-")))
} else {
false
}
});
} else {
filtered_paths = exact_match_file_paths;
}
}
// Prune by the interval encoded in each filename.
let start_u64 = start.map(|s| s.as_u64());
let end_u64 = end.map(|e| e.as_u64());
filtered_paths.retain(|file_path| query_intersects_filename(file_path, start_u64, end_u64));
Ok(filtered_paths)
}
/// Returns the sub-intervals of `[start, end]` that are NOT covered by any
/// existing file for `data_cls`/`identifier` — i.e. the gaps a backfill
/// request would still need to fetch.
pub fn get_missing_intervals_for_request(
&self,
start: u64,
end: u64,
data_cls: &str,
identifier: Option<&str>,
) -> anyhow::Result<Vec<(u64, u64)>> {
let intervals = self.get_intervals(data_cls, identifier)?;
Ok(query_interval_diff(start, end, &intervals))
}
/// Returns the earliest covered timestamp for `data_cls`/`identifier`,
/// or `None` when no files exist.
///
/// # Errors
///
/// Returns an error if interval discovery fails.
pub fn query_first_timestamp(
    &self,
    data_cls: &str,
    identifier: Option<&str>,
) -> anyhow::Result<Option<u64>> {
    let intervals = self.get_intervals(data_cls, identifier)?;
    // Intervals are sorted ascending by start, so the first start is the minimum.
    Ok(intervals.first().map(|&(first_start, _)| first_start))
}
/// Returns the latest covered timestamp for `data_cls`/`identifier`,
/// or `None` when no files exist.
///
/// # Errors
///
/// Returns an error if interval discovery fails.
pub fn query_last_timestamp(
    &self,
    data_cls: &str,
    identifier: Option<&str>,
) -> anyhow::Result<Option<u64>> {
    let intervals = self.get_intervals(data_cls, identifier)?;
    // Intervals are sorted ascending by start; the last interval's end is taken
    // as the maximum, matching the original behavior.
    Ok(intervals.last().map(|&(_, last_end)| last_end))
}
/// Returns the sorted timestamp intervals covered by files for
/// `data_cls`/`identifier`.
///
/// For bars with an identifier that has no directory of its own, falls back
/// to scanning all bar-type subdirectories whose embedded instrument ID
/// matches, merging overlapping intervals from those directories.
///
/// # Errors
///
/// Returns an error on listing or path-construction failures.
pub fn get_intervals(
&self,
data_cls: &str,
identifier: Option<&str>,
) -> anyhow::Result<Vec<(u64, u64)>> {
let directory = self.make_path(data_cls, identifier)?;
let intervals = self.get_directory_intervals(&directory)?;
// Fast path: direct hit, or the fallback below does not apply.
if !intervals.is_empty() || data_cls != "bars" || identifier.is_none() {
return Ok(intervals);
}
// Bars fallback: look for bar-type directories embedding this instrument ID.
let safe_id = urisafe_instrument_id(identifier.unwrap());
let bars_subdir = format!("data/{data_cls}");
let subdirs = self.list_directory_stems(&bars_subdir)?;
let mut all_intervals = Vec::new();
for subdir in &subdirs {
// Directory names may be URL-encoded on some stores.
let decoded = urlencoding::decode(subdir).unwrap_or(Cow::Borrowed(subdir));
if extract_bar_type_instrument_id(&decoded) == Some(safe_id.as_str()) {
let subdir_path = self.make_path(data_cls, Some(&decoded))?;
all_intervals.extend(self.get_directory_intervals(&subdir_path)?);
}
}
// Merge overlapping intervals gathered across multiple bar-type directories.
all_intervals.sort_by_key(|&(start, _)| start);
let mut merged: Vec<(u64, u64)> = Vec::new();
for interval in all_intervals {
if let Some(last) = merged.last_mut()
&& interval.0 <= last.1
{
last.1 = last.1.max(interval.1);
continue;
}
merged.push(interval);
}
Ok(merged)
}
pub fn get_directory_intervals(&self, directory: &str) -> anyhow::Result<Vec<(u64, u64)>> {
let object_dir = self.to_object_path(directory);
let list_result = self.execute_async(async {
let dir_str = format!("{}/", object_dir.as_ref());
let prefix = ObjectPath::from(dir_str);
let mut stream = self.object_store.list(Some(&prefix));
let mut objects = Vec::new();
while let Some(object) = stream.next().await {
objects.push(object?);
}
Ok::<Vec<_>, anyhow::Error>(objects)
})?;
let mut intervals = Vec::new();
for object in list_result {
let path_str = object.location.to_string();
if path_str.ends_with(".parquet")
&& let Some(interval) = parse_filename_timestamps(&path_str)
{
intervals.push(interval);
}
}
intervals.sort_by_key(|&(start, _)| start);
Ok(intervals)
}
pub fn make_path(&self, type_name: &str, identifier: Option<&str>) -> anyhow::Result<String> {
let mut components = vec!["data".to_string(), type_name.to_string()];
if let Some(id) = identifier {
let safe_id = urisafe_instrument_id(id);
components.push(safe_id);
}
let path = make_object_store_path_owned(&self.base_path, components);
Ok(path)
}
/// Builds the object-store path for custom data of `type_name` with an
/// optional `identifier`, using the custom-data path layout.
pub fn make_path_custom_data(
    &self,
    type_name: &str,
    identifier: Option<&str>,
) -> anyhow::Result<String> {
    Ok(make_object_store_path_owned(
        &self.base_path,
        custom_data_path_components(type_name, identifier),
    ))
}
/// Renames an interval-named parquet file inside `directory` from the
/// `[old_start, old_end]` filename to the `[new_start, new_end]` filename.
fn rename_parquet_file(
    &self,
    directory: &str,
    old_start: u64,
    old_end: u64,
    new_start: u64,
    new_end: u64,
) -> anyhow::Result<()> {
    // Maps an interval to the full object path of its parquet file.
    let path_for = |start: u64, end: u64| {
        let filename = timestamps_to_filename(UnixNanos::from(start), UnixNanos::from(end));
        self.to_object_path(&format!("{directory}/{filename}"))
    };
    self.move_file(&path_for(old_start, old_end), &path_for(new_start, new_end))
}
#[must_use]
/// Converts a catalog path into an [`ObjectPath`], stripping the configured
/// base path prefix (if any) and normalizing Windows separators.
pub fn to_object_path(&self, path: &str) -> ObjectPath {
    let normalized = path.replace('\\', "/");
    if self.base_path.is_empty() {
        ObjectPath::from(normalized)
    } else {
        let normalized_base = self.base_path.replace('\\', "/");
        let base = normalized_base.trim_end_matches('/');
        // Strip "base/" first, then a bare "base" prefix; otherwise keep the
        // path unchanged.
        let relative = normalized
            .strip_prefix(&format!("{base}/"))
            .or_else(|| normalized.strip_prefix(base))
            .unwrap_or(&normalized);
        ObjectPath::from(relative)
    }
}
pub fn to_object_path_parsed(&self, path: &str) -> anyhow::Result<ObjectPath> {
let normalized_path = path.replace('\\', "/");
let to_parse = if self.base_path.is_empty() {
normalized_path.as_str()
} else {
let normalized_base = self.base_path.replace('\\', "/");
let base = normalized_base.trim_end_matches('/');
normalized_path
.strip_prefix(&format!("{base}/"))
.or_else(|| normalized_path.strip_prefix(base))
.unwrap_or(normalized_path.as_str())
};
ObjectPath::parse(to_parse).map_err(anyhow::Error::from)
}
#[allow(dead_code)]
/// Renders an object path as a plain string (currently a direct conversion).
fn to_file_path(&self, path: &ObjectPath) -> String {
    format!("{path}")
}
/// Renames `old_path` to `new_path` in the backing object store, blocking on
/// the async operation.
pub fn move_file(&self, old_path: &ObjectPath, new_path: &ObjectPath) -> anyhow::Result<()> {
    self.execute_async(async {
        // `?` converts the object-store error into `anyhow::Error`.
        Ok(self.object_store.rename(old_path, new_path).await?)
    })
}
/// Runs `future` to completion on the shared runtime, blocking the caller.
pub fn execute_async<F, R>(&self, future: F) -> anyhow::Result<R>
where
    F: std::future::Future<Output = anyhow::Result<R>>,
{
    get_runtime().block_on(future)
}
/// Lists the immediate child directory names under `subdirectory`.
///
/// Local backends read the filesystem and return sorted names; remote
/// backends derive the names from the object listing (first-seen order,
/// not explicitly sorted here).
pub fn list_directory_stems(&self, subdirectory: &str) -> anyhow::Result<Vec<String>> {
if !self.is_remote_uri() {
// Local filesystem: read_dir and keep only directories.
let directory = PathBuf::from(self.native_base_path_string()).join(subdirectory);
if !directory.exists() {
return Ok(Vec::new());
}
let mut directories = Vec::new();
if let Ok(entries) = std::fs::read_dir(&directory) {
for entry in entries.flatten() {
if let Ok(file_type) = entry.file_type()
&& file_type.is_dir()
{
if let Some(name) = entry.path().file_name() {
directories.push(name.to_string_lossy().to_string());
}
}
}
}
directories.sort();
return Ok(directories);
}
// Remote: object stores have no real directories, so infer child names from
// the first path segment after the prefix, de-duplicated via a HashSet.
let directory = make_object_store_path(&self.base_path, &[subdirectory]);
let list_result = self.execute_async(async {
let prefix = ObjectPath::from(format!("{directory}/"));
let mut stream = self.object_store.list(Some(&prefix));
let mut directories = Vec::new();
let mut seen_dirs = std::collections::HashSet::new();
while let Some(object) = stream.next().await {
let object = object?;
let path_str = object.location.to_string();
if let Some(relative_path) = path_str.strip_prefix(&format!("{directory}/")) {
let parts: Vec<&str> = relative_path.split('/').collect();
if let Some(first_part) = parts.first()
&& !first_part.is_empty()
&& !seen_dirs.contains(*first_part)
{
seen_dirs.insert(first_part.to_string());
directories.push(first_part.to_string());
}
}
}
Ok::<Vec<String>, anyhow::Error>(directories)
})?;
Ok(list_result)
}
/// Lists the data-type directory names under `data/`, excluding stream-only
/// data types (see `is_excluded_stream_data_type`).
pub fn list_data_types(&self) -> anyhow::Result<Vec<String>> {
    let mut stems = self.list_directory_stems("data")?;
    stems.retain(|s| !Self::is_excluded_stream_data_type(s));
    Ok(stems)
}
/// Returns `true` for data types that are excluded from catalog listing and
/// stream conversion.
fn is_excluded_stream_data_type(name: &str) -> bool {
    name == "funding_rate_update" || name == "funding_rates"
}
/// Lists backtest run instance IDs under `backtest/`.
pub fn list_backtest_runs(&self) -> anyhow::Result<Vec<String>> {
self.list_directory_stems("backtest")
}
/// Lists live run instance IDs under `live/`.
pub fn list_live_runs(&self) -> anyhow::Result<Vec<String>> {
self.list_directory_stems("live")
}
/// Reads all decoded data for a live run instance, sorted by `ts_init`.
pub fn read_live_run(&self, instance_id: &str) -> anyhow::Result<Vec<Data>> {
self.read_run_data("live", instance_id)
}
/// Reads all decoded data for a backtest run instance, sorted by `ts_init`.
pub fn read_backtest(&self, instance_id: &str) -> anyhow::Result<Vec<Data>> {
self.read_run_data("backtest", instance_id)
}
/// Reads every feather file under `{subdirectory}/{instance_id}`, decodes each
/// recognized data class into [`Data`], and returns the combined result sorted
/// by `ts_init`.
fn read_run_data(&self, subdirectory: &str, instance_id: &str) -> anyhow::Result<Vec<Data>> {
let instance_dir = make_object_store_path(&self.base_path, &[subdirectory, instance_id]);
// Discover the data-type subdirectories for this run instance. Remote
// backends infer them from the object listing; local backends read the
// filesystem (and sort the names).
let data_types = if self.is_remote_uri() {
self.execute_async(async {
let prefix = ObjectPath::from(format!("{instance_dir}/"));
let mut stream = self.object_store.list(Some(&prefix));
let mut directories = Vec::new();
let mut seen_dirs = std::collections::HashSet::new();
while let Some(object) = stream.next().await {
let object = object?;
let path_str = object.location.to_string();
if let Some(relative_path) = path_str.strip_prefix(&format!("{instance_dir}/"))
{
let parts: Vec<&str> = relative_path.split('/').collect();
if let Some(first_part) = parts.first()
&& !first_part.is_empty()
&& !seen_dirs.contains(*first_part)
{
seen_dirs.insert(first_part.to_string());
directories.push(first_part.to_string());
}
}
}
Ok::<Vec<String>, anyhow::Error>(directories)
})?
} else {
let directory = PathBuf::from(self.native_base_path_string())
.join(subdirectory)
.join(instance_id);
if !directory.exists() {
return Ok(Vec::new());
}
let mut directories = Vec::new();
if let Ok(entries) = std::fs::read_dir(&directory) {
for entry in entries.flatten() {
if let Ok(file_type) = entry.file_type()
&& file_type.is_dir()
&& let Some(name) = entry.path().file_name()
{
directories.push(name.to_string_lossy().to_string());
}
}
}
directories.sort();
directories
};
if data_types.is_empty() {
return Ok(Vec::new());
}
let mut all_data: Vec<Data> = Vec::new();
// Decode each data type's feather files; excluded stream types are skipped.
for data_cls in data_types
.into_iter()
.filter(|s| !Self::is_excluded_stream_data_type(s))
{
let feather_files = self.list_feather_files(
subdirectory,
instance_id,
&data_cls,
None, )?;
if feather_files.is_empty() {
continue; }
for file_path in feather_files {
let batches = self.read_feather_file(&file_path)?;
if batches.is_empty() {
continue; }
// Dispatch on the directory name to the concrete decode type;
// unknown non-custom data classes are silently skipped.
let file_data: Vec<Data> = match data_cls.as_str() {
"quotes" => {
let quotes: Vec<QuoteTick> =
self.convert_record_batches_to_data(batches, false)?;
quotes.into_iter().map(Data::from).collect()
}
"trades" => {
let trades: Vec<TradeTick> =
self.convert_record_batches_to_data(batches, false)?;
trades.into_iter().map(Data::from).collect()
}
"order_book_deltas" => {
let deltas: Vec<OrderBookDelta> =
self.convert_record_batches_to_data(batches, false)?;
deltas.into_iter().map(Data::from).collect()
}
"order_book_depths" => {
let depths: Vec<OrderBookDepth10> =
self.convert_record_batches_to_data(batches, false)?;
depths.into_iter().map(Data::from).collect()
}
"bars" => {
let bars: Vec<Bar> = self.convert_record_batches_to_data(batches, false)?;
bars.into_iter().map(Data::from).collect()
}
"index_prices" => {
let prices: Vec<IndexPriceUpdate> =
self.convert_record_batches_to_data(batches, false)?;
prices.into_iter().map(Data::from).collect()
}
"mark_prices" => {
let prices: Vec<MarkPriceUpdate> =
self.convert_record_batches_to_data(batches, false)?;
prices.into_iter().map(Data::from).collect()
}
"instrument_closes" => {
let closes: Vec<InstrumentClose> =
self.convert_record_batches_to_data(batches, false)?;
closes.into_iter().map(Data::from).collect()
}
_ => {
if data_cls.starts_with("custom/") {
self.decode_custom_batches_to_data(batches, false)?
} else {
continue;
}
}
};
all_data.extend(file_data);
}
}
// Merge all decoded items into a single stream ordered by init timestamp.
all_data.sort_by(|a, b| {
let ts_a = a.ts_init();
let ts_b = b.ts_init();
ts_a.cmp(&ts_b)
});
Ok(all_data)
}
/// Decodes custom-data record batches into [`Data`]; delegates to the
/// orchestration helper.
fn decode_custom_batches_to_data(
&self,
batches: Vec<RecordBatch>,
use_ts_event_for_ts_init: bool,
) -> anyhow::Result<Vec<Data>> {
orchestration_decode_custom_batches_to_data(batches, use_ts_event_for_ts_init)
}
/// Decodes a single record batch (with its schema metadata) into [`Data`];
/// delegates to the orchestration helper.
#[allow(dead_code)] fn decode_batch_to_data(
&self,
metadata: &std::collections::HashMap<String, String>,
batch: RecordBatch,
allow_custom_fallback: bool,
) -> anyhow::Result<Vec<Data>> {
orchestration_decode_batch_to_data(metadata, batch, allow_custom_fallback)
}
/// Lists `.feather` files for `data_name` under `{subdirectory}/{instance_id}`,
/// returning identifier-subdirectory files first, then flat files, sorted.
///
/// When `identifiers` is given, subdirectory files are kept only if the
/// subdirectory name contains one of the identifiers.
fn list_feather_files(
&self,
subdirectory: &str,
instance_id: &str,
data_name: &str,
identifiers: Option<&[String]>,
) -> anyhow::Result<Vec<String>> {
let base_dir = make_object_store_path(&self.base_path, &[subdirectory, instance_id]);
let data_dir = make_object_store_path(&base_dir, &[data_name]);
let mut files = Vec::new();
let subdir_prefix = ObjectPath::from(format!("{data_dir}/"));
let list_result = self.execute_async(async {
let mut stream = self.object_store.list(Some(&subdir_prefix));
let mut subdirs = Vec::new();
let mut flat_files = Vec::new();
while let Some(object) = stream.next().await {
let object = object?;
let path_str = object.location.to_string();
if let Some(relative_path) = path_str.strip_prefix(&format!("{data_dir}/")) {
if relative_path.ends_with(".feather") {
// Flat file directly under the data dir: keep only files whose
// path contains the "{data_name}_" prefix pattern.
if path_str.contains(&format!("{data_name}_")) {
flat_files.push(path_str);
}
} else {
// Entry lives under an identifier subdirectory: list that
// subdirectory and collect its feather files.
let subdir_path = format!("{path_str}/");
let mut subdir_stream = self
.object_store
.list(Some(&ObjectPath::from(subdir_path.as_str())));
while let Some(subdir_object) = subdir_stream.next().await {
let subdir_object = subdir_object?;
let subdir_file_path = subdir_object.location.to_string();
if subdir_file_path.ends_with(".feather") {
if let Some(identifiers) = identifiers {
let subdir_name = relative_path.split('/').next().unwrap_or("");
if !identifiers.iter().any(|id| subdir_name.contains(id)) {
continue;
}
}
subdirs.push(subdir_file_path);
}
}
}
}
}
Ok::<Vec<String>, anyhow::Error>([subdirs, flat_files].concat())
})?;
files.extend(list_result);
files.sort();
Ok(files)
}
fn read_feather_file(&self, file_path: &str) -> anyhow::Result<Vec<RecordBatch>> {
use datafusion::arrow::ipc::reader::StreamReader;
let bytes = self.execute_async(async {
let path = ObjectPath::from(file_path);
let result = self.object_store.get(&path).await?;
let bytes = result.bytes().await?;
Ok::<_, anyhow::Error>(bytes)
})?;
if bytes.is_empty() {
return Ok(Vec::new());
}
let cursor = Cursor::new(bytes.as_ref());
let reader = StreamReader::try_new(cursor, None)
.map_err(|e| anyhow::anyhow!("Failed to create StreamReader: {e}"))?;
let mut batches = Vec::new();
for batch_result in reader {
let batch = batch_result.map_err(|e| anyhow::anyhow!("Failed to read batch: {e}"))?;
batches.push(batch);
}
Ok(batches)
}
/// Decodes record batches into typed data items; convenience wrapper that
/// disables bar-type conversion.
fn convert_record_batches_to_data<T>(
&self,
batches: Vec<RecordBatch>,
use_ts_event_for_ts_init: bool,
) -> anyhow::Result<Vec<T>>
where
T: DecodeDataFromRecordBatch + TryFrom<Data>,
{
self.convert_record_batches_to_data_with_bar_type_conversion(
batches,
use_ts_event_for_ts_init,
false,
)
}
/// Decodes record batches into typed data items.
///
/// When `convert_bar_type_to_external` is set and the schema metadata carries
/// a `bar_type` ending in "-INTERNAL", it is rewritten to "-EXTERNAL" before
/// decoding. When `use_ts_event_for_ts_init` is set, the `ts_init` column is
/// overwritten with the `ts_event` column in every batch.
fn convert_record_batches_to_data_with_bar_type_conversion<T>(
&self,
batches: Vec<RecordBatch>,
use_ts_event_for_ts_init: bool,
convert_bar_type_to_external: bool,
) -> anyhow::Result<Vec<T>>
where
T: DecodeDataFromRecordBatch + TryFrom<Data>,
{
if batches.is_empty() {
return Ok(Vec::new());
}
// Metadata is taken from the first batch's schema and shared for all batches.
let schema = batches[0].schema();
let mut metadata = schema.metadata().clone();
if convert_bar_type_to_external
&& let Some(bar_type_str) = metadata.get("bar_type").cloned()
&& bar_type_str.ends_with("-INTERNAL")
{
let external = bar_type_str.replace("-INTERNAL", "-EXTERNAL");
metadata.insert("bar_type".to_string(), external);
}
let mut all_data = Vec::new();
for mut batch in batches {
if use_ts_event_for_ts_init {
// Replace the ts_init column with a copy of the ts_event column.
let column_names: Vec<String> =
schema.fields().iter().map(|f| f.name().clone()).collect();
let ts_event_idx = column_names
.iter()
.position(|n| n == "ts_event")
.ok_or_else(|| anyhow::anyhow!("ts_event column not found"))?;
let ts_init_idx = column_names
.iter()
.position(|n| n == "ts_init")
.ok_or_else(|| anyhow::anyhow!("ts_init column not found"))?;
let mut new_columns = batch.columns().to_vec();
new_columns[ts_init_idx] = new_columns[ts_event_idx].clone();
batch = RecordBatch::try_new(schema.clone(), new_columns)
.map_err(|e| anyhow::anyhow!("Failed to create new batch: {e}"))?;
}
let data_vec = T::decode_data_batch(&metadata, batch)
.map_err(|e| anyhow::anyhow!("Failed to decode batch: {e}"))?;
all_data.extend(data_vec);
}
Ok(to_variant::<T>(all_data))
}
/// Converts a run's streamed feather files for `data_cls` into catalog
/// parquet files.
///
/// Each file is decoded into the matching typed data, sorted by `ts_init`
/// when not already monotonic, and written via `write_to_parquet` (or
/// `write_custom_data_batch` for "custom/…" classes). Unknown non-custom
/// data classes are an error; excluded stream types are a no-op.
pub fn convert_stream_to_data(
&mut self,
instance_id: &str,
data_cls: &str,
subdirectory: Option<&str>,
identifiers: Option<&[String]>,
use_ts_event_for_ts_init: bool,
) -> anyhow::Result<()> {
let subdirectory = subdirectory.unwrap_or("backtest");
if Self::is_excluded_stream_data_type(data_cls) {
return Ok(());
}
let data_name = to_snake_case(data_cls);
let feather_files =
self.list_feather_files(subdirectory, instance_id, &data_name, identifiers)?;
if feather_files.is_empty() {
return Ok(());
}
// Bars decoded from stream files get the bar-type conversion applied
// during decode (see convert_record_batches_to_data_with_bar_type_conversion).
let convert_bar_type = data_cls == "bars";
for file_path in feather_files {
let batches = self.read_feather_file(&file_path)?;
if batches.is_empty() {
continue;
}
match data_cls {
"quotes" => {
let mut data: Vec<QuoteTick> =
self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
if !is_monotonically_increasing_by_init(&data) {
data.sort_by_key(|d| d.ts_init);
}
self.write_to_parquet(data, None, None, None)?;
}
"trades" => {
let mut data: Vec<TradeTick> =
self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
if !is_monotonically_increasing_by_init(&data) {
data.sort_by_key(|d| d.ts_init);
}
self.write_to_parquet(data, None, None, None)?;
}
"order_book_deltas" => {
let mut data: Vec<OrderBookDelta> =
self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
if !is_monotonically_increasing_by_init(&data) {
data.sort_by_key(|d| d.ts_init);
}
self.write_to_parquet(data, None, None, None)?;
}
"order_book_depths" => {
let mut data: Vec<OrderBookDepth10> =
self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
if !is_monotonically_increasing_by_init(&data) {
data.sort_by_key(|d| d.ts_init);
}
self.write_to_parquet(data, None, None, None)?;
}
"bars" => {
let mut data: Vec<Bar> = self
.convert_record_batches_to_data_with_bar_type_conversion(
batches,
use_ts_event_for_ts_init,
convert_bar_type,
)?;
if !is_monotonically_increasing_by_init(&data) {
data.sort_by_key(|d| d.ts_init);
}
self.write_to_parquet(data, None, None, None)?;
}
"index_prices" => {
let mut data: Vec<IndexPriceUpdate> =
self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
if !is_monotonically_increasing_by_init(&data) {
data.sort_by_key(|d| d.ts_init);
}
self.write_to_parquet(data, None, None, None)?;
}
"mark_prices" => {
let mut data: Vec<MarkPriceUpdate> =
self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
if !is_monotonically_increasing_by_init(&data) {
data.sort_by_key(|d| d.ts_init);
}
self.write_to_parquet(data, None, None, None)?;
}
"instrument_closes" => {
let mut data: Vec<InstrumentClose> =
self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
if !is_monotonically_increasing_by_init(&data) {
data.sort_by_key(|d| d.ts_init);
}
self.write_to_parquet(data, None, None, None)?;
}
_ => {
if data_cls.starts_with("custom/") {
// Custom payloads are decoded generically; only Data::Custom
// variants are written.
let data =
self.decode_custom_batches_to_data(batches, use_ts_event_for_ts_init)?;
let custom_items: Vec<CustomData> = data
.into_iter()
.filter_map(|d| match d {
Data::Custom(c) => Some(c),
_ => None,
})
.collect();
if !custom_items.is_empty() {
self.write_custom_data_batch(custom_items, None, None, None)?;
}
} else {
anyhow::bail!("Unknown data class: {data_cls}");
}
}
}
}
Ok(())
}
}
/// Associates a data type with its directory name in the catalog layout.
pub trait CatalogPathPrefix {
/// Returns the path component used for this type (e.g. "quotes" for [`QuoteTick`]).
fn path_prefix() -> &'static str;
}
/// Implements [`CatalogPathPrefix`] for `$type`, returning the literal `$path`.
macro_rules! impl_catalog_path_prefix {
($type:ty, $path:expr) => {
impl CatalogPathPrefix for $type {
fn path_prefix() -> &'static str {
$path
}
}
};
}
// Path prefixes for the built-in data types.
impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");
impl_catalog_path_prefix!(InstrumentAny, "instruments");
#[must_use]
/// Builds a parquet filename `"{start}_{end}.parquet"` from two timestamps,
/// using the filename-safe ISO-8601 form.
pub fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
    let to_part = |ts: UnixNanos| iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(ts));
    format!("{}_{}.parquet", to_part(timestamp_1), to_part(timestamp_2))
}
/// Makes an ISO-8601 timestamp filename-safe by turning ':' and '.' into '-'.
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp
        .chars()
        .map(|c| if c == ':' || c == '.' { '-' } else { c })
        .collect()
}
/// Inverse of `iso_timestamp_to_file_timestamp`: restores a filename-safe
/// timestamp to ISO-8601 form.
fn file_timestamp_to_iso_timestamp(file_timestamp: &str) -> String {
    let (date_part, raw_time) = file_timestamp
        .split_once('T')
        .unwrap_or((file_timestamp, ""));
    let raw_time = raw_time.strip_suffix('Z').unwrap_or(raw_time);
    // The last hyphen (when present) originally separated seconds from the
    // fractional part; restore it to a dot before mapping the remaining
    // hyphens back to colons.
    let time_part = match raw_time.rfind('-') {
        Some(idx) => {
            format!("{}.{}", &raw_time[..idx], &raw_time[idx + 1..]).replace('-', ":")
        }
        None => raw_time.replace('-', ":"),
    };
    format!("{date_part}T{time_part}Z")
}
/// Parses an ISO-8601 timestamp into nanoseconds since the Unix epoch.
fn iso_to_unix_nanos(iso_timestamp: &str) -> anyhow::Result<u64> {
    let nanos = iso8601_to_unix_nanos(iso_timestamp)?;
    Ok(nanos.into())
}
/// Makes an instrument ID safe for use in URIs/paths: drops '/' and maps '^' to '_'.
pub fn urisafe_instrument_id(instrument_id: &str) -> String {
    instrument_id
        .chars()
        .filter(|&c| c != '/')
        .map(|c| if c == '^' { '_' } else { c })
        .collect()
}
/// Extracts the instrument ID from a bar-type directory name of the form
/// `{instrument}-{step}-{aggregation}-{price_type}-{aggregation_source}`,
/// ignoring any composite suffix after '@'. Returns `None` if the name does
/// not have five dash-separated pieces with a numeric step.
fn extract_bar_type_instrument_id(bar_type_dir: &str) -> Option<&str> {
    let standard = bar_type_dir.split('@').next().unwrap_or(bar_type_dir);
    let mut pieces = standard.rsplitn(5, '-');
    let _aggregation_source = pieces.next()?;
    let _price_type = pieces.next()?;
    let _aggregation = pieces.next()?;
    let step = pieces.next()?;
    let instrument_id = pieces.next()?;
    if step.chars().all(|c| c.is_ascii_digit()) {
        Some(instrument_id)
    } else {
        None
    }
}
#[must_use]
/// Sanitizes a directory identifier: collapses "//", removes empty segments
/// and ".." path-traversal segments, and rejoins with '/'.
pub fn safe_directory_identifier(identifier: &str) -> String {
    identifier
        .replace("//", "/")
        .split('/')
        .filter(|segment| !segment.is_empty() && *segment != "..")
        .collect::<Vec<_>>()
        .join("/")
}
#[must_use]
/// Returns the parent directory name (second-to-last path segment) of a file
/// path, or "unknown" when the path has no parent segment.
pub fn extract_identifier_from_path(file_path: &str) -> String {
    let mut parts = file_path.rsplit('/');
    parts.next(); // Skip the filename itself.
    parts
        .next()
        .map_or_else(|| "unknown".to_string(), ToString::to_string)
}
#[must_use]
/// Converts an identifier into a lowercase, underscore-only form suitable for
/// use as a SQL identifier.
pub fn make_sql_safe_identifier(identifier: &str) -> String {
    let uri_safe = urisafe_instrument_id(identifier);
    let underscored = uri_safe.replace(['.', '-', ' ', '%'], "_");
    underscored.to_lowercase()
}
#[must_use]
/// Returns the final path segment with its ".parquet" extension removed and
/// all '-', ':' and '.' characters replaced by '_', lowercased. An empty
/// input yields "unknown_file".
pub fn extract_sql_safe_filename(file_path: &str) -> String {
    if file_path.is_empty() {
        return "unknown_file".to_string();
    }
    let filename = file_path.split('/').next_back().unwrap_or("unknown_file");
    // Strip the extension only when the filename actually ends with ".parquet".
    // The previous `rfind(".parquet")` truncated everything after a mid-string
    // occurrence too (e.g. "run.parquet.bak" -> "run"), silently dropping the
    // trailing part of the name.
    let name_without_ext = filename.strip_suffix(".parquet").unwrap_or(filename);
    name_without_ext
        .replace(['-', ':', '.'], "_")
        .to_lowercase()
}
/// Joins `components` onto `base_path` using platform path semantics.
pub fn make_local_path<P: AsRef<Path>>(base_path: P, components: &[&str]) -> PathBuf {
    components
        .iter()
        .fold(PathBuf::from(base_path.as_ref()), |mut acc, component| {
            acc.push(component);
            acc
        })
}
#[must_use]
/// Joins `base_path` and `components` into a '/'-separated object-store path,
/// normalizing backslashes and trimming redundant slashes; empty parts are
/// skipped.
pub fn make_object_store_path(base_path: &str, components: &[&str]) -> String {
    // Normalizes a single part; the base keeps any leading slash, components
    // have both leading and trailing slashes trimmed.
    let normalize = |part: &str, trim_start: bool| -> String {
        let slashed = part.replace('\\', "/");
        let trimmed = if trim_start {
            slashed.trim_start_matches('/').trim_end_matches('/')
        } else {
            slashed.trim_end_matches('/')
        };
        trimmed.to_string()
    };
    let mut parts: Vec<String> = Vec::new();
    if !base_path.is_empty() {
        let base = normalize(base_path, false);
        if !base.is_empty() {
            parts.push(base);
        }
    }
    parts.extend(
        components
            .iter()
            .map(|component| normalize(component, true))
            .filter(|component| !component.is_empty()),
    );
    parts.join("/")
}
#[must_use]
/// Owned-components variant of `make_object_store_path`: joins `base_path` and
/// `components` with '/', normalizing backslashes and trimming slashes.
pub fn make_object_store_path_owned(base_path: &str, components: Vec<String>) -> String {
    let mut parts: Vec<String> = Vec::new();
    if !base_path.is_empty() {
        let base = base_path.replace('\\', "/");
        let base = base.trim_end_matches('/');
        if !base.is_empty() {
            parts.push(base.to_string());
        }
    }
    for component in components {
        let cleaned = component.replace('\\', "/");
        let cleaned = cleaned.trim_start_matches('/').trim_end_matches('/');
        if !cleaned.is_empty() {
            parts.push(cleaned.to_string());
        }
    }
    parts.join("/")
}
#[must_use]
/// Converts a local filesystem path into an object-store path string with
/// forward slashes.
pub fn local_to_object_store_path(local_path: &Path) -> String {
    let raw = local_path.to_string_lossy();
    raw.replace('\\', "/")
}
#[must_use]
/// Splits a path string into its non-empty components, treating backslashes
/// as separators too.
pub fn extract_path_components(path_str: &str) -> Vec<String> {
    path_str
        .replace('\\', "/")
        .split('/')
        .filter(|segment| !segment.is_empty())
        .map(ToString::to_string)
        .collect()
}
/// Returns `true` when the file's timestamp interval intersects the optional
/// `[start, end]` query range; files without parseable interval filenames are
/// conservatively included.
fn query_intersects_filename(filename: &str, start: Option<u64>, end: Option<u64>) -> bool {
    match parse_filename_timestamps(filename) {
        Some((file_start, file_end)) => {
            let starts_before_file_ends = start.map_or(true, |s| s <= file_end);
            let file_starts_before_end = end.map_or(true, |e| file_start <= e);
            starts_before_file_ends && file_starts_before_end
        }
        None => true,
    }
}
#[must_use]
/// Parses a `"{start}_{end}.parquet"` filename into `(start, end)` Unix
/// nanosecond timestamps; returns `None` when the name does not match.
pub fn parse_filename_timestamps(filename: &str) -> Option<(u64, u64)> {
    let stem = Path::new(filename)
        .file_name()?
        .to_str()?
        .strip_suffix(".parquet")?;
    let (first_part, second_part) = stem.split_once('_')?;
    let parse = |part: &str| iso_to_unix_nanos(&file_timestamp_to_iso_timestamp(part)).ok();
    Some((parse(first_part)?, parse(second_part)?))
}
#[must_use]
/// Returns `true` when no two intervals overlap (after sorting by start).
pub fn are_intervals_disjoint(intervals: &[(u64, u64)]) -> bool {
    if intervals.len() <= 1 {
        return true;
    }
    let mut sorted: Vec<(u64, u64)> = intervals.to_vec();
    sorted.sort_by_key(|&(start, _)| start);
    // Disjoint iff every interval ends strictly before the next one starts.
    sorted.windows(2).all(|pair| pair[0].1 < pair[1].0)
}
#[must_use]
/// Returns `true` when the intervals (sorted by start) form one gap-free,
/// non-overlapping chain: each interval's end + 1 equals the next start.
pub fn are_intervals_contiguous(intervals: &[(u64, u64)]) -> bool {
    if intervals.len() <= 1 {
        return true;
    }
    let mut sorted: Vec<(u64, u64)> = intervals.to_vec();
    sorted.sort_by_key(|&(start, _)| start);
    sorted.windows(2).all(|pair| pair[0].1 + 1 == pair[1].0)
}
/// Returns the parts of the closed query range `[start, end]` not covered by
/// `closed_intervals`; an inverted range yields no results.
fn query_interval_diff(start: u64, end: u64, closed_intervals: &[(u64, u64)]) -> Vec<(u64, u64)> {
    if start > end {
        return Vec::new();
    }
    let interval_set = get_interval_set(closed_intervals);
    let query_range = (RangeBound::Included(start), RangeBound::Included(end));
    interval_set
        .get_interval_difference(&query_range)
        .into_iter()
        .filter_map(|interval| interval_to_tuple(interval, start, end))
        .collect()
}
/// Builds an interval tree from closed `(start, end)` pairs, skipping inverted
/// pairs where `start > end`.
fn get_interval_set(intervals: &[(u64, u64)]) -> IntervalTree<u64> {
    let mut tree = IntervalTree::default();
    for &(start, end) in intervals.iter().filter(|&&(start, end)| start <= end) {
        // Store the closed interval [start, end] as half-open [start, end + 1);
        // saturating_add guards against overflow at u64::MAX.
        tree.insert((
            RangeBound::Included(start),
            RangeBound::Excluded(end.saturating_add(1)),
        ));
    }
    tree
}
/// Converts a bound pair into a closed `(start, end)` tuple, clamping
/// unbounded ends to the query range; returns `None` for empty intervals.
fn interval_to_tuple(
    interval: (RangeBound<&u64>, RangeBound<&u64>),
    query_start: u64,
    query_end: u64,
) -> Option<(u64, u64)> {
    let (lower, upper) = interval;
    let start = match lower {
        RangeBound::Included(&val) => val,
        RangeBound::Excluded(&val) => val.saturating_add(1),
        RangeBound::Unbounded => query_start,
    };
    let end = match upper {
        RangeBound::Included(&val) => val,
        // An exclusive upper bound of 0 leaves no representable value below it.
        RangeBound::Excluded(&0) => return None,
        RangeBound::Excluded(&val) => val - 1,
        RangeBound::Unbounded => query_end,
    };
    (start <= end).then_some((start, end))
}