Skip to main content

jflow_core/
signal.rs

1//! Signal types and broadcast bus for inter-module communication
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use tokio::sync::broadcast;
7use uuid::Uuid;
8
9/// Trading signal type
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "lowercase")]
12pub enum SignalType {
13    /// Buy signal
14    Buy,
15    /// Sell signal
16    Sell,
17    /// Hold (no action)
18    Hold,
19    /// Close existing position
20    Close,
21}
22
23impl std::fmt::Display for SignalType {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        match self {
26            SignalType::Buy => write!(f, "BUY"),
27            SignalType::Sell => write!(f, "SELL"),
28            SignalType::Hold => write!(f, "HOLD"),
29            SignalType::Close => write!(f, "CLOSE"),
30        }
31    }
32}
33
34/// Signal priority for routing
35#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
36pub enum SignalPriority {
37    Low = 0,
38    Normal = 1,
39    High = 2,
40    Critical = 3,
41}
42
43/// Trading signal shared across all modules
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct Signal {
46    /// Unique signal ID
47    pub id: String,
48    /// Trading symbol (e.g., BTCUSD)
49    pub symbol: String,
50    /// Signal type (buy, sell, hold, close)
51    pub signal_type: SignalType,
52    /// Confidence score (0.0 to 1.0)
53    pub confidence: f64,
54    /// Signal timestamp
55    pub timestamp: DateTime<Utc>,
56    /// Signal priority
57    pub priority: SignalPriority,
58    /// Source module that generated the signal
59    pub source: String,
60    /// Strategy ID that generated this signal
61    pub strategy_id: Option<String>,
62    /// Target price (optional)
63    pub target_price: Option<f64>,
64    /// Stop loss price (optional)
65    pub stop_loss: Option<f64>,
66    /// Take profit price (optional)
67    pub take_profit: Option<f64>,
68    /// Position size (optional)
69    pub quantity: Option<f64>,
70    /// Additional metadata
71    pub metadata: HashMap<String, String>,
72}
73
74impl Signal {
75    /// Create a new signal
76    pub fn new(symbol: impl Into<String>, signal_type: SignalType, confidence: f64) -> Self {
77        Self {
78            id: Uuid::new_v4().to_string(),
79            symbol: symbol.into(),
80            signal_type,
81            confidence: confidence.clamp(0.0, 1.0),
82            timestamp: Utc::now(),
83            priority: SignalPriority::Normal,
84            source: "unknown".to_string(),
85            strategy_id: None,
86            target_price: None,
87            stop_loss: None,
88            take_profit: None,
89            quantity: None,
90            metadata: HashMap::new(),
91        }
92    }
93
94    /// Builder pattern methods
95    pub fn with_source(mut self, source: impl Into<String>) -> Self {
96        self.source = source.into();
97        self
98    }
99
100    pub fn with_priority(mut self, priority: SignalPriority) -> Self {
101        self.priority = priority;
102        self
103    }
104
105    pub fn with_strategy(mut self, strategy_id: impl Into<String>) -> Self {
106        self.strategy_id = Some(strategy_id.into());
107        self
108    }
109
110    pub fn with_target_price(mut self, price: f64) -> Self {
111        self.target_price = Some(price);
112        self
113    }
114
115    pub fn with_stop_loss(mut self, price: f64) -> Self {
116        self.stop_loss = Some(price);
117        self
118    }
119
120    pub fn with_take_profit(mut self, price: f64) -> Self {
121        self.take_profit = Some(price);
122        self
123    }
124
125    pub fn with_quantity(mut self, quantity: f64) -> Self {
126        self.quantity = Some(quantity);
127        self
128    }
129
130    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
131        self.metadata.insert(key.into(), value.into());
132        self
133    }
134
135    /// Check if signal is actionable (not a hold)
136    pub fn is_actionable(&self) -> bool {
137        !matches!(self.signal_type, SignalType::Hold)
138    }
139
140    /// Check if signal meets confidence threshold
141    pub fn meets_threshold(&self, threshold: f64) -> bool {
142        self.confidence >= threshold
143    }
144}
145
146/// Signal broadcast bus for inter-module communication
147///
148/// This allows modules to publish signals and other modules to subscribe
149/// without direct coupling.
150pub struct SignalBus {
151    /// Broadcast sender for signals
152    tx: broadcast::Sender<Signal>,
153    /// Channel capacity
154    capacity: usize,
155}
156
157impl SignalBus {
158    /// Create a new signal bus
159    pub fn new(capacity: usize) -> Self {
160        let (tx, _rx) = broadcast::channel(capacity);
161        Self { tx, capacity }
162    }
163
164    /// Publish a signal to all subscribers
165    pub fn publish(&self, signal: Signal) -> crate::Result<usize> {
166        let receivers = self.tx.send(signal)?;
167        Ok(receivers)
168    }
169
170    /// Subscribe to signals
171    pub fn subscribe(&self) -> broadcast::Receiver<Signal> {
172        self.tx.subscribe()
173    }
174
175    /// Get current subscriber count
176    pub fn subscriber_count(&self) -> usize {
177        self.tx.receiver_count()
178    }
179
180    /// Get channel capacity
181    pub fn capacity(&self) -> usize {
182        self.capacity
183    }
184}
185
186impl Default for SignalBus {
187    fn default() -> Self {
188        Self::new(1000)
189    }
190}
191
192impl Clone for SignalBus {
193    fn clone(&self) -> Self {
194        Self {
195            tx: self.tx.clone(),
196            capacity: self.capacity,
197        }
198    }
199}
200
201#[cfg(test)]
202mod tests {
203    use super::*;
204
205    #[test]
206    fn test_signal_creation() {
207        let signal = Signal::new("BTCUSD", SignalType::Buy, 0.85)
208            .with_source("forward")
209            .with_strategy("momentum_v1");
210
211        assert_eq!(signal.symbol, "BTCUSD");
212        assert_eq!(signal.signal_type, SignalType::Buy);
213        assert_eq!(signal.confidence, 0.85);
214        assert_eq!(signal.source, "forward");
215        assert!(signal.is_actionable());
216    }
217
218    #[test]
219    fn test_signal_bus() {
220        let bus = SignalBus::new(100);
221        let _rx = bus.subscribe();
222
223        let signal = Signal::new("ETHUSDT", SignalType::Sell, 0.9);
224        let _ = bus.publish(signal.clone());
225
226        // Note: In a real async test, you'd await the receive
227        assert_eq!(bus.subscriber_count(), 1);
228    }
229
230    #[test]
231    fn test_confidence_clamping() {
232        let signal = Signal::new("BTCUSD", SignalType::Buy, 1.5);
233        assert_eq!(signal.confidence, 1.0);
234
235        let signal2 = Signal::new("BTCUSD", SignalType::Buy, -0.5);
236        assert_eq!(signal2.confidence, 0.0);
237    }
238}