neural_trader/
market_data.rs

1//! Market data streaming and fetching bindings for Node.js
2
3use napi::bindgen_prelude::*;
4use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode};
5use napi_derive::napi;
6use std::sync::Arc;
7use tokio::sync::Mutex;
8
9/// Market data bar/candle
10#[napi(object)]
11pub struct Bar {
12    pub symbol: String,
13    pub timestamp: String,
14    pub open: f64,
15    pub high: f64,
16    pub low: f64,
17    pub close: f64,
18    pub volume: f64,
19}
20
21/// Real-time quote
22#[napi(object)]
23pub struct Quote {
24    pub symbol: String,
25    pub bid: f64,
26    pub ask: f64,
27    pub bid_size: u32,
28    pub ask_size: u32,
29    pub last: f64,
30    pub last_size: u32,
31    pub timestamp: String,
32}
33
34/// Market data provider configuration
35#[napi(object)]
36pub struct MarketDataConfig {
37    pub provider: String,  // "alpaca", "polygon", "yahoo", "binance"
38    pub api_key: Option<String>,
39    pub api_secret: Option<String>,
40    pub websocket_enabled: bool,
41}
42
43/// Market data provider
44#[napi]
45pub struct MarketDataProvider {
46    config: Arc<MarketDataConfig>,
47    _connection: Arc<Mutex<Option<String>>>,
48}
49
50#[napi]
51impl MarketDataProvider {
52    /// Create a new market data provider
53    #[napi(constructor)]
54    pub fn new(config: MarketDataConfig) -> Self {
55        tracing::info!("Creating market data provider: {}", config.provider);
56
57        Self {
58            config: Arc::new(config),
59            _connection: Arc::new(Mutex::new(None)),
60        }
61    }
62
63    /// Connect to market data provider
64    #[napi]
65    pub async fn connect(&self) -> Result<bool> {
66        tracing::info!("Connecting to market data provider");
67
68        // TODO: Implement actual connection
69        let mut conn = self._connection.lock().await;
70        *conn = Some(format!("connected-{}", self.config.provider));
71
72        Ok(true)
73    }
74
75    /// Disconnect from provider
76    #[napi]
77    pub async fn disconnect(&self) -> Result<()> {
78        tracing::info!("Disconnecting from market data provider");
79
80        let mut conn = self._connection.lock().await;
81        *conn = None;
82
83        Ok(())
84    }
85
86    /// Fetch historical bars
87    #[napi]
88    pub async fn fetch_bars(
89        &self,
90        symbol: String,
91        start: String,
92        end: String,
93        timeframe: String,
94    ) -> Result<Vec<Bar>> {
95        tracing::info!(
96            "Fetching bars: {} from {} to {} ({})",
97            symbol,
98            start,
99            end,
100            timeframe
101        );
102
103        // TODO: Implement actual data fetching
104        // For now, return mock data
105        let bars = vec![
106            Bar {
107                symbol: symbol.clone(),
108                timestamp: start.clone(),
109                open: 100.0,
110                high: 102.0,
111                low: 99.0,
112                close: 101.0,
113                volume: 1000000.0,
114            },
115            Bar {
116                symbol: symbol.clone(),
117                timestamp: end.clone(),
118                open: 101.0,
119                high: 103.0,
120                low: 100.0,
121                close: 102.0,
122                volume: 1200000.0,
123            },
124        ];
125
126        Ok(bars)
127    }
128
129    /// Get latest quote
130    #[napi]
131    pub async fn get_quote(&self, symbol: String) -> Result<Quote> {
132        tracing::debug!("Getting quote for {}", symbol);
133
134        // TODO: Implement actual quote fetching
135        Ok(Quote {
136            symbol,
137            bid: 100.50,
138            ask: 100.55,
139            bid_size: 100,
140            ask_size: 200,
141            last: 100.52,
142            last_size: 50,
143            timestamp: chrono::Utc::now().to_rfc3339(),
144        })
145    }
146
147    /// Subscribe to real-time quotes
148    #[napi]
149    pub fn subscribe_quotes(
150        &self,
151        symbols: Vec<String>,
152        callback: JsFunction,
153    ) -> Result<SubscriptionHandle> {
154        tracing::info!("Subscribing to quotes for {} symbols", symbols.len());
155
156        let tsfn: ThreadsafeFunction<Quote, ErrorStrategy::CalleeHandled> =
157            callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?;
158
159        // Spawn background task for streaming quotes
160        let symbols_clone = symbols.clone();
161        let handle = tokio::spawn(async move {
162            loop {
163                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
164
165                // TODO: Implement actual streaming
166                // For now, just simulate with periodic updates
167                for symbol in &symbols_clone {
168                    let quote = Quote {
169                        symbol: symbol.clone(),
170                        bid: 100.0,
171                        ask: 100.1,
172                        bid_size: 100,
173                        ask_size: 100,
174                        last: 100.05,
175                        last_size: 50,
176                        timestamp: chrono::Utc::now().to_rfc3339(),
177                    };
178
179                    let _ = tsfn.call(Ok(quote), ThreadsafeFunctionCallMode::NonBlocking);
180                }
181            }
182        });
183
184        Ok(SubscriptionHandle {
185            handle: Arc::new(Mutex::new(Some(handle))),
186        })
187    }
188
189    /// Get multiple quotes at once
190    #[napi]
191    pub async fn get_quotes_batch(&self, symbols: Vec<String>) -> Result<Vec<Quote>> {
192        tracing::debug!("Getting batch quotes for {} symbols", symbols.len());
193
194        // TODO: Implement actual batch quote fetching
195        let quotes = symbols
196            .iter()
197            .map(|symbol| Quote {
198                symbol: symbol.clone(),
199                bid: 100.0,
200                ask: 100.1,
201                bid_size: 100,
202                ask_size: 100,
203                last: 100.05,
204                last_size: 50,
205                timestamp: chrono::Utc::now().to_rfc3339(),
206            })
207            .collect();
208
209        Ok(quotes)
210    }
211
212    /// Check if provider is connected
213    #[napi]
214    pub async fn is_connected(&self) -> Result<bool> {
215        let conn = self._connection.lock().await;
216        Ok(conn.is_some())
217    }
218}
219
220/// Subscription handle for cleanup
221#[napi]
222pub struct SubscriptionHandle {
223    handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
224}
225
226#[napi]
227impl SubscriptionHandle {
228    /// Unsubscribe from quotes
229    #[napi]
230    pub async fn unsubscribe(&self) -> Result<()> {
231        let mut guard = self.handle.lock().await;
232        if let Some(handle) = guard.take() {
233            handle.abort();
234            tracing::info!("Unsubscribed from market data");
235        }
236        Ok(())
237    }
238}
239
240/// Calculate technical indicators
241#[napi]
242pub fn calculate_sma(prices: Vec<f64>, period: u32) -> Result<Vec<f64>> {
243    if prices.len() < period as usize {
244        return Err(Error::from_reason("Not enough data for SMA calculation"));
245    }
246
247    let mut sma = vec![f64::NAN; period as usize - 1];
248
249    for i in (period as usize - 1)..prices.len() {
250        let sum: f64 = prices[i - (period as usize - 1)..=i].iter().sum();
251        sma.push(sum / period as f64);
252    }
253
254    Ok(sma)
255}
256
257/// Calculate Relative Strength Index (RSI)
258#[napi]
259pub fn calculate_rsi(prices: Vec<f64>, period: u32) -> Result<Vec<f64>> {
260    if prices.len() < (period + 1) as usize {
261        return Err(Error::from_reason("Not enough data for RSI calculation"));
262    }
263
264    let mut gains = Vec::new();
265    let mut losses = Vec::new();
266
267    // Calculate price changes
268    for i in 1..prices.len() {
269        let change = prices[i] - prices[i - 1];
270        gains.push(if change > 0.0 { change } else { 0.0 });
271        losses.push(if change < 0.0 { -change } else { 0.0 });
272    }
273
274    // Calculate average gains and losses
275    let mut avg_gain = gains[..period as usize].iter().sum::<f64>() / period as f64;
276    let mut avg_loss = losses[..period as usize].iter().sum::<f64>() / period as f64;
277
278    let mut rsi_values = vec![f64::NAN; period as usize];
279
280    for i in period as usize..gains.len() {
281        avg_gain = (avg_gain * (period as f64 - 1.0) + gains[i]) / period as f64;
282        avg_loss = (avg_loss * (period as f64 - 1.0) + losses[i]) / period as f64;
283
284        let rs = if avg_loss > 0.0 {
285            avg_gain / avg_loss
286        } else {
287            0.0
288        };
289
290        let rsi = 100.0 - (100.0 / (1.0 + rs));
291        rsi_values.push(rsi);
292    }
293
294    Ok(rsi_values)
295}
296
297/// List available market data providers
298#[napi]
299pub fn list_data_providers() -> Vec<String> {
300    vec![
301        "alpaca".to_string(),
302        "polygon".to_string(),
303        "yahoo".to_string(),
304        "binance".to_string(),
305        "coinbase".to_string(),
306    ]
307}