shape_runtime/data/async_provider.rs
1//! Async data provider trait for historical and live data
2//!
3//! This module defines the async interface for data providers that can:
4//! - Load historical data concurrently
5//! - Subscribe to live data streams
6//! - Support industry-agnostic time series data
7
8use super::{DataFrame, DataQuery, Timeframe};
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::Arc;
12use thiserror::Error;
13
14/// Error type for async data operations
15#[derive(Debug, Error)]
16pub enum AsyncDataError {
17 #[error("Symbol not found: {0}")]
18 SymbolNotFound(String),
19
20 #[error("No data available for timeframe: {0}")]
21 TimeframeNotAvailable(String),
22
23 #[error("No data in requested range")]
24 NoDataInRange,
25
26 #[error("Connection error: {0}")]
27 Connection(String),
28
29 #[error("Timeout")]
30 Timeout,
31
32 #[error("IO error: {0}")]
33 Io(#[from] std::io::Error),
34
35 #[error("Provider error: {0}")]
36 Provider(String),
37
38 #[error("Configuration error: {0}")]
39 Config(String),
40}
41
42/// Async data provider trait for historical and live data
43///
44/// This trait provides an async interface for loading data. Implementations
45/// can support both historical data fetching and live data streaming.
46///
47/// # Object Safety
48///
49/// Uses `Pin<Box<dyn Future>>` instead of `async fn` to maintain object safety
50/// and allow `dyn AsyncDataProvider` trait objects.
51///
52/// # Example
53///
54/// ```ignore
55/// use shape_core::data::{AsyncDataProvider, DataQuery, Timeframe};
56///
57/// async fn load_data(provider: &dyn AsyncDataProvider) {
58/// let query = DataQuery::new("AAPL", Timeframe::d1());
59/// let df = provider.load(&query).await.unwrap();
60/// println!("Loaded {} rows", df.row_count());
61/// }
62/// ```
63pub trait AsyncDataProvider: Send + Sync {
64 /// Load historical data matching the query
65 ///
66 /// This method is async and can load data concurrently. The returned
67 /// DataFrame contains all requested data in columnar format.
68 ///
69 /// # Arguments
70 ///
71 /// * `query` - Specifies symbol, timeframe, range, and limits
72 ///
73 /// # Returns
74 ///
75 /// A DataFrame with the requested data, or an error if unavailable.
76 fn load<'a>(
77 &'a self,
78 query: &'a DataQuery,
79 ) -> Pin<Box<dyn Future<Output = Result<DataFrame, AsyncDataError>> + Send + 'a>>;
80
81 /// Check if data is available for a symbol/timeframe combination
82 ///
83 /// This is a sync metadata query - doesn't load actual data.
84 fn has_data(&self, symbol: &str, timeframe: &Timeframe) -> bool;
85
86 /// List available symbols
87 ///
88 /// This is a sync metadata query.
89 fn symbols(&self) -> Vec<String>;
90
91 /// List available timeframes for a symbol
92 ///
93 /// Default implementation returns empty. Providers should override if
94 /// they can provide this information.
95 fn timeframes(&self, symbol: &str) -> Vec<Timeframe> {
96 let _ = symbol;
97 Vec::new()
98 }
99
100 /// Subscribe to live bar updates
101 ///
102 /// Returns a channel receiver that yields new bars as they complete.
103 /// Default implementation returns an error (live data not supported).
104 ///
105 /// # Arguments
106 ///
107 /// * `symbol` - Symbol to subscribe to
108 /// * `timeframe` - Timeframe for bars
109 ///
110 /// # Returns
111 ///
112 /// A receiver for DataFrames containing new bars, or an error.
113 ///
114 /// # Example
115 ///
116 /// ```ignore
117 /// let mut rx = provider.subscribe("AAPL", &Timeframe::m1())?;
118 /// while let Some(df) = rx.recv().await {
119 /// println!("New bar: {}", df.row_count());
120 /// }
121 /// ```
122 fn subscribe(
123 &self,
124 symbol: &str,
125 timeframe: &Timeframe,
126 ) -> Result<tokio::sync::mpsc::Receiver<DataFrame>, AsyncDataError> {
127 let _ = (symbol, timeframe);
128 Err(AsyncDataError::Provider(
129 "Live data not supported by this provider".into(),
130 ))
131 }
132
133 /// Unsubscribe from live updates
134 ///
135 /// Default implementation is a no-op. Providers should override if they
136 /// manage subscription state.
137 fn unsubscribe(&self, symbol: &str, timeframe: &Timeframe) -> Result<(), AsyncDataError> {
138 let _ = (symbol, timeframe);
139 Ok(())
140 }
141}
142
143/// Type alias for a shared async data provider
144pub type SharedAsyncProvider = Arc<dyn AsyncDataProvider>;
145
146/// A no-op async provider that returns no data
147///
148/// Useful as a default when no provider is configured.
149#[derive(Debug, Clone, Default)]
150pub struct NullAsyncProvider;
151
152impl AsyncDataProvider for NullAsyncProvider {
153 fn load<'a>(
154 &'a self,
155 query: &'a DataQuery,
156 ) -> Pin<Box<dyn Future<Output = Result<DataFrame, AsyncDataError>> + Send + 'a>> {
157 Box::pin(async move { Err(AsyncDataError::SymbolNotFound(query.id.clone())) })
158 }
159
160 fn has_data(&self, _symbol: &str, _timeframe: &Timeframe) -> bool {
161 false
162 }
163
164 fn symbols(&self) -> Vec<String> {
165 Vec::new()
166 }
167}
168
169#[cfg(test)]
170mod tests {
171 use super::*;
172
173 #[tokio::test]
174 async fn test_null_provider() {
175 let provider = NullAsyncProvider;
176 let query = DataQuery::new("TEST", Timeframe::d1());
177
178 assert!(!provider.has_data("TEST", &Timeframe::d1()));
179 assert!(provider.symbols().is_empty());
180 assert!(provider.load(&query).await.is_err());
181 }
182}