1use serde::{Deserialize, Serialize};
28use std::collections::HashMap;
29use std::sync::Arc;
30use tokio::sync::{RwLock, broadcast};
31use tracing::{debug, error, info, warn};
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct OptimizedParams {
36 pub asset: String,
38
39 #[serde(default = "default_ema_fast")]
41 pub ema_fast_period: u32,
42
43 #[serde(default = "default_ema_slow")]
45 pub ema_slow_period: u32,
46
47 #[serde(default = "default_atr_length")]
49 pub atr_length: u32,
50
51 #[serde(default = "default_atr_multiplier")]
53 pub atr_multiplier: f64,
54
55 #[serde(default = "default_min_trailing_stop")]
57 pub min_trailing_stop_pct: f64,
58
59 #[serde(default = "default_min_ema_spread")]
61 pub min_ema_spread_pct: f64,
62
63 #[serde(default = "default_min_profit")]
65 pub min_profit_pct: f64,
66
67 #[serde(default = "default_take_profit")]
69 pub take_profit_pct: f64,
70
71 #[serde(default = "default_stop_loss")]
76 pub stop_loss_pct: f64,
77
78 #[serde(default = "default_cooldown")]
80 pub trade_cooldown_seconds: u64,
81
82 #[serde(default = "default_require_htf")]
84 pub require_htf_alignment: bool,
85
86 #[serde(default = "default_htf_timeframe")]
88 pub htf_timeframe_minutes: u32,
89
90 #[serde(default = "default_max_position")]
92 pub max_position_size_usd: f64,
93
94 #[serde(default = "default_enabled")]
96 pub enabled: bool,
97
98 #[serde(default = "default_min_hold")]
100 pub min_hold_minutes: u32,
101
102 #[serde(default = "default_prefer_trailing")]
104 pub prefer_trailing_stop_exit: bool,
105
106 #[serde(default)]
108 pub optimized_at: String,
109
110 #[serde(default)]
112 pub optimization_score: f64,
113
114 #[serde(default)]
116 pub backtest_result: BacktestResultSummary,
117}
118
119#[derive(Debug, Clone, Default, Serialize, Deserialize)]
121pub struct BacktestResultSummary {
122 pub total_trades: u32,
123 pub winning_trades: u32,
124 pub losing_trades: u32,
125 pub total_pnl_pct: f64,
126 pub max_drawdown_pct: f64,
127 pub win_rate: f64,
128 pub profit_factor: f64,
129 pub sharpe_ratio: f64,
130 pub trades_per_day: f64,
131}
132
133fn default_ema_fast() -> u32 {
135 9
136}
137fn default_ema_slow() -> u32 {
138 28
139}
140fn default_atr_length() -> u32 {
141 14
142}
143pub const DEFAULT_ATR_MULTIPLIER: f64 = 2.0;
147
148fn default_atr_multiplier() -> f64 {
149 DEFAULT_ATR_MULTIPLIER
150}
151fn default_min_trailing_stop() -> f64 {
152 0.5
153}
154fn default_min_ema_spread() -> f64 {
155 0.20
156}
157fn default_min_profit() -> f64 {
158 0.40
159}
160fn default_take_profit() -> f64 {
161 5.0
162}
163fn default_stop_loss() -> f64 {
164 2.0
165}
166fn default_cooldown() -> u64 {
167 1800
168}
169fn default_require_htf() -> bool {
170 true
171}
172fn default_htf_timeframe() -> u32 {
173 15
174}
175fn default_max_position() -> f64 {
176 20.0
177}
178fn default_enabled() -> bool {
179 true
180}
181fn default_min_hold() -> u32 {
182 15
183}
184fn default_prefer_trailing() -> bool {
185 true
186}
187
188impl Default for OptimizedParams {
189 fn default() -> Self {
190 Self {
191 asset: String::new(),
192 ema_fast_period: default_ema_fast(),
193 ema_slow_period: default_ema_slow(),
194 atr_length: default_atr_length(),
195 atr_multiplier: default_atr_multiplier(),
196 min_trailing_stop_pct: default_min_trailing_stop(),
197 min_ema_spread_pct: default_min_ema_spread(),
198 min_profit_pct: default_min_profit(),
199 take_profit_pct: default_take_profit(),
200 stop_loss_pct: default_stop_loss(),
201 trade_cooldown_seconds: default_cooldown(),
202 require_htf_alignment: default_require_htf(),
203 htf_timeframe_minutes: default_htf_timeframe(),
204 max_position_size_usd: default_max_position(),
205 enabled: default_enabled(),
206 min_hold_minutes: default_min_hold(),
207 prefer_trailing_stop_exit: default_prefer_trailing(),
208 optimized_at: String::new(),
209 optimization_score: 0.0,
210 backtest_result: BacktestResultSummary::default(),
211 }
212 }
213}
214
215impl OptimizedParams {
216 pub fn new(asset: impl Into<String>) -> Self {
218 Self {
219 asset: asset.into(),
220 ..Default::default()
221 }
222 }
223
224 pub fn validate(&self) -> Result<(), ParamError> {
226 if self.ema_fast_period >= self.ema_slow_period {
227 return Err(ParamError::InvalidConfig(
228 "EMA fast period must be less than slow period".into(),
229 ));
230 }
231
232 if self.ema_fast_period < 3 || self.ema_slow_period < 5 {
233 return Err(ParamError::InvalidConfig("EMA periods too small".into()));
234 }
235
236 if self.atr_multiplier <= 0.0 {
237 return Err(ParamError::InvalidConfig(
238 "ATR multiplier must be positive".into(),
239 ));
240 }
241
242 if self.min_ema_spread_pct < 0.0 || self.min_ema_spread_pct > 10.0 {
243 return Err(ParamError::InvalidConfig(
244 "Min EMA spread must be between 0 and 10%".into(),
245 ));
246 }
247
248 if self.stop_loss_pct <= 0.0 || self.stop_loss_pct > 100.0 {
249 return Err(ParamError::InvalidConfig(
250 "Stop loss must be a positive percentage (e.g. 2.0 = 2%)".into(),
251 ));
252 }
253
254 if self.take_profit_pct <= 0.0 || self.take_profit_pct > 100.0 {
255 return Err(ParamError::InvalidConfig(
256 "Take profit must be a positive percentage (e.g. 5.0 = 5%)".into(),
257 ));
258 }
259
260 Ok(())
261 }
262
263 pub fn is_trading_enabled(&self) -> bool {
265 self.enabled && self.max_position_size_usd > 0.0
266 }
267}
268
269#[derive(Debug, Clone, Serialize, Deserialize)]
271#[serde(tag = "type", rename_all = "snake_case")]
272pub enum ParamNotification {
273 ParamUpdate {
275 asset: String,
276 timestamp: String,
277 params: OptimizedParams,
278 },
279 OptimizationStarted {
281 timestamp: String,
282 assets: Vec<String>,
283 },
284 OptimizationComplete {
286 timestamp: String,
287 successful: u32,
288 failed: u32,
289 assets: Vec<String>,
290 },
291 OptimizationFailed {
293 timestamp: String,
294 asset: String,
295 error: String,
296 },
297}
298
299#[derive(Debug, thiserror::Error)]
301pub enum ParamError {
302 #[error("Redis error: {0}")]
303 Redis(String),
304
305 #[error("Serialization error: {0}")]
306 Serialization(String),
307
308 #[error("Invalid configuration: {0}")]
309 InvalidConfig(String),
310
311 #[error("Asset not found: {0}")]
312 AssetNotFound(String),
313
314 #[error("Connection error: {0}")]
315 Connection(String),
316}
317
318pub struct ParamManager {
320 params: Arc<RwLock<HashMap<String, OptimizedParams>>>,
322
323 instance_id: String,
325
326 update_tx: broadcast::Sender<ParamNotification>,
328}
329
330impl ParamManager {
331 pub fn new(instance_id: impl Into<String>) -> Self {
333 let (update_tx, _) = broadcast::channel(64);
334 Self {
335 params: Arc::new(RwLock::new(HashMap::new())),
336 instance_id: instance_id.into(),
337 update_tx,
338 }
339 }
340
341 pub fn redis_key(&self, suffix: &str) -> String {
343 format!("fks:{}:{}", self.instance_id, suffix)
344 }
345
346 pub fn updates_channel(&self) -> String {
348 self.redis_key("param_updates")
349 }
350
351 pub fn params_hash_key(&self) -> String {
353 self.redis_key("optimized_params")
354 }
355
356 pub fn subscribe(&self) -> broadcast::Receiver<ParamNotification> {
358 self.update_tx.subscribe()
359 }
360
361 pub async fn get(&self, asset: &str) -> Option<OptimizedParams> {
363 let params = self.params.read().await;
364 params.get(asset).cloned()
365 }
366
367 pub async fn get_all(&self) -> HashMap<String, OptimizedParams> {
369 let params = self.params.read().await;
370 params.clone()
371 }
372
373 pub async fn update(&self, params: OptimizedParams) {
375 let asset = params.asset.clone();
376 let notification = ParamNotification::ParamUpdate {
377 asset: asset.clone(),
378 timestamp: chrono::Utc::now().to_rfc3339(),
379 params: params.clone(),
380 };
381
382 {
384 let mut cache = self.params.write().await;
385 cache.insert(asset.clone(), params);
386 }
387
388 if let Err(e) = self.update_tx.send(notification) {
390 debug!("No subscribers for param update: {}", e);
391 }
392
393 info!("Updated optimized params for {}", asset);
394 }
395
396 #[cfg(feature = "redis")]
398 pub async fn load_from_redis(&self, client: &redis::Client) -> Result<usize, ParamError> {
399 use redis::AsyncCommands;
400
401 let mut conn = client
402 .get_multiplexed_async_connection()
403 .await
404 .map_err(|e| ParamError::Redis(e.to_string()))?;
405
406 let key = self.params_hash_key();
407 let all_params: HashMap<String, String> = conn
408 .hgetall(&key)
409 .await
410 .map_err(|e| ParamError::Redis(e.to_string()))?;
411
412 let mut count = 0;
413 let mut cache = self.params.write().await;
414
415 for (asset, json) in all_params {
416 if asset.starts_with('_') {
418 continue;
419 }
420
421 match serde_json::from_str::<OptimizedParams>(&json) {
422 Ok(params) => {
423 cache.insert(asset, params);
424 count += 1;
425 }
426 Err(e) => {
427 warn!("Failed to parse params for {}: {}", asset, e);
428 }
429 }
430 }
431
432 info!("Loaded {} optimized params from Redis", count);
433 Ok(count)
434 }
435
436 pub async fn process_notification(&self, json: &str) -> Result<(), ParamError> {
438 let notification: ParamNotification =
439 serde_json::from_str(json).map_err(|e| ParamError::Serialization(e.to_string()))?;
440
441 match ¬ification {
442 ParamNotification::ParamUpdate { asset, params, .. } => {
443 let mut cache = self.params.write().await;
444 cache.insert(asset.clone(), params.clone());
445 info!(
446 "Applied optimized params for {} (score: {:.2})",
447 asset, params.optimization_score
448 );
449 }
450 ParamNotification::OptimizationStarted { assets, .. } => {
451 info!("Optimization started for: {:?}", assets);
452 }
453 ParamNotification::OptimizationComplete {
454 successful, failed, ..
455 } => {
456 info!(
457 "Optimization complete: {} successful, {} failed",
458 successful, failed
459 );
460 }
461 ParamNotification::OptimizationFailed { asset, error, .. } => {
462 warn!("Optimization failed for {}: {}", asset, error);
463 }
464 }
465
466 if let Err(e) = self.update_tx.send(notification) {
468 debug!("No subscribers for notification: {}", e);
469 }
470
471 Ok(())
472 }
473}
474
475#[cfg(feature = "redis")]
477pub struct ParamUpdateListener {
478 manager: Arc<ParamManager>,
479 pubsub: Option<redis::aio::PubSub>,
480}
481
482#[cfg(feature = "redis")]
483impl ParamUpdateListener {
484 pub async fn new(redis_url: &str, instance_id: impl Into<String>) -> Result<Self, ParamError> {
486 let manager = Arc::new(ParamManager::new(instance_id));
487
488 let client =
489 redis::Client::open(redis_url).map_err(|e| ParamError::Connection(e.to_string()))?;
490
491 let mut pubsub = client
492 .get_async_pubsub()
493 .await
494 .map_err(|e| ParamError::Connection(e.to_string()))?;
495
496 let channel = manager.updates_channel();
497 pubsub
498 .subscribe(&channel)
499 .await
500 .map_err(|e| ParamError::Redis(e.to_string()))?;
501
502 info!("Subscribed to param updates on channel: {}", channel);
503
504 Ok(Self {
505 manager,
506 pubsub: Some(pubsub),
507 })
508 }
509
510 pub fn manager(&self) -> Arc<ParamManager> {
512 Arc::clone(&self.manager)
513 }
514
515 pub async fn next_update(&mut self) -> Option<ParamNotification> {
517 use futures_util::StreamExt;
518
519 let pubsub = self.pubsub.as_mut()?;
520
521 while let Some(msg) = pubsub.on_message().next().await {
522 let payload: String = match msg.get_payload() {
523 Ok(p) => p,
524 Err(e) => {
525 error!("Failed to get message payload: {}", e);
526 continue;
527 }
528 };
529
530 match self.manager.process_notification(&payload).await {
531 Ok(()) => {
532 if let Ok(notification) = serde_json::from_str(&payload) {
534 return Some(notification);
535 }
536 }
537 Err(e) => {
538 error!("Failed to process notification: {}", e);
539 }
540 }
541 }
542
543 None
544 }
545
546 pub async fn run(&mut self) {
548 info!("Starting param update listener loop");
549 while let Some(notification) = self.next_update().await {
550 debug!("Processed notification: {:?}", notification);
551 }
552 warn!("Param update listener loop ended");
553 }
554}
555
556#[cfg(test)]
557mod tests {
558 use super::*;
559
560 #[test]
561 fn test_default_params() {
562 let params = OptimizedParams::default();
563 assert_eq!(params.ema_fast_period, 9);
564 assert_eq!(params.ema_slow_period, 28);
565 assert!(params.enabled);
566 }
567
568 #[test]
569 fn test_params_validation() {
570 let mut params = OptimizedParams::new("BTC");
571 assert!(params.validate().is_ok());
572
573 params.ema_fast_period = 30;
575 params.ema_slow_period = 20;
576 assert!(params.validate().is_err());
577 }
578
579 #[test]
580 fn test_notification_serialization() {
581 let notification = ParamNotification::ParamUpdate {
582 asset: "BTC".to_string(),
583 timestamp: "2025-01-15T12:00:00Z".to_string(),
584 params: OptimizedParams::new("BTC"),
585 };
586
587 let json = serde_json::to_string(¬ification).unwrap();
588 assert!(json.contains("param_update"));
589 assert!(json.contains("BTC"));
590
591 let parsed: ParamNotification = serde_json::from_str(&json).unwrap();
592 match parsed {
593 ParamNotification::ParamUpdate { asset, .. } => {
594 assert_eq!(asset, "BTC");
595 }
596 _ => panic!("Wrong notification type"),
597 }
598 }
599
600 #[tokio::test]
601 async fn test_param_manager() {
602 let manager = ParamManager::new("test");
603
604 assert!(manager.get("BTC").await.is_none());
606
607 let params = OptimizedParams::new("BTC");
609 manager.update(params.clone()).await;
610
611 let retrieved = manager.get("BTC").await.unwrap();
613 assert_eq!(retrieved.asset, "BTC");
614 assert_eq!(retrieved.ema_fast_period, params.ema_fast_period);
615 }
616
617 #[test]
618 fn test_redis_keys() {
619 let manager = ParamManager::new("personal");
620 assert_eq!(manager.params_hash_key(), "fks:personal:optimized_params");
621 assert_eq!(manager.updates_channel(), "fks:personal:param_updates");
622 }
623
624 #[tokio::test]
625 async fn process_notification_applies_param_update_to_cache() {
626 let manager = ParamManager::new("test");
631 let mut params = OptimizedParams::new("BTC");
632 params.take_profit_pct = 7.5;
633 let payload = serde_json::to_string(&ParamNotification::ParamUpdate {
634 asset: "BTC".into(),
635 timestamp: "2026-05-25T00:00:00Z".into(),
636 params: params.clone(),
637 })
638 .unwrap();
639 manager.process_notification(&payload).await.unwrap();
640 let cached = manager.get("BTC").await.unwrap();
641 assert!((cached.take_profit_pct - 7.5).abs() < 1e-9);
642 }
643
644 #[tokio::test]
645 async fn process_notification_rejects_malformed_json() {
646 let manager = ParamManager::new("test");
647 let err = manager.process_notification("{ not valid json").await;
648 assert!(matches!(err, Err(ParamError::Serialization(_))));
649 }
650
651 #[test]
652 fn test_trading_enabled() {
653 let mut params = OptimizedParams::new("BTC");
654 assert!(params.is_trading_enabled());
655
656 params.enabled = false;
657 assert!(!params.is_trading_enabled());
658
659 params.enabled = true;
660 params.max_position_size_usd = 0.0;
661 assert!(!params.is_trading_enabled());
662 }
663}