jflow_core/state.rs
1//! Shared application state for all JANUS modules
2
3use crate::{Config, MarketDataBus, SignalBus, metrics::metrics};
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::time::Instant;
6use tokio::sync::{RwLock, watch};
7
8// ---------------------------------------------------------------------------
9// LogLevelController — trait-object interface for runtime log level changes
10// ---------------------------------------------------------------------------
11
12/// A type-erased interface for changing the tracing log filter at runtime.
13///
14/// This is stored in [`JanusState`] so the API module can expose a
15/// `POST /api/log-level` endpoint without depending on `tracing_subscriber`
16/// internals. The concrete implementation lives in [`crate::logging`] and
17/// is wired up in `main()` after `init_logging()` returns.
18///
19/// # Thread Safety
20///
21/// Implementations must be `Send + Sync` because `JanusState` is shared
22/// across Tokio tasks.
23pub trait LogLevelController: Send + Sync {
24 /// Change the operational (stdout) log filter at runtime.
25 ///
26 /// `filter_str` uses the same syntax as `RUST_LOG`, e.g.:
27 /// - `"debug"`
28 /// - `"info,janus=trace"`
29 /// - `"warn,janus::supervisor=debug,hyper=info"`
30 ///
31 /// Returns `Ok(())` on success or a human-readable error message.
32 fn set_log_level(&self, filter_str: &str) -> Result<(), String>;
33
34 /// Returns the current filter string, if available.
35 fn current_filter(&self) -> Option<String>;
36}
37
38// ---------------------------------------------------------------------------
39// AffinityRecorder — trait-object interface for feeding closed-trade outcomes
40// back into the strategy-affinity tracker.
41// ---------------------------------------------------------------------------
42
43/// A type-erased interface for recording a realized trade outcome into the
44/// strategy-affinity tracker.
45///
46/// Stored in [`JanusState`] so the API module's position-close handler can
47/// feed outcomes back into affinity learning in real time **without**
48/// `janus-core` (or `janus-api`) depending on `janus-strategies` — the
49/// concrete tracker lives in the forward service's `TradingPipeline`, which
50/// installs an adapter via [`set_affinity_recorder`](JanusState::set_affinity_recorder).
51/// Mirrors the [`LogLevelController`] pattern.
52///
53/// # Thread Safety
54///
55/// Implementations must be `Send + Sync` because `JanusState` is shared
56/// across Tokio tasks. The method is `async` + `&self` so the adapter can
57/// acquire the concrete tracker's own (async) lock internally.
58#[async_trait::async_trait]
59pub trait AffinityRecorder: Send + Sync {
60 /// Record a closed trade for `(strategy, asset)`.
61 ///
62 /// - `pnl`: realized P&L in quote currency (signed).
63 /// - `is_winner`: whether the trade closed in profit.
64 /// - `rr_ratio`: realized risk-reward ratio, when known.
65 async fn record_trade(
66 &self,
67 strategy: &str,
68 asset: &str,
69 pnl: f64,
70 is_winner: bool,
71 rr_ratio: Option<f64>,
72 );
73}
74
75/// Service lifecycle state — controls whether processing modules are active.
76///
77/// On startup JANUS enters `Standby`: the API is live but Forward, Backward,
78/// CNS and Data modules wait for an explicit start command.
79#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
80#[serde(rename_all = "snake_case")]
81pub enum ServiceState {
82 /// Pre-flight passed, API up, processing modules waiting for start command
83 Standby,
84 /// All enabled processing modules are running
85 Running,
86 /// Services were explicitly stopped via API
87 Stopped,
88}
89
90impl std::fmt::Display for ServiceState {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 match self {
93 ServiceState::Standby => write!(f, "standby"),
94 ServiceState::Running => write!(f, "running"),
95 ServiceState::Stopped => write!(f, "stopped"),
96 }
97 }
98}
99
100/// Module health status
101#[derive(Debug, Clone, serde::Serialize)]
102pub struct ModuleHealth {
103 pub name: String,
104 pub healthy: bool,
105 #[serde(skip)]
106 pub last_check: std::time::Instant,
107 pub message: Option<String>,
108}
109
110/// Shared application state accessible by all modules
111pub struct JanusState {
112 /// Configuration
113 pub config: Config,
114
115 /// Signal broadcast bus for inter-module communication
116 pub signal_bus: SignalBus,
117
118 /// Market data broadcast bus for live data streaming (Data → Forward)
119 ///
120 /// The Data module publishes normalised [`MarketDataEvent`](crate::MarketDataEvent)s
121 /// here when live market data ingestion is active. The Forward module
122 /// subscribes to consume them for indicator calculation and
123 /// strategy-driven signal generation.
124 pub market_data_bus: MarketDataBus,
125
126 /// Service start time
127 start_time: Instant,
128
129 /// Shutdown flag
130 shutdown_requested: AtomicBool,
131
132 /// Module health statuses
133 module_health: RwLock<Vec<ModuleHealth>>,
134
135 /// Total signals generated counter
136 signals_generated: AtomicU64,
137
138 /// Total signals persisted counter
139 signals_persisted: AtomicU64,
140
141 /// Redis connection (lazy initialized)
142 redis_client: RwLock<Option<redis::Client>>,
143
144 /// Service lifecycle watch channel — sender side.
145 /// Modules subscribe via `wait_for_services_start()`.
146 service_state_tx: watch::Sender<ServiceState>,
147
148 /// Service lifecycle watch channel — receiver template for cloning.
149 service_state_rx: watch::Receiver<ServiceState>,
150
151 /// Optional runtime log-level controller.
152 ///
153 /// Set via [`set_log_level_controller`] after `init_logging()` in `main()`.
154 /// The API module reads this to expose `POST /api/log-level`.
155 log_level_controller: RwLock<Option<Box<dyn LogLevelController>>>,
156
157 /// Latest market-regime label, as a free-form string. Updated by the
158 /// brain-pipeline / regime-detector producers; consumed by the JFLOW-A
159 /// session metrics reporter so the snapshot it pushes to JanusAI
160 /// carries `regime` instead of `None`. Defaults to `None` until a
161 /// producer calls [`set_current_regime`].
162 current_regime: RwLock<Option<String>>,
163
164 /// Latest amygdala threat / fear level in `0.0..=1.0` (higher = more
165 /// fear). Updated by the brain pipeline; consumed by position
166 /// guidance to escalate under stress. `None` until a producer calls
167 /// [`set_current_threat`].
168 current_threat: RwLock<Option<f64>>,
169
170 /// Optional strategy-affinity recorder.
171 ///
172 /// Installed by the forward service via [`set_affinity_recorder`] so the
173 /// API's position-close handler can feed realized outcomes into affinity
174 /// learning in real time. `None` until installed (e.g. API-only
175 /// deployments), in which case outcomes are persisted but not recorded
176 /// live. See [`AffinityRecorder`].
177 affinity_recorder: RwLock<Option<Box<dyn AffinityRecorder>>>,
178}
179
180impl JanusState {
181 /// Create new application state.
182 ///
183 /// Services start in [`ServiceState::Standby`] — the API module comes up
184 /// immediately but processing modules (Forward, Backward, CNS, Data) will
185 /// block on [`wait_for_services_start`] until an explicit start command is
186 /// issued through the API or web interface.
187 pub async fn new(config: Config) -> crate::Result<Self> {
188 let signal_bus = SignalBus::new(1000);
189 let market_data_bus = MarketDataBus::new(5000);
190 let (service_state_tx, service_state_rx) = watch::channel(ServiceState::Standby);
191
192 Ok(Self {
193 config,
194 signal_bus,
195 market_data_bus,
196 start_time: Instant::now(),
197 shutdown_requested: AtomicBool::new(false),
198 module_health: RwLock::new(Vec::new()),
199 signals_generated: AtomicU64::new(0),
200 signals_persisted: AtomicU64::new(0),
201 redis_client: RwLock::new(None),
202 service_state_tx,
203 service_state_rx,
204 log_level_controller: RwLock::new(None),
205 current_regime: RwLock::new(None),
206 current_threat: RwLock::new(None),
207 affinity_recorder: RwLock::new(None),
208 })
209 }
210
211 /// Get the most recently published market-regime label.
212 ///
213 /// Returns the value most recently written via [`set_current_regime`],
214 /// or `None` if no producer has reported a regime yet. Used by the
215 /// JFLOW-A session metrics reporter.
216 pub async fn current_regime(&self) -> Option<String> {
217 self.current_regime.read().await.clone()
218 }
219
220 /// Publish the latest market-regime label.
221 ///
222 /// Intended to be called by the brain pipeline / regime detector as
223 /// new decisions land. Overwrites whatever was there before — the
224 /// field is a snapshot, not a history.
225 pub async fn set_current_regime(&self, regime: impl Into<String>) {
226 let mut guard = self.current_regime.write().await;
227 *guard = Some(regime.into());
228 }
229
230 /// Get the most recently published amygdala threat / fear level.
231 ///
232 /// A scalar in `0.0..=1.0` (higher = more fear), or `None` if no
233 /// producer has reported one yet. Consumed by position guidance to
234 /// escalate under stress. Kept as a plain `f64` so `janus-core` stays
235 /// free of any dependency on the neuromorphic crate that produces it.
236 pub async fn current_threat(&self) -> Option<f64> {
237 *self.current_threat.read().await
238 }
239
240 /// Publish the latest amygdala threat / fear level.
241 ///
242 /// Intended to be called by the brain pipeline as new amygdala
243 /// assessments land. Snapshot semantics, like [`set_current_regime`].
244 pub async fn set_current_threat(&self, fear: f64) {
245 let mut guard = self.current_threat.write().await;
246 *guard = Some(fear);
247 }
248
249 // ── Affinity recording ────────────────────────────────────────────
250
251 /// Install a strategy-affinity recorder.
252 ///
253 /// Called once by the forward service after its `TradingPipeline` is
254 /// constructed, so the API's position-close handler can feed realized
255 /// outcomes into affinity learning. Replaces any previously installed
256 /// recorder.
257 pub async fn set_affinity_recorder(&self, recorder: Box<dyn AffinityRecorder>) {
258 let mut guard = self.affinity_recorder.write().await;
259 *guard = Some(recorder);
260 }
261
262 /// Whether an affinity recorder is installed (for diagnostics / logging).
263 pub async fn has_affinity_recorder(&self) -> bool {
264 self.affinity_recorder.read().await.is_some()
265 }
266
267 /// Record a closed trade into the affinity tracker, if a recorder is
268 /// installed. No-op otherwise. Returns whether the outcome was recorded.
269 pub async fn record_affinity_outcome(
270 &self,
271 strategy: &str,
272 asset: &str,
273 pnl: f64,
274 is_winner: bool,
275 rr_ratio: Option<f64>,
276 ) -> bool {
277 let guard = self.affinity_recorder.read().await;
278 match guard.as_ref() {
279 Some(recorder) => {
280 recorder
281 .record_trade(strategy, asset, pnl, is_winner, rr_ratio)
282 .await;
283 true
284 }
285 None => false,
286 }
287 }
288
289 // ── Log level control ─────────────────────────────────────────────
290
291 /// Install a runtime log-level controller.
292 ///
293 /// Called once from `main()` after `init_logging()` returns.
294 /// The controller is then available to the API via
295 /// [`set_log_level`](Self::set_log_level) and
296 /// [`current_log_filter`](Self::current_log_filter).
297 pub async fn set_log_level_controller(&self, controller: Box<dyn LogLevelController>) {
298 let mut guard = self.log_level_controller.write().await;
299 *guard = Some(controller);
300 tracing::debug!("log-level controller installed in JanusState");
301 }
302
303 /// Change the runtime log filter.
304 ///
305 /// Delegates to the installed [`LogLevelController`]. Returns an error
306 /// if no controller has been installed yet.
307 pub async fn set_log_level(&self, filter_str: &str) -> Result<(), String> {
308 let guard = self.log_level_controller.read().await;
309 match guard.as_ref() {
310 Some(ctrl) => ctrl.set_log_level(filter_str),
311 None => Err("no log-level controller installed".to_string()),
312 }
313 }
314
315 /// Returns the current log filter string, if a controller is installed.
316 pub async fn current_log_filter(&self) -> Option<String> {
317 let guard = self.log_level_controller.read().await;
318 guard.as_ref().and_then(|ctrl| ctrl.current_filter())
319 }
320
321 // ── Uptime & shutdown ─────────────────────────────────────────────
322
323 /// Get uptime in seconds
324 pub fn uptime_seconds(&self) -> u64 {
325 self.start_time.elapsed().as_secs()
326 }
327
328 /// Check if shutdown has been requested
329 pub fn is_shutdown_requested(&self) -> bool {
330 self.shutdown_requested.load(Ordering::SeqCst)
331 }
332
333 /// Request shutdown
334 pub fn request_shutdown(&self) {
335 self.shutdown_requested.store(true, Ordering::SeqCst);
336 }
337
338 /// Perform graceful shutdown
339 pub async fn shutdown(&self) -> crate::Result<()> {
340 tracing::info!("Initiating graceful shutdown...");
341 self.request_shutdown();
342
343 // Also move services to Stopped so any waiting modules unblock
344 let _ = self.service_state_tx.send(ServiceState::Stopped);
345
346 // Close Redis connection if open and update metric
347 let mut redis = self.redis_client.write().await;
348 if redis.is_some() {
349 metrics().redis_connected.set(0.0);
350 }
351 *redis = None;
352
353 tracing::info!("Shutdown complete");
354 Ok(())
355 }
356
357 // ── Service lifecycle management ──────────────────────────────────
358
359 /// Transition processing services to [`ServiceState::Running`].
360 ///
361 /// All modules blocked on [`wait_for_services_start`] will proceed.
362 /// Returns `true` if the state actually changed.
363 pub fn start_services(&self) -> bool {
364 self.service_state_tx.send_if_modified(|current| {
365 if *current == ServiceState::Running {
366 false
367 } else {
368 tracing::info!("Service state: {} → running", current);
369 *current = ServiceState::Running;
370 true
371 }
372 })
373 }
374
375 /// Transition processing services to [`ServiceState::Stopped`].
376 ///
377 /// Modules should check [`are_services_active`] in their hot loops and
378 /// wind down gracefully when it returns `false`.
379 /// Returns `true` if the state actually changed.
380 pub fn stop_services(&self) -> bool {
381 self.service_state_tx.send_if_modified(|current| {
382 if *current == ServiceState::Stopped {
383 false
384 } else {
385 tracing::info!("Service state: {} → stopped", current);
386 *current = ServiceState::Stopped;
387 true
388 }
389 })
390 }
391
392 /// Returns the current [`ServiceState`].
393 pub fn service_state(&self) -> ServiceState {
394 *self.service_state_tx.borrow()
395 }
396
397 /// Returns `true` when processing modules should be actively running.
398 pub fn are_services_active(&self) -> bool {
399 *self.service_state_tx.borrow() == ServiceState::Running
400 }
401
402 /// Block until services are started (state becomes [`ServiceState::Running`])
403 /// **or** a shutdown is requested.
404 ///
405 /// Returns `true` if services were started, `false` if a shutdown was
406 /// requested while still waiting.
407 pub async fn wait_for_services_start(&self) -> bool {
408 let mut rx = self.service_state_rx.clone();
409
410 // Fast path — already running
411 if *rx.borrow_and_update() == ServiceState::Running {
412 return true;
413 }
414
415 loop {
416 tokio::select! {
417 result = rx.changed() => {
418 match result {
419 Ok(()) => {
420 if *rx.borrow() == ServiceState::Running {
421 return true;
422 }
423 // State changed to something other than Running
424 // (e.g. Stopped) — keep waiting unless shutdown
425 if self.is_shutdown_requested() {
426 return false;
427 }
428 }
429 Err(_) => {
430 // Sender dropped — treat as shutdown
431 return false;
432 }
433 }
434 }
435 _ = tokio::time::sleep(tokio::time::Duration::from_millis(250)) => {
436 if self.is_shutdown_requested() {
437 return false;
438 }
439 }
440 }
441 }
442 }
443
444 /// Subscribe to service-state changes.
445 ///
446 /// Useful for modules that need to react to stop commands mid-loop.
447 pub fn subscribe_service_state(&self) -> watch::Receiver<ServiceState> {
448 self.service_state_rx.clone()
449 }
450
451 /// Register a module's health status
452 pub async fn register_module_health(
453 &self,
454 name: impl Into<String>,
455 healthy: bool,
456 message: Option<String>,
457 ) {
458 let mut health = self.module_health.write().await;
459 let name = name.into();
460
461 // Update existing or add new
462 if let Some(existing) = health.iter_mut().find(|h| h.name == name) {
463 existing.healthy = healthy;
464 existing.last_check = Instant::now();
465 existing.message = message;
466 } else {
467 health.push(ModuleHealth {
468 name,
469 healthy,
470 last_check: Instant::now(),
471 message,
472 });
473 }
474 }
475
476 /// Get all module health statuses
477 pub async fn get_module_health(&self) -> Vec<ModuleHealth> {
478 self.module_health.read().await.clone()
479 }
480
481 /// Check if all modules are healthy
482 pub async fn all_modules_healthy(&self) -> bool {
483 let health = self.module_health.read().await;
484 health.iter().all(|h| h.healthy)
485 }
486
487 /// Increment signals generated counter
488 pub fn increment_signals_generated(&self) {
489 self.signals_generated.fetch_add(1, Ordering::SeqCst);
490 }
491
492 /// Get signals generated count
493 pub fn signals_generated(&self) -> u64 {
494 self.signals_generated.load(Ordering::SeqCst)
495 }
496
497 /// Increment signals persisted counter
498 pub fn increment_signals_persisted(&self) {
499 self.signals_persisted.fetch_add(1, Ordering::SeqCst);
500 }
501
502 /// Get signals persisted count
503 pub fn signals_persisted(&self) -> u64 {
504 self.signals_persisted.load(Ordering::SeqCst)
505 }
506
507 /// Get or create the Redis client handle.
508 ///
509 /// This is intentionally cheap — `redis::Client::open` only parses
510 /// the URL; it does **not** open a TCP connection. Callers obtain
511 /// an actual connection via `client.get_multiplexed_async_connection()`.
512 pub async fn redis_client(&self) -> crate::Result<redis::Client> {
513 let mut client = self.redis_client.write().await;
514
515 if client.is_none() {
516 match redis::Client::open(self.config.redis.url.as_str()) {
517 Ok(new_client) => {
518 *client = Some(new_client);
519 }
520 Err(e) => {
521 metrics().redis_connected.set(0.0);
522 return Err(e.into());
523 }
524 }
525 }
526
527 Ok(client.as_ref().unwrap().clone())
528 }
529
530 /// Probe Redis connectivity and update the `janus_redis_connected`
531 /// Prometheus gauge.
532 ///
533 /// Call once at startup (from `main()`) and optionally from periodic
534 /// health-check loops. The method never returns an error — it logs
535 /// warnings and sets the gauge to `0` on failure so the process can
536 /// continue booting even if Redis is temporarily unavailable.
537 pub async fn probe_redis(&self) {
538 let client = match self.redis_client().await {
539 Ok(c) => c,
540 Err(e) => {
541 tracing::warn!("Redis probe: failed to create client — {e}");
542 metrics().redis_connected.set(0.0);
543 return;
544 }
545 };
546
547 match client.get_multiplexed_async_connection().await {
548 Ok(mut conn) => {
549 // Actual PING to confirm the server responds.
550 let pong: Result<String, _> = redis::cmd("PING").query_async(&mut conn).await;
551 match pong {
552 Ok(_) => {
553 metrics().redis_connected.set(1.0);
554 tracing::info!("Redis probe: connected ✓");
555 }
556 Err(e) => {
557 metrics().redis_connected.set(0.0);
558 tracing::warn!("Redis probe: PING failed — {e}");
559 }
560 }
561 }
562 Err(e) => {
563 metrics().redis_connected.set(0.0);
564 tracing::warn!("Redis probe: connection failed — {e}");
565 }
566 }
567 }
568
569 /// Get comprehensive health status
570 pub async fn health_status(&self) -> HealthStatus {
571 let module_health = self.get_module_health().await;
572 let all_healthy = module_health.iter().all(|h| h.healthy);
573
574 HealthStatus {
575 status: if all_healthy { "healthy" } else { "degraded" }.to_string(),
576 uptime_seconds: self.uptime_seconds(),
577 signals_generated: self.signals_generated(),
578 signals_persisted: self.signals_persisted(),
579 modules: module_health
580 .iter()
581 .map(|h| ModuleHealthSummary {
582 name: h.name.clone(),
583 healthy: h.healthy,
584 message: h.message.clone(),
585 })
586 .collect(),
587 shutdown_requested: self.is_shutdown_requested(),
588 service_state: self.service_state(),
589 }
590 }
591}
592
593/// Health status response
594#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
595pub struct HealthStatus {
596 pub status: String,
597 pub uptime_seconds: u64,
598 pub signals_generated: u64,
599 pub signals_persisted: u64,
600 pub modules: Vec<ModuleHealthSummary>,
601 pub shutdown_requested: bool,
602 pub service_state: ServiceState,
603}
604
605/// Module health summary for API responses
606#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
607pub struct ModuleHealthSummary {
608 pub name: String,
609 pub healthy: bool,
610 pub message: Option<String>,
611}
612
613#[cfg(test)]
614mod tests {
615 use super::*;
616
617 #[tokio::test]
618 async fn test_state_creation() {
619 let config = Config::default();
620 let state = JanusState::new(config).await.unwrap();
621
622 assert!(!state.is_shutdown_requested());
623 assert_eq!(state.signals_generated(), 0);
624 assert_eq!(state.service_state(), ServiceState::Standby);
625 assert!(!state.are_services_active());
626 }
627
628 #[tokio::test]
629 async fn test_service_lifecycle() {
630 let config = Config::default();
631 let state = JanusState::new(config).await.unwrap();
632
633 // Starts in standby
634 assert_eq!(state.service_state(), ServiceState::Standby);
635 assert!(!state.are_services_active());
636
637 // Start services
638 assert!(state.start_services());
639 assert_eq!(state.service_state(), ServiceState::Running);
640 assert!(state.are_services_active());
641
642 // Idempotent
643 assert!(!state.start_services());
644
645 // Stop services
646 assert!(state.stop_services());
647 assert_eq!(state.service_state(), ServiceState::Stopped);
648 assert!(!state.are_services_active());
649
650 // Idempotent
651 assert!(!state.stop_services());
652 }
653
654 #[tokio::test]
655 async fn test_wait_for_services_start() {
656 let config = Config::default();
657 let state = std::sync::Arc::new(JanusState::new(config).await.unwrap());
658
659 let state2 = state.clone();
660 let handle = tokio::spawn(async move { state2.wait_for_services_start().await });
661
662 // Give the waiter a moment to park
663 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
664
665 // Start services — waiter should unblock
666 state.start_services();
667 let result = tokio::time::timeout(tokio::time::Duration::from_secs(2), handle)
668 .await
669 .expect("timed out")
670 .expect("task panicked");
671
672 assert!(result);
673 }
674
675 #[tokio::test]
676 async fn test_module_health_registration() {
677 let config = Config::default();
678 let state = JanusState::new(config).await.unwrap();
679
680 state.register_module_health("forward", true, None).await;
681 state
682 .register_module_health("backward", true, Some("running".to_string()))
683 .await;
684
685 let health = state.get_module_health().await;
686 assert_eq!(health.len(), 2);
687 assert!(state.all_modules_healthy().await);
688 }
689
690 #[tokio::test]
691 async fn test_signal_counters() {
692 let config = Config::default();
693 let state = JanusState::new(config).await.unwrap();
694
695 state.increment_signals_generated();
696 state.increment_signals_generated();
697 state.increment_signals_persisted();
698
699 assert_eq!(state.signals_generated(), 2);
700 assert_eq!(state.signals_persisted(), 1);
701 }
702
703 /// One captured `record_trade` call: (strategy, asset, pnl, is_winner, rr_ratio).
704 type RecordedCall = (String, String, f64, bool, Option<f64>);
705
706 /// Records each call so we can assert the recorder is reached.
707 struct CountingRecorder {
708 calls: std::sync::Arc<std::sync::Mutex<Vec<RecordedCall>>>,
709 }
710
711 #[async_trait::async_trait]
712 impl AffinityRecorder for CountingRecorder {
713 async fn record_trade(
714 &self,
715 strategy: &str,
716 asset: &str,
717 pnl: f64,
718 is_winner: bool,
719 rr_ratio: Option<f64>,
720 ) {
721 self.calls.lock().unwrap().push((
722 strategy.to_string(),
723 asset.to_string(),
724 pnl,
725 is_winner,
726 rr_ratio,
727 ));
728 }
729 }
730
731 #[tokio::test]
732 async fn record_affinity_outcome_no_op_without_recorder() {
733 let state = JanusState::new(Config::default()).await.unwrap();
734 assert!(!state.has_affinity_recorder().await);
735 // No recorder installed → returns false, doesn't panic.
736 let recorded = state
737 .record_affinity_outcome("ema_cross", "BTC", 100.0, true, Some(2.0))
738 .await;
739 assert!(!recorded);
740 }
741
742 #[tokio::test]
743 async fn record_affinity_outcome_reaches_installed_recorder() {
744 let state = JanusState::new(Config::default()).await.unwrap();
745 let calls = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
746 state
747 .set_affinity_recorder(Box::new(CountingRecorder {
748 calls: calls.clone(),
749 }))
750 .await;
751 assert!(state.has_affinity_recorder().await);
752
753 let recorded = state
754 .record_affinity_outcome("ema_cross", "BTC", -25.0, false, None)
755 .await;
756 assert!(recorded);
757
758 let calls = calls.lock().unwrap();
759 assert_eq!(calls.len(), 1);
760 assert_eq!(
761 calls[0],
762 ("ema_cross".to_string(), "BTC".to_string(), -25.0, false, None)
763 );
764 }
765}