use std::{
cell::RefCell,
collections::{HashMap, HashSet},
rc::Rc,
};
use nautilus_common::{
live::get_runtime,
msgbus::typed_handler::ShareableMessageHandler,
python::{cache::PyCache, clock::PyClock},
};
use nautilus_core::UnixNanos;
use nautilus_model::{
data::{
Bar, Data, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
TradeTick, close::InstrumentClose,
},
python::instruments::pyobject_to_instrument_any,
};
use object_store::ObjectStoreExt;
use pyo3::{exceptions::PyIOError, prelude::*};
use crate::{
backend::feather::{FeatherWriter, RotationConfig},
parquet::create_object_store_from_path,
};
/// Python-facing wrapper around [`FeatherWriter`], exposed to Python under the
/// name `StreamingFeatherWriter`.
///
/// The class is declared `unsendable`; its writer is held in an
/// `Rc<RefCell<..>>`, which is not `Send`.
#[pyclass(
name = "StreamingFeatherWriter",
module = "nautilus_trader.core.nautilus_pyo3.persistence",
unsendable
)]
#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.persistence")]
pub struct PyStreamingFeatherWriter {
/// Shared, interior-mutable writer; a clone of this `Rc` is handed to the
/// message bus subscription in `subscribe`.
writer: Rc<RefCell<FeatherWriter>>,
/// Message bus handler; `Some` only between `subscribe` and `unsubscribe`.
handler: Option<ShareableMessageHandler>,
}
#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl PyStreamingFeatherWriter {
#[new]
#[pyo3(signature = (
path,
cache,
clock,
fs_protocol=None,
fs_storage_options=None,
include_types=None,
rotation_mode=3,
max_file_size=1024*1024*1024,
rotation_interval_ns=None,
rotation_time_ns=None,
rotation_timezone="UTC",
flush_interval_ms=None,
replace=false
))]
#[allow(clippy::too_many_arguments, clippy::needless_pass_by_value)]
pub fn new(
path: String,
cache: PyCache,
clock: PyClock,
fs_protocol: Option<&str>,
fs_storage_options: Option<HashMap<String, String>>,
include_types: Option<Vec<String>>,
rotation_mode: u8,
max_file_size: u64,
rotation_interval_ns: Option<u64>,
rotation_time_ns: Option<u64>,
rotation_timezone: &str,
flush_interval_ms: Option<u64>,
replace: bool,
) -> PyResult<Self> {
let full_path = if let Some(protocol) = fs_protocol {
if protocol != "file" && !path.contains("://") {
format!("{protocol}://{path}")
} else {
path.clone()
}
} else {
path.clone()
};
let storage_options = fs_storage_options
.map(|map| map.into_iter().collect::<ahash::AHashMap<String, String>>());
let (object_store, _base_path, _original_uri) =
create_object_store_from_path(&full_path, storage_options)
.map_err(|e| PyIOError::new_err(format!("Failed to create object store: {e}")))?;
if replace {
let runtime = get_runtime();
let store_ref = object_store.clone();
runtime
.block_on(async {
let prefix =
object_store::path::Path::from(path.trim_start_matches('/').to_string());
let mut stream = store_ref.list(Some(&prefix));
let mut to_delete = Vec::new();
while let Some(result) = futures::StreamExt::next(&mut stream).await {
if let Ok(meta) = result {
to_delete.push(meta.location);
}
}
for path in to_delete {
let _ = store_ref.delete(&path).await;
}
Ok::<(), anyhow::Error>(())
})
.map_err(|e| {
PyIOError::new_err(format!("Failed to replace existing files: {e}"))
})?;
}
let rotation_config = match rotation_mode {
0 => RotationConfig::Size {
max_size: max_file_size,
},
1 => {
let interval = rotation_interval_ns.unwrap_or(86_400_000_000_000); RotationConfig::Interval {
interval_ns: interval,
}
}
2 => {
let interval = rotation_interval_ns.unwrap_or(86_400_000_000_000); let tz = rotation_timezone.parse::<chrono_tz::Tz>().map_err(|e| {
PyIOError::new_err(format!("Failed to parse rotation_timezone: {e}"))
})?;
let time_ns = rotation_time_ns.unwrap_or(0);
RotationConfig::ScheduledDates {
interval_ns: interval,
rotation_time: UnixNanos::from(time_ns),
rotation_timezone: tz,
}
}
3 => RotationConfig::NoRotation,
_ => RotationConfig::NoRotation, };
let included_types =
include_types.map(|types| types.into_iter().collect::<HashSet<String>>());
let mut per_instrument_types = HashSet::new();
per_instrument_types.insert("bars".to_string());
per_instrument_types.insert("order_book_deltas".to_string());
per_instrument_types.insert("order_book_depths".to_string());
per_instrument_types.insert("quotes".to_string());
per_instrument_types.insert("trades".to_string());
let clock_rc = clock.clock_rc();
let _cache = cache;
let writer = FeatherWriter::new(
path,
object_store,
clock_rc,
rotation_config,
included_types,
Some(per_instrument_types),
flush_interval_ms, );
Ok(Self {
writer: Rc::new(RefCell::new(writer)),
handler: None,
})
}
pub fn subscribe(&mut self) -> PyResult<()> {
if self.handler.is_some() {
return Ok(());
}
let handler = FeatherWriter::subscribe_to_message_bus(self.writer.clone())
.map_err(|e| PyIOError::new_err(format!("Failed to subscribe to message bus: {e}")))?;
self.handler = Some(handler);
Ok(())
}
/// Removes the message bus subscription, if one is active.
///
/// The stored handler is cleared, so a later `subscribe` registers again.
/// Always returns `Ok(())`.
pub fn unsubscribe(&mut self) -> PyResult<()> {
    let Some(registered) = self.handler.take() else {
        // Nothing was subscribed.
        return Ok(());
    };
    FeatherWriter::unsubscribe_from_message_bus(&registered);
    Ok(())
}
#[allow(clippy::needless_pass_by_value)]
pub fn write(&self, py: Python, data: Py<PyAny>) -> PyResult<()> {
if let Ok(quote) = data.extract::<QuoteTick>(py) {
let mut writer = self.writer.borrow_mut();
let runtime = get_runtime();
return runtime
.block_on(async { writer.write_data(Data::Quote(quote)).await })
.map_err(|e| PyIOError::new_err(format!("Failed to write QuoteTick: {e}")));
}
if let Ok(trade) = data.extract::<TradeTick>(py) {
let mut writer = self.writer.borrow_mut();
let runtime = get_runtime();
return runtime
.block_on(async { writer.write_data(Data::Trade(trade)).await })
.map_err(|e| PyIOError::new_err(format!("Failed to write TradeTick: {e}")));
}
if let Ok(bar) = data.extract::<Bar>(py) {
let mut writer = self.writer.borrow_mut();
let runtime = get_runtime();
return runtime
.block_on(async { writer.write_data(Data::Bar(bar)).await })
.map_err(|e| PyIOError::new_err(format!("Failed to write Bar: {e}")));
}
if let Ok(delta) = data.extract::<OrderBookDelta>(py) {
let mut writer = self.writer.borrow_mut();
let runtime = get_runtime();
return runtime
.block_on(async { writer.write_data(Data::Delta(delta)).await })
.map_err(|e| PyIOError::new_err(format!("Failed to write OrderBookDelta: {e}")));
}
if let Ok(depth) = data.extract::<OrderBookDepth10>(py) {
let mut writer = self.writer.borrow_mut();
let runtime = get_runtime();
return runtime
.block_on(async { writer.write_data(Data::Depth10(Box::new(depth))).await })
.map_err(|e| PyIOError::new_err(format!("Failed to write OrderBookDepth10: {e}")));
}
if let Ok(price) = data.extract::<IndexPriceUpdate>(py) {
let mut writer = self.writer.borrow_mut();
let runtime = get_runtime();
return runtime
.block_on(async { writer.write_data(Data::IndexPriceUpdate(price)).await })
.map_err(|e| PyIOError::new_err(format!("Failed to write IndexPriceUpdate: {e}")));
}
if let Ok(price) = data.extract::<MarkPriceUpdate>(py) {
let mut writer = self.writer.borrow_mut();
let runtime = get_runtime();
return runtime
.block_on(async { writer.write_data(Data::MarkPriceUpdate(price)).await })
.map_err(|e| PyIOError::new_err(format!("Failed to write MarkPriceUpdate: {e}")));
}
if let Ok(close) = data.extract::<InstrumentClose>(py) {
let mut writer = self.writer.borrow_mut();
let runtime = get_runtime();
return runtime
.block_on(async { writer.write_data(Data::InstrumentClose(close)).await })
.map_err(|e| PyIOError::new_err(format!("Failed to write InstrumentClose: {e}")));
}
if let Ok(instrument) = pyobject_to_instrument_any(py, data.clone_ref(py)) {
let mut writer = self.writer.borrow_mut();
let runtime = get_runtime();
return runtime
.block_on(async { writer.write_instrument(instrument).await })
.map_err(|e| PyIOError::new_err(format!("Failed to write instrument: {e}")));
}
Err(PyIOError::new_err(
"Unsupported data type for feather writer",
))
}
/// Flushes the underlying writer, blocking until the flush completes.
///
/// # Errors
///
/// Returns `PyIOError` when the flush fails.
pub fn flush(&self) -> PyResult<()> {
    let mut guard = self.writer.borrow_mut();
    match get_runtime().block_on(async { guard.flush().await }) {
        Ok(done) => Ok(done),
        Err(e) => Err(PyIOError::new_err(format!("Failed to flush: {e}"))),
    }
}
/// Closes the underlying writer, blocking until the close completes.
///
/// # Errors
///
/// Returns `PyIOError` when closing fails.
pub fn close(&self) -> PyResult<()> {
    let mut guard = self.writer.borrow_mut();
    match get_runtime().block_on(async { guard.close().await }) {
        Ok(done) => Ok(done),
        Err(e) => Err(PyIOError::new_err(format!("Failed to close: {e}"))),
    }
}
/// Returns whether the underlying writer reports itself as closed.
#[getter]
pub fn is_closed(&self) -> bool {
    let writer = self.writer.borrow();
    writer.is_closed()
}
/// Returns the underlying writer's current file information.
///
/// The result maps a string key to a `(u64, String)` pair, exactly as
/// produced by the writer.
// NOTE(review): the pair is presumably (size, path) — confirm against `FeatherWriter`.
pub fn get_current_file_info(&self) -> HashMap<String, (u64, String)> {
    let writer = self.writer.borrow();
    writer.get_current_file_info()
}
/// Returns the next rotation time for `type_str` (optionally narrowed by
/// `instrument_id`) as a raw `u64`, or `None` when the writer has none.
#[pyo3(signature = (type_str, instrument_id=None))]
pub fn get_next_rotation_time(
    &self,
    type_str: &str,
    instrument_id: Option<&str>,
) -> Option<u64> {
    let writer = self.writer.borrow();
    let next_rotation = writer.get_next_rotation_time(type_str, instrument_id)?;
    Some(next_rotation.as_u64())
}
}