Skip to main content

jflow_core/
optimized_params.rs

1//! Optimized Parameters Module
2//!
3//! Receives and applies optimized trading parameters from the FKS Optimizer service
4//! via Redis pub/sub. Enables hot-reloading of strategy parameters without restart.
5//!
6//! # Redis Integration
7//!
8//! The optimizer publishes parameters to:
9//! - Hash: `fks:{instance}:optimized_params` - Stores current params per asset
10//! - Channel: `fks:{instance}:param_updates` - Notifies of param changes
11//!
12//! # Usage
13//!
14//! ```rust,ignore
15//! use janus_core::optimized_params::{OptimizedParams, ParamUpdateListener};
16//!
17//! // Load params from Redis
18//! let params = OptimizedParams::load_from_redis(&redis_client, "BTC").await?;
19//!
20//! // Subscribe to updates
21//! let listener = ParamUpdateListener::new("redis://localhost:6379", "default").await?;
22//! while let Some(update) = listener.next_update().await {
23//!     println!("New params for {}: {:?}", update.asset, update.params);
24//! }
25//! ```
26
27use serde::{Deserialize, Serialize};
28use std::collections::HashMap;
29use std::sync::Arc;
30use tokio::sync::{RwLock, broadcast};
31use tracing::{debug, error, info, warn};
32
33/// Optimized parameters for a single asset
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct OptimizedParams {
36    /// Asset symbol (e.g., "BTC", "ETH", "SOL")
37    pub asset: String,
38
39    /// Fast EMA period (typically 8-14)
40    #[serde(default = "default_ema_fast")]
41    pub ema_fast_period: u32,
42
43    /// Slow EMA period (typically 21-40)
44    #[serde(default = "default_ema_slow")]
45    pub ema_slow_period: u32,
46
47    /// ATR calculation period
48    #[serde(default = "default_atr_length")]
49    pub atr_length: u32,
50
51    /// ATR multiplier for trailing stop
52    #[serde(default = "default_atr_multiplier")]
53    pub atr_multiplier: f64,
54
55    /// Minimum trailing stop percentage
56    #[serde(default = "default_min_trailing_stop")]
57    pub min_trailing_stop_pct: f64,
58
59    /// Minimum EMA spread to confirm trend
60    #[serde(default = "default_min_ema_spread")]
61    pub min_ema_spread_pct: f64,
62
63    /// Minimum profit percentage to allow exit
64    #[serde(default = "default_min_profit")]
65    pub min_profit_pct: f64,
66
67    /// Take profit target percentage
68    #[serde(default = "default_take_profit")]
69    pub take_profit_pct: f64,
70
71    /// Stop loss percentage (positive value; e.g. `2.0` = exit when
72    /// unrealized loss reaches 2% of notional). Defaults to `2.0` so
73    /// existing optimizer payloads that lack this field keep the same
74    /// conservative behaviour as the original hardcoded threshold.
75    #[serde(default = "default_stop_loss")]
76    pub stop_loss_pct: f64,
77
78    /// Cooldown between trades in seconds
79    #[serde(default = "default_cooldown")]
80    pub trade_cooldown_seconds: u64,
81
82    /// Whether to require higher timeframe alignment
83    #[serde(default = "default_require_htf")]
84    pub require_htf_alignment: bool,
85
86    /// Higher timeframe for trend filter (minutes)
87    #[serde(default = "default_htf_timeframe")]
88    pub htf_timeframe_minutes: u32,
89
90    /// Maximum position size in USD
91    #[serde(default = "default_max_position")]
92    pub max_position_size_usd: f64,
93
94    /// Whether this asset is enabled for trading
95    #[serde(default = "default_enabled")]
96    pub enabled: bool,
97
98    /// Minimum hold time in minutes
99    #[serde(default = "default_min_hold")]
100    pub min_hold_minutes: u32,
101
102    /// Prefer trailing stop exit over EMA reversal
103    #[serde(default = "default_prefer_trailing")]
104    pub prefer_trailing_stop_exit: bool,
105
106    /// When these params were optimized (ISO 8601)
107    #[serde(default)]
108    pub optimized_at: String,
109
110    /// Optimization score (higher is better)
111    #[serde(default)]
112    pub optimization_score: f64,
113
114    /// Backtest results summary
115    #[serde(default)]
116    pub backtest_result: BacktestResultSummary,
117}
118
119/// Summary of backtest results
120#[derive(Debug, Clone, Default, Serialize, Deserialize)]
121pub struct BacktestResultSummary {
122    pub total_trades: u32,
123    pub winning_trades: u32,
124    pub losing_trades: u32,
125    pub total_pnl_pct: f64,
126    pub max_drawdown_pct: f64,
127    pub win_rate: f64,
128    pub profit_factor: f64,
129    pub sharpe_ratio: f64,
130    pub trades_per_day: f64,
131}
132
133// Default value functions for serde
134fn default_ema_fast() -> u32 {
135    9
136}
137fn default_ema_slow() -> u32 {
138    28
139}
140fn default_atr_length() -> u32 {
141    14
142}
143/// Default ATR multiplier (number of ATRs to the trailing stop). Exposed
144/// so volatility-aware guidance can size its noise band with the same
145/// baseline an un-optimized asset would use.
146pub const DEFAULT_ATR_MULTIPLIER: f64 = 2.0;
147
148fn default_atr_multiplier() -> f64 {
149    DEFAULT_ATR_MULTIPLIER
150}
151fn default_min_trailing_stop() -> f64 {
152    0.5
153}
154fn default_min_ema_spread() -> f64 {
155    0.20
156}
157fn default_min_profit() -> f64 {
158    0.40
159}
160fn default_take_profit() -> f64 {
161    5.0
162}
163fn default_stop_loss() -> f64 {
164    2.0
165}
166fn default_cooldown() -> u64 {
167    1800
168}
169fn default_require_htf() -> bool {
170    true
171}
172fn default_htf_timeframe() -> u32 {
173    15
174}
175fn default_max_position() -> f64 {
176    20.0
177}
178fn default_enabled() -> bool {
179    true
180}
181fn default_min_hold() -> u32 {
182    15
183}
184fn default_prefer_trailing() -> bool {
185    true
186}
187
188impl Default for OptimizedParams {
189    fn default() -> Self {
190        Self {
191            asset: String::new(),
192            ema_fast_period: default_ema_fast(),
193            ema_slow_period: default_ema_slow(),
194            atr_length: default_atr_length(),
195            atr_multiplier: default_atr_multiplier(),
196            min_trailing_stop_pct: default_min_trailing_stop(),
197            min_ema_spread_pct: default_min_ema_spread(),
198            min_profit_pct: default_min_profit(),
199            take_profit_pct: default_take_profit(),
200            stop_loss_pct: default_stop_loss(),
201            trade_cooldown_seconds: default_cooldown(),
202            require_htf_alignment: default_require_htf(),
203            htf_timeframe_minutes: default_htf_timeframe(),
204            max_position_size_usd: default_max_position(),
205            enabled: default_enabled(),
206            min_hold_minutes: default_min_hold(),
207            prefer_trailing_stop_exit: default_prefer_trailing(),
208            optimized_at: String::new(),
209            optimization_score: 0.0,
210            backtest_result: BacktestResultSummary::default(),
211        }
212    }
213}
214
215impl OptimizedParams {
216    /// Create new params for an asset with defaults
217    pub fn new(asset: impl Into<String>) -> Self {
218        Self {
219            asset: asset.into(),
220            ..Default::default()
221        }
222    }
223
224    /// Validate the parameters
225    pub fn validate(&self) -> Result<(), ParamError> {
226        if self.ema_fast_period >= self.ema_slow_period {
227            return Err(ParamError::InvalidConfig(
228                "EMA fast period must be less than slow period".into(),
229            ));
230        }
231
232        if self.ema_fast_period < 3 || self.ema_slow_period < 5 {
233            return Err(ParamError::InvalidConfig("EMA periods too small".into()));
234        }
235
236        if self.atr_multiplier <= 0.0 {
237            return Err(ParamError::InvalidConfig(
238                "ATR multiplier must be positive".into(),
239            ));
240        }
241
242        if self.min_ema_spread_pct < 0.0 || self.min_ema_spread_pct > 10.0 {
243            return Err(ParamError::InvalidConfig(
244                "Min EMA spread must be between 0 and 10%".into(),
245            ));
246        }
247
248        if self.stop_loss_pct <= 0.0 || self.stop_loss_pct > 100.0 {
249            return Err(ParamError::InvalidConfig(
250                "Stop loss must be a positive percentage (e.g. 2.0 = 2%)".into(),
251            ));
252        }
253
254        if self.take_profit_pct <= 0.0 || self.take_profit_pct > 100.0 {
255            return Err(ParamError::InvalidConfig(
256                "Take profit must be a positive percentage (e.g. 5.0 = 5%)".into(),
257            ));
258        }
259
260        Ok(())
261    }
262
263    /// Check if trading is allowed with these params
264    pub fn is_trading_enabled(&self) -> bool {
265        self.enabled && self.max_position_size_usd > 0.0
266    }
267}
268
269/// Notification types from the optimizer
270#[derive(Debug, Clone, Serialize, Deserialize)]
271#[serde(tag = "type", rename_all = "snake_case")]
272pub enum ParamNotification {
273    /// Single asset params updated
274    ParamUpdate {
275        asset: String,
276        timestamp: String,
277        params: OptimizedParams,
278    },
279    /// Optimization cycle started
280    OptimizationStarted {
281        timestamp: String,
282        assets: Vec<String>,
283    },
284    /// Optimization cycle completed
285    OptimizationComplete {
286        timestamp: String,
287        successful: u32,
288        failed: u32,
289        assets: Vec<String>,
290    },
291    /// Single asset optimization failed
292    OptimizationFailed {
293        timestamp: String,
294        asset: String,
295        error: String,
296    },
297}
298
299/// Error types for parameter operations
300#[derive(Debug, thiserror::Error)]
301pub enum ParamError {
302    #[error("Redis error: {0}")]
303    Redis(String),
304
305    #[error("Serialization error: {0}")]
306    Serialization(String),
307
308    #[error("Invalid configuration: {0}")]
309    InvalidConfig(String),
310
311    #[error("Asset not found: {0}")]
312    AssetNotFound(String),
313
314    #[error("Connection error: {0}")]
315    Connection(String),
316}
317
318/// Manager for optimized parameters with caching and updates
319pub struct ParamManager {
320    /// Current params cache (asset -> params)
321    params: Arc<RwLock<HashMap<String, OptimizedParams>>>,
322
323    /// Redis instance ID for key prefix
324    instance_id: String,
325
326    /// Channel for broadcasting param updates
327    update_tx: broadcast::Sender<ParamNotification>,
328}
329
330impl ParamManager {
331    /// Create a new param manager
332    pub fn new(instance_id: impl Into<String>) -> Self {
333        let (update_tx, _) = broadcast::channel(64);
334        Self {
335            params: Arc::new(RwLock::new(HashMap::new())),
336            instance_id: instance_id.into(),
337            update_tx,
338        }
339    }
340
341    /// Get the Redis key prefix
342    pub fn redis_key(&self, suffix: &str) -> String {
343        format!("fks:{}:{}", self.instance_id, suffix)
344    }
345
346    /// Get the param updates channel name
347    pub fn updates_channel(&self) -> String {
348        self.redis_key("param_updates")
349    }
350
351    /// Get the params hash key
352    pub fn params_hash_key(&self) -> String {
353        self.redis_key("optimized_params")
354    }
355
356    /// Subscribe to param update notifications
357    pub fn subscribe(&self) -> broadcast::Receiver<ParamNotification> {
358        self.update_tx.subscribe()
359    }
360
361    /// Get params for an asset (from cache)
362    pub async fn get(&self, asset: &str) -> Option<OptimizedParams> {
363        let params = self.params.read().await;
364        params.get(asset).cloned()
365    }
366
367    /// Get all cached params
368    pub async fn get_all(&self) -> HashMap<String, OptimizedParams> {
369        let params = self.params.read().await;
370        params.clone()
371    }
372
373    /// Update params for an asset
374    pub async fn update(&self, params: OptimizedParams) {
375        let asset = params.asset.clone();
376        let notification = ParamNotification::ParamUpdate {
377            asset: asset.clone(),
378            timestamp: chrono::Utc::now().to_rfc3339(),
379            params: params.clone(),
380        };
381
382        // Update cache
383        {
384            let mut cache = self.params.write().await;
385            cache.insert(asset.clone(), params);
386        }
387
388        // Broadcast notification
389        if let Err(e) = self.update_tx.send(notification) {
390            debug!("No subscribers for param update: {}", e);
391        }
392
393        info!("Updated optimized params for {}", asset);
394    }
395
396    /// Load all params from Redis
397    #[cfg(feature = "redis")]
398    pub async fn load_from_redis(&self, client: &redis::Client) -> Result<usize, ParamError> {
399        use redis::AsyncCommands;
400
401        let mut conn = client
402            .get_multiplexed_async_connection()
403            .await
404            .map_err(|e| ParamError::Redis(e.to_string()))?;
405
406        let key = self.params_hash_key();
407        let all_params: HashMap<String, String> = conn
408            .hgetall(&key)
409            .await
410            .map_err(|e| ParamError::Redis(e.to_string()))?;
411
412        let mut count = 0;
413        let mut cache = self.params.write().await;
414
415        for (asset, json) in all_params {
416            // Skip metadata keys
417            if asset.starts_with('_') {
418                continue;
419            }
420
421            match serde_json::from_str::<OptimizedParams>(&json) {
422                Ok(params) => {
423                    cache.insert(asset, params);
424                    count += 1;
425                }
426                Err(e) => {
427                    warn!("Failed to parse params for {}: {}", asset, e);
428                }
429            }
430        }
431
432        info!("Loaded {} optimized params from Redis", count);
433        Ok(count)
434    }
435
436    /// Process a notification from Redis pub/sub
437    pub async fn process_notification(&self, json: &str) -> Result<(), ParamError> {
438        let notification: ParamNotification =
439            serde_json::from_str(json).map_err(|e| ParamError::Serialization(e.to_string()))?;
440
441        match &notification {
442            ParamNotification::ParamUpdate { asset, params, .. } => {
443                let mut cache = self.params.write().await;
444                cache.insert(asset.clone(), params.clone());
445                info!(
446                    "Applied optimized params for {} (score: {:.2})",
447                    asset, params.optimization_score
448                );
449            }
450            ParamNotification::OptimizationStarted { assets, .. } => {
451                info!("Optimization started for: {:?}", assets);
452            }
453            ParamNotification::OptimizationComplete {
454                successful, failed, ..
455            } => {
456                info!(
457                    "Optimization complete: {} successful, {} failed",
458                    successful, failed
459                );
460            }
461            ParamNotification::OptimizationFailed { asset, error, .. } => {
462                warn!("Optimization failed for {}: {}", asset, error);
463            }
464        }
465
466        // Broadcast to subscribers
467        if let Err(e) = self.update_tx.send(notification) {
468            debug!("No subscribers for notification: {}", e);
469        }
470
471        Ok(())
472    }
473}
474
475/// Listener for Redis pub/sub param updates
476#[cfg(feature = "redis")]
477pub struct ParamUpdateListener {
478    manager: Arc<ParamManager>,
479    pubsub: Option<redis::aio::PubSub>,
480}
481
482#[cfg(feature = "redis")]
483impl ParamUpdateListener {
484    /// Create a new listener
485    pub async fn new(redis_url: &str, instance_id: impl Into<String>) -> Result<Self, ParamError> {
486        let manager = Arc::new(ParamManager::new(instance_id));
487
488        let client =
489            redis::Client::open(redis_url).map_err(|e| ParamError::Connection(e.to_string()))?;
490
491        let mut pubsub = client
492            .get_async_pubsub()
493            .await
494            .map_err(|e| ParamError::Connection(e.to_string()))?;
495
496        let channel = manager.updates_channel();
497        pubsub
498            .subscribe(&channel)
499            .await
500            .map_err(|e| ParamError::Redis(e.to_string()))?;
501
502        info!("Subscribed to param updates on channel: {}", channel);
503
504        Ok(Self {
505            manager,
506            pubsub: Some(pubsub),
507        })
508    }
509
510    /// Get the param manager
511    pub fn manager(&self) -> Arc<ParamManager> {
512        Arc::clone(&self.manager)
513    }
514
515    /// Wait for and process the next update
516    pub async fn next_update(&mut self) -> Option<ParamNotification> {
517        use futures_util::StreamExt;
518
519        let pubsub = self.pubsub.as_mut()?;
520
521        while let Some(msg) = pubsub.on_message().next().await {
522            let payload: String = match msg.get_payload() {
523                Ok(p) => p,
524                Err(e) => {
525                    error!("Failed to get message payload: {}", e);
526                    continue;
527                }
528            };
529
530            match self.manager.process_notification(&payload).await {
531                Ok(()) => {
532                    // Return the notification to the caller
533                    if let Ok(notification) = serde_json::from_str(&payload) {
534                        return Some(notification);
535                    }
536                }
537                Err(e) => {
538                    error!("Failed to process notification: {}", e);
539                }
540            }
541        }
542
543        None
544    }
545
546    /// Run the listener loop (blocking)
547    pub async fn run(&mut self) {
548        info!("Starting param update listener loop");
549        while let Some(notification) = self.next_update().await {
550            debug!("Processed notification: {:?}", notification);
551        }
552        warn!("Param update listener loop ended");
553    }
554}
555
556#[cfg(test)]
557mod tests {
558    use super::*;
559
560    #[test]
561    fn test_default_params() {
562        let params = OptimizedParams::default();
563        assert_eq!(params.ema_fast_period, 9);
564        assert_eq!(params.ema_slow_period, 28);
565        assert!(params.enabled);
566    }
567
568    #[test]
569    fn test_params_validation() {
570        let mut params = OptimizedParams::new("BTC");
571        assert!(params.validate().is_ok());
572
573        // Invalid: fast >= slow
574        params.ema_fast_period = 30;
575        params.ema_slow_period = 20;
576        assert!(params.validate().is_err());
577    }
578
579    #[test]
580    fn test_notification_serialization() {
581        let notification = ParamNotification::ParamUpdate {
582            asset: "BTC".to_string(),
583            timestamp: "2025-01-15T12:00:00Z".to_string(),
584            params: OptimizedParams::new("BTC"),
585        };
586
587        let json = serde_json::to_string(&notification).unwrap();
588        assert!(json.contains("param_update"));
589        assert!(json.contains("BTC"));
590
591        let parsed: ParamNotification = serde_json::from_str(&json).unwrap();
592        match parsed {
593            ParamNotification::ParamUpdate { asset, .. } => {
594                assert_eq!(asset, "BTC");
595            }
596            _ => panic!("Wrong notification type"),
597        }
598    }
599
600    #[tokio::test]
601    async fn test_param_manager() {
602        let manager = ParamManager::new("test");
603
604        // Initially empty
605        assert!(manager.get("BTC").await.is_none());
606
607        // Update
608        let params = OptimizedParams::new("BTC");
609        manager.update(params.clone()).await;
610
611        // Should now exist
612        let retrieved = manager.get("BTC").await.unwrap();
613        assert_eq!(retrieved.asset, "BTC");
614        assert_eq!(retrieved.ema_fast_period, params.ema_fast_period);
615    }
616
617    #[test]
618    fn test_redis_keys() {
619        let manager = ParamManager::new("personal");
620        assert_eq!(manager.params_hash_key(), "fks:personal:optimized_params");
621        assert_eq!(manager.updates_channel(), "fks:personal:param_updates");
622    }
623
624    #[tokio::test]
625    async fn process_notification_applies_param_update_to_cache() {
626        // Simulates a Redis pub/sub payload from the optimizer: a
627        // `ParamUpdate` JSON should land in the cache and be visible
628        // via `get`. This is the path the janus-api live-update
629        // subscriber relies on.
630        let manager = ParamManager::new("test");
631        let mut params = OptimizedParams::new("BTC");
632        params.take_profit_pct = 7.5;
633        let payload = serde_json::to_string(&ParamNotification::ParamUpdate {
634            asset: "BTC".into(),
635            timestamp: "2026-05-25T00:00:00Z".into(),
636            params: params.clone(),
637        })
638        .unwrap();
639        manager.process_notification(&payload).await.unwrap();
640        let cached = manager.get("BTC").await.unwrap();
641        assert!((cached.take_profit_pct - 7.5).abs() < 1e-9);
642    }
643
644    #[tokio::test]
645    async fn process_notification_rejects_malformed_json() {
646        let manager = ParamManager::new("test");
647        let err = manager.process_notification("{ not valid json").await;
648        assert!(matches!(err, Err(ParamError::Serialization(_))));
649    }
650
651    #[test]
652    fn test_trading_enabled() {
653        let mut params = OptimizedParams::new("BTC");
654        assert!(params.is_trading_enabled());
655
656        params.enabled = false;
657        assert!(!params.is_trading_enabled());
658
659        params.enabled = true;
660        params.max_position_size_usd = 0.0;
661        assert!(!params.is_trading_enabled());
662    }
663}