Skip to main content

shape_runtime/data/
async_provider.rs

1//! Async data provider trait for historical and live data
2//!
3//! This module defines the async interface for data providers that can:
4//! - Load historical data concurrently
5//! - Subscribe to live data streams
6//! - Support industry-agnostic time series data
7
8use super::{DataFrame, DataQuery, Timeframe};
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::Arc;
12use thiserror::Error;
13
14/// Error type for async data operations
15#[derive(Debug, Error)]
16pub enum AsyncDataError {
17    #[error("Symbol not found: {0}")]
18    SymbolNotFound(String),
19
20    #[error("No data available for timeframe: {0}")]
21    TimeframeNotAvailable(String),
22
23    #[error("No data in requested range")]
24    NoDataInRange,
25
26    #[error("Connection error: {0}")]
27    Connection(String),
28
29    #[error("Timeout")]
30    Timeout,
31
32    #[error("IO error: {0}")]
33    Io(#[from] std::io::Error),
34
35    #[error("Provider error: {0}")]
36    Provider(String),
37
38    #[error("Configuration error: {0}")]
39    Config(String),
40}
41
42/// Async data provider trait for historical and live data
43///
44/// This trait provides an async interface for loading data. Implementations
45/// can support both historical data fetching and live data streaming.
46///
47/// # Object Safety
48///
49/// Uses `Pin<Box<dyn Future>>` instead of `async fn` to maintain object safety
50/// and allow `dyn AsyncDataProvider` trait objects.
51///
52/// # Example
53///
54/// ```ignore
55/// use shape_core::data::{AsyncDataProvider, DataQuery, Timeframe};
56///
57/// async fn load_data(provider: &dyn AsyncDataProvider) {
58///     let query = DataQuery::new("AAPL", Timeframe::d1());
59///     let df = provider.load(&query).await.unwrap();
60///     println!("Loaded {} rows", df.row_count());
61/// }
62/// ```
63pub trait AsyncDataProvider: Send + Sync {
64    /// Load historical data matching the query
65    ///
66    /// This method is async and can load data concurrently. The returned
67    /// DataFrame contains all requested data in columnar format.
68    ///
69    /// # Arguments
70    ///
71    /// * `query` - Specifies symbol, timeframe, range, and limits
72    ///
73    /// # Returns
74    ///
75    /// A DataFrame with the requested data, or an error if unavailable.
76    fn load<'a>(
77        &'a self,
78        query: &'a DataQuery,
79    ) -> Pin<Box<dyn Future<Output = Result<DataFrame, AsyncDataError>> + Send + 'a>>;
80
81    /// Check if data is available for a symbol/timeframe combination
82    ///
83    /// This is a sync metadata query - doesn't load actual data.
84    fn has_data(&self, symbol: &str, timeframe: &Timeframe) -> bool;
85
86    /// List available symbols
87    ///
88    /// This is a sync metadata query.
89    fn symbols(&self) -> Vec<String>;
90
91    /// List available timeframes for a symbol
92    ///
93    /// Default implementation returns empty. Providers should override if
94    /// they can provide this information.
95    fn timeframes(&self, symbol: &str) -> Vec<Timeframe> {
96        let _ = symbol;
97        Vec::new()
98    }
99
100    /// Subscribe to live bar updates
101    ///
102    /// Returns a channel receiver that yields new bars as they complete.
103    /// Default implementation returns an error (live data not supported).
104    ///
105    /// # Arguments
106    ///
107    /// * `symbol` - Symbol to subscribe to
108    /// * `timeframe` - Timeframe for bars
109    ///
110    /// # Returns
111    ///
112    /// A receiver for DataFrames containing new bars, or an error.
113    ///
114    /// # Example
115    ///
116    /// ```ignore
117    /// let mut rx = provider.subscribe("AAPL", &Timeframe::m1())?;
118    /// while let Some(df) = rx.recv().await {
119    ///     println!("New bar: {}", df.row_count());
120    /// }
121    /// ```
122    fn subscribe(
123        &self,
124        symbol: &str,
125        timeframe: &Timeframe,
126    ) -> Result<tokio::sync::mpsc::Receiver<DataFrame>, AsyncDataError> {
127        let _ = (symbol, timeframe);
128        Err(AsyncDataError::Provider(
129            "Live data not supported by this provider".into(),
130        ))
131    }
132
133    /// Unsubscribe from live updates
134    ///
135    /// Default implementation is a no-op. Providers should override if they
136    /// manage subscription state.
137    fn unsubscribe(&self, symbol: &str, timeframe: &Timeframe) -> Result<(), AsyncDataError> {
138        let _ = (symbol, timeframe);
139        Ok(())
140    }
141}
142
143/// Type alias for a shared async data provider
144pub type SharedAsyncProvider = Arc<dyn AsyncDataProvider>;
145
/// A no-op async provider that never has any data.
///
/// Useful as a default when no provider is configured. Its
/// `AsyncDataProvider` implementation reports no symbols and no data, and
/// every `load` fails.
///
/// As a field-less unit struct it is trivially copyable, comparable and
/// hashable, so those traits are derived as well (Rust API Guidelines,
/// C-COMMON-TRAITS).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct NullAsyncProvider;
151
152impl AsyncDataProvider for NullAsyncProvider {
153    fn load<'a>(
154        &'a self,
155        query: &'a DataQuery,
156    ) -> Pin<Box<dyn Future<Output = Result<DataFrame, AsyncDataError>> + Send + 'a>> {
157        Box::pin(async move { Err(AsyncDataError::SymbolNotFound(query.id.clone())) })
158    }
159
160    fn has_data(&self, _symbol: &str, _timeframe: &Timeframe) -> bool {
161        false
162    }
163
164    fn symbols(&self) -> Vec<String> {
165        Vec::new()
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;

    /// The null provider reports no data and fails `load` with the
    /// specific `SymbolNotFound(query.id)` error, not merely "some error".
    #[tokio::test]
    async fn test_null_provider() {
        let provider = NullAsyncProvider;
        let query = DataQuery::new("TEST", Timeframe::d1());

        assert!(!provider.has_data("TEST", &Timeframe::d1()));
        assert!(provider.symbols().is_empty());

        match provider.load(&query).await {
            Err(AsyncDataError::SymbolNotFound(id)) => assert_eq!(id, query.id),
            Err(other) => panic!("expected SymbolNotFound, got: {}", other),
            Ok(_) => panic!("expected SymbolNotFound, got Ok"),
        }
    }

    /// The trait's default `timeframes`/`subscribe`/`unsubscribe` behavior,
    /// which `NullAsyncProvider` inherits without overriding.
    #[test]
    fn test_default_live_data_methods() {
        let provider = NullAsyncProvider;
        let tf = Timeframe::d1();

        assert!(provider.timeframes("TEST").is_empty());
        assert!(matches!(
            provider.subscribe("TEST", &tf),
            Err(AsyncDataError::Provider(_))
        ));
        assert!(provider.unsubscribe("TEST", &tf).is_ok());
    }
}