Skip to main content

jflow_core/
state.rs

1//! Shared application state for all JANUS modules
2
3use crate::{Config, MarketDataBus, SignalBus, metrics::metrics};
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::time::Instant;
6use tokio::sync::{RwLock, watch};
7
8// ---------------------------------------------------------------------------
9// LogLevelController — trait-object interface for runtime log level changes
10// ---------------------------------------------------------------------------
11
12/// A type-erased interface for changing the tracing log filter at runtime.
13///
14/// This is stored in [`JanusState`] so the API module can expose a
15/// `POST /api/log-level` endpoint without depending on `tracing_subscriber`
16/// internals.  The concrete implementation lives in [`crate::logging`] and
17/// is wired up in `main()` after `init_logging()` returns.
18///
19/// # Thread Safety
20///
21/// Implementations must be `Send + Sync` because `JanusState` is shared
22/// across Tokio tasks.
23pub trait LogLevelController: Send + Sync {
24    /// Change the operational (stdout) log filter at runtime.
25    ///
26    /// `filter_str` uses the same syntax as `RUST_LOG`, e.g.:
27    /// - `"debug"`
28    /// - `"info,janus=trace"`
29    /// - `"warn,janus::supervisor=debug,hyper=info"`
30    ///
31    /// Returns `Ok(())` on success or a human-readable error message.
32    fn set_log_level(&self, filter_str: &str) -> Result<(), String>;
33
34    /// Returns the current filter string, if available.
35    fn current_filter(&self) -> Option<String>;
36}
37
38// ---------------------------------------------------------------------------
39// AffinityRecorder — trait-object interface for feeding closed-trade outcomes
40// back into the strategy-affinity tracker.
41// ---------------------------------------------------------------------------
42
43/// A type-erased interface for recording a realized trade outcome into the
44/// strategy-affinity tracker.
45///
46/// Stored in [`JanusState`] so the API module's position-close handler can
47/// feed outcomes back into affinity learning in real time **without**
48/// `janus-core` (or `janus-api`) depending on `janus-strategies` — the
49/// concrete tracker lives in the forward service's `TradingPipeline`, which
50/// installs an adapter via [`set_affinity_recorder`](JanusState::set_affinity_recorder).
51/// Mirrors the [`LogLevelController`] pattern.
52///
53/// # Thread Safety
54///
55/// Implementations must be `Send + Sync` because `JanusState` is shared
56/// across Tokio tasks. The method is `async` + `&self` so the adapter can
57/// acquire the concrete tracker's own (async) lock internally.
58#[async_trait::async_trait]
59pub trait AffinityRecorder: Send + Sync {
60    /// Record a closed trade for `(strategy, asset)`.
61    ///
62    /// - `pnl`: realized P&L in quote currency (signed).
63    /// - `is_winner`: whether the trade closed in profit.
64    /// - `rr_ratio`: realized risk-reward ratio, when known.
65    async fn record_trade(
66        &self,
67        strategy: &str,
68        asset: &str,
69        pnl: f64,
70        is_winner: bool,
71        rr_ratio: Option<f64>,
72    );
73}
74
75/// Service lifecycle state — controls whether processing modules are active.
76///
77/// On startup JANUS enters `Standby`: the API is live but Forward, Backward,
78/// CNS and Data modules wait for an explicit start command.
79#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
80#[serde(rename_all = "snake_case")]
81pub enum ServiceState {
82    /// Pre-flight passed, API up, processing modules waiting for start command
83    Standby,
84    /// All enabled processing modules are running
85    Running,
86    /// Services were explicitly stopped via API
87    Stopped,
88}
89
90impl std::fmt::Display for ServiceState {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        match self {
93            ServiceState::Standby => write!(f, "standby"),
94            ServiceState::Running => write!(f, "running"),
95            ServiceState::Stopped => write!(f, "stopped"),
96        }
97    }
98}
99
100/// Module health status
101#[derive(Debug, Clone, serde::Serialize)]
102pub struct ModuleHealth {
103    pub name: String,
104    pub healthy: bool,
105    #[serde(skip)]
106    pub last_check: std::time::Instant,
107    pub message: Option<String>,
108}
109
110/// Shared application state accessible by all modules
111pub struct JanusState {
112    /// Configuration
113    pub config: Config,
114
115    /// Signal broadcast bus for inter-module communication
116    pub signal_bus: SignalBus,
117
118    /// Market data broadcast bus for live data streaming (Data → Forward)
119    ///
120    /// The Data module publishes normalised [`MarketDataEvent`](crate::MarketDataEvent)s
121    /// here when live market data ingestion is active.  The Forward module
122    /// subscribes to consume them for indicator calculation and
123    /// strategy-driven signal generation.
124    pub market_data_bus: MarketDataBus,
125
126    /// Service start time
127    start_time: Instant,
128
129    /// Shutdown flag
130    shutdown_requested: AtomicBool,
131
132    /// Module health statuses
133    module_health: RwLock<Vec<ModuleHealth>>,
134
135    /// Total signals generated counter
136    signals_generated: AtomicU64,
137
138    /// Total signals persisted counter
139    signals_persisted: AtomicU64,
140
141    /// Redis connection (lazy initialized)
142    redis_client: RwLock<Option<redis::Client>>,
143
144    /// Service lifecycle watch channel — sender side.
145    /// Modules subscribe via `wait_for_services_start()`.
146    service_state_tx: watch::Sender<ServiceState>,
147
148    /// Service lifecycle watch channel — receiver template for cloning.
149    service_state_rx: watch::Receiver<ServiceState>,
150
151    /// Optional runtime log-level controller.
152    ///
153    /// Set via [`set_log_level_controller`] after `init_logging()` in `main()`.
154    /// The API module reads this to expose `POST /api/log-level`.
155    log_level_controller: RwLock<Option<Box<dyn LogLevelController>>>,
156
157    /// Latest market-regime label, as a free-form string. Updated by the
158    /// brain-pipeline / regime-detector producers; consumed by the JFLOW-A
159    /// session metrics reporter so the snapshot it pushes to JanusAI
160    /// carries `regime` instead of `None`. Defaults to `None` until a
161    /// producer calls [`set_current_regime`].
162    current_regime: RwLock<Option<String>>,
163
164    /// Latest amygdala threat / fear level in `0.0..=1.0` (higher = more
165    /// fear). Updated by the brain pipeline; consumed by position
166    /// guidance to escalate under stress. `None` until a producer calls
167    /// [`set_current_threat`].
168    current_threat: RwLock<Option<f64>>,
169
170    /// Optional strategy-affinity recorder.
171    ///
172    /// Installed by the forward service via [`set_affinity_recorder`] so the
173    /// API's position-close handler can feed realized outcomes into affinity
174    /// learning in real time. `None` until installed (e.g. API-only
175    /// deployments), in which case outcomes are persisted but not recorded
176    /// live. See [`AffinityRecorder`].
177    affinity_recorder: RwLock<Option<Box<dyn AffinityRecorder>>>,
178}
179
180impl JanusState {
181    /// Create new application state.
182    ///
183    /// Services start in [`ServiceState::Standby`] — the API module comes up
184    /// immediately but processing modules (Forward, Backward, CNS, Data) will
185    /// block on [`wait_for_services_start`] until an explicit start command is
186    /// issued through the API or web interface.
187    pub async fn new(config: Config) -> crate::Result<Self> {
188        let signal_bus = SignalBus::new(1000);
189        let market_data_bus = MarketDataBus::new(5000);
190        let (service_state_tx, service_state_rx) = watch::channel(ServiceState::Standby);
191
192        Ok(Self {
193            config,
194            signal_bus,
195            market_data_bus,
196            start_time: Instant::now(),
197            shutdown_requested: AtomicBool::new(false),
198            module_health: RwLock::new(Vec::new()),
199            signals_generated: AtomicU64::new(0),
200            signals_persisted: AtomicU64::new(0),
201            redis_client: RwLock::new(None),
202            service_state_tx,
203            service_state_rx,
204            log_level_controller: RwLock::new(None),
205            current_regime: RwLock::new(None),
206            current_threat: RwLock::new(None),
207            affinity_recorder: RwLock::new(None),
208        })
209    }
210
211    /// Get the most recently published market-regime label.
212    ///
213    /// Returns the value most recently written via [`set_current_regime`],
214    /// or `None` if no producer has reported a regime yet. Used by the
215    /// JFLOW-A session metrics reporter.
216    pub async fn current_regime(&self) -> Option<String> {
217        self.current_regime.read().await.clone()
218    }
219
220    /// Publish the latest market-regime label.
221    ///
222    /// Intended to be called by the brain pipeline / regime detector as
223    /// new decisions land. Overwrites whatever was there before — the
224    /// field is a snapshot, not a history.
225    pub async fn set_current_regime(&self, regime: impl Into<String>) {
226        let mut guard = self.current_regime.write().await;
227        *guard = Some(regime.into());
228    }
229
230    /// Get the most recently published amygdala threat / fear level.
231    ///
232    /// A scalar in `0.0..=1.0` (higher = more fear), or `None` if no
233    /// producer has reported one yet. Consumed by position guidance to
234    /// escalate under stress. Kept as a plain `f64` so `janus-core` stays
235    /// free of any dependency on the neuromorphic crate that produces it.
236    pub async fn current_threat(&self) -> Option<f64> {
237        *self.current_threat.read().await
238    }
239
240    /// Publish the latest amygdala threat / fear level.
241    ///
242    /// Intended to be called by the brain pipeline as new amygdala
243    /// assessments land. Snapshot semantics, like [`set_current_regime`].
244    pub async fn set_current_threat(&self, fear: f64) {
245        let mut guard = self.current_threat.write().await;
246        *guard = Some(fear);
247    }
248
249    // ── Affinity recording ────────────────────────────────────────────
250
251    /// Install a strategy-affinity recorder.
252    ///
253    /// Called once by the forward service after its `TradingPipeline` is
254    /// constructed, so the API's position-close handler can feed realized
255    /// outcomes into affinity learning. Replaces any previously installed
256    /// recorder.
257    pub async fn set_affinity_recorder(&self, recorder: Box<dyn AffinityRecorder>) {
258        let mut guard = self.affinity_recorder.write().await;
259        *guard = Some(recorder);
260    }
261
262    /// Whether an affinity recorder is installed (for diagnostics / logging).
263    pub async fn has_affinity_recorder(&self) -> bool {
264        self.affinity_recorder.read().await.is_some()
265    }
266
267    /// Record a closed trade into the affinity tracker, if a recorder is
268    /// installed. No-op otherwise. Returns whether the outcome was recorded.
269    pub async fn record_affinity_outcome(
270        &self,
271        strategy: &str,
272        asset: &str,
273        pnl: f64,
274        is_winner: bool,
275        rr_ratio: Option<f64>,
276    ) -> bool {
277        let guard = self.affinity_recorder.read().await;
278        match guard.as_ref() {
279            Some(recorder) => {
280                recorder
281                    .record_trade(strategy, asset, pnl, is_winner, rr_ratio)
282                    .await;
283                true
284            }
285            None => false,
286        }
287    }
288
289    // ── Log level control ─────────────────────────────────────────────
290
291    /// Install a runtime log-level controller.
292    ///
293    /// Called once from `main()` after `init_logging()` returns.
294    /// The controller is then available to the API via
295    /// [`set_log_level`](Self::set_log_level) and
296    /// [`current_log_filter`](Self::current_log_filter).
297    pub async fn set_log_level_controller(&self, controller: Box<dyn LogLevelController>) {
298        let mut guard = self.log_level_controller.write().await;
299        *guard = Some(controller);
300        tracing::debug!("log-level controller installed in JanusState");
301    }
302
303    /// Change the runtime log filter.
304    ///
305    /// Delegates to the installed [`LogLevelController`].  Returns an error
306    /// if no controller has been installed yet.
307    pub async fn set_log_level(&self, filter_str: &str) -> Result<(), String> {
308        let guard = self.log_level_controller.read().await;
309        match guard.as_ref() {
310            Some(ctrl) => ctrl.set_log_level(filter_str),
311            None => Err("no log-level controller installed".to_string()),
312        }
313    }
314
315    /// Returns the current log filter string, if a controller is installed.
316    pub async fn current_log_filter(&self) -> Option<String> {
317        let guard = self.log_level_controller.read().await;
318        guard.as_ref().and_then(|ctrl| ctrl.current_filter())
319    }
320
321    // ── Uptime & shutdown ─────────────────────────────────────────────
322
323    /// Get uptime in seconds
324    pub fn uptime_seconds(&self) -> u64 {
325        self.start_time.elapsed().as_secs()
326    }
327
328    /// Check if shutdown has been requested
329    pub fn is_shutdown_requested(&self) -> bool {
330        self.shutdown_requested.load(Ordering::SeqCst)
331    }
332
333    /// Request shutdown
334    pub fn request_shutdown(&self) {
335        self.shutdown_requested.store(true, Ordering::SeqCst);
336    }
337
338    /// Perform graceful shutdown
339    pub async fn shutdown(&self) -> crate::Result<()> {
340        tracing::info!("Initiating graceful shutdown...");
341        self.request_shutdown();
342
343        // Also move services to Stopped so any waiting modules unblock
344        let _ = self.service_state_tx.send(ServiceState::Stopped);
345
346        // Close Redis connection if open and update metric
347        let mut redis = self.redis_client.write().await;
348        if redis.is_some() {
349            metrics().redis_connected.set(0.0);
350        }
351        *redis = None;
352
353        tracing::info!("Shutdown complete");
354        Ok(())
355    }
356
357    // ── Service lifecycle management ──────────────────────────────────
358
359    /// Transition processing services to [`ServiceState::Running`].
360    ///
361    /// All modules blocked on [`wait_for_services_start`] will proceed.
362    /// Returns `true` if the state actually changed.
363    pub fn start_services(&self) -> bool {
364        self.service_state_tx.send_if_modified(|current| {
365            if *current == ServiceState::Running {
366                false
367            } else {
368                tracing::info!("Service state: {} → running", current);
369                *current = ServiceState::Running;
370                true
371            }
372        })
373    }
374
375    /// Transition processing services to [`ServiceState::Stopped`].
376    ///
377    /// Modules should check [`are_services_active`] in their hot loops and
378    /// wind down gracefully when it returns `false`.
379    /// Returns `true` if the state actually changed.
380    pub fn stop_services(&self) -> bool {
381        self.service_state_tx.send_if_modified(|current| {
382            if *current == ServiceState::Stopped {
383                false
384            } else {
385                tracing::info!("Service state: {} → stopped", current);
386                *current = ServiceState::Stopped;
387                true
388            }
389        })
390    }
391
392    /// Returns the current [`ServiceState`].
393    pub fn service_state(&self) -> ServiceState {
394        *self.service_state_tx.borrow()
395    }
396
397    /// Returns `true` when processing modules should be actively running.
398    pub fn are_services_active(&self) -> bool {
399        *self.service_state_tx.borrow() == ServiceState::Running
400    }
401
402    /// Block until services are started (state becomes [`ServiceState::Running`])
403    /// **or** a shutdown is requested.
404    ///
405    /// Returns `true` if services were started, `false` if a shutdown was
406    /// requested while still waiting.
407    pub async fn wait_for_services_start(&self) -> bool {
408        let mut rx = self.service_state_rx.clone();
409
410        // Fast path — already running
411        if *rx.borrow_and_update() == ServiceState::Running {
412            return true;
413        }
414
415        loop {
416            tokio::select! {
417                result = rx.changed() => {
418                    match result {
419                        Ok(()) => {
420                            if *rx.borrow() == ServiceState::Running {
421                                return true;
422                            }
423                            // State changed to something other than Running
424                            // (e.g. Stopped) — keep waiting unless shutdown
425                            if self.is_shutdown_requested() {
426                                return false;
427                            }
428                        }
429                        Err(_) => {
430                            // Sender dropped — treat as shutdown
431                            return false;
432                        }
433                    }
434                }
435                _ = tokio::time::sleep(tokio::time::Duration::from_millis(250)) => {
436                    if self.is_shutdown_requested() {
437                        return false;
438                    }
439                }
440            }
441        }
442    }
443
444    /// Subscribe to service-state changes.
445    ///
446    /// Useful for modules that need to react to stop commands mid-loop.
447    pub fn subscribe_service_state(&self) -> watch::Receiver<ServiceState> {
448        self.service_state_rx.clone()
449    }
450
451    /// Register a module's health status
452    pub async fn register_module_health(
453        &self,
454        name: impl Into<String>,
455        healthy: bool,
456        message: Option<String>,
457    ) {
458        let mut health = self.module_health.write().await;
459        let name = name.into();
460
461        // Update existing or add new
462        if let Some(existing) = health.iter_mut().find(|h| h.name == name) {
463            existing.healthy = healthy;
464            existing.last_check = Instant::now();
465            existing.message = message;
466        } else {
467            health.push(ModuleHealth {
468                name,
469                healthy,
470                last_check: Instant::now(),
471                message,
472            });
473        }
474    }
475
476    /// Get all module health statuses
477    pub async fn get_module_health(&self) -> Vec<ModuleHealth> {
478        self.module_health.read().await.clone()
479    }
480
481    /// Check if all modules are healthy
482    pub async fn all_modules_healthy(&self) -> bool {
483        let health = self.module_health.read().await;
484        health.iter().all(|h| h.healthy)
485    }
486
487    /// Increment signals generated counter
488    pub fn increment_signals_generated(&self) {
489        self.signals_generated.fetch_add(1, Ordering::SeqCst);
490    }
491
492    /// Get signals generated count
493    pub fn signals_generated(&self) -> u64 {
494        self.signals_generated.load(Ordering::SeqCst)
495    }
496
497    /// Increment signals persisted counter
498    pub fn increment_signals_persisted(&self) {
499        self.signals_persisted.fetch_add(1, Ordering::SeqCst);
500    }
501
502    /// Get signals persisted count
503    pub fn signals_persisted(&self) -> u64 {
504        self.signals_persisted.load(Ordering::SeqCst)
505    }
506
507    /// Get or create the Redis client handle.
508    ///
509    /// This is intentionally cheap — `redis::Client::open` only parses
510    /// the URL; it does **not** open a TCP connection.  Callers obtain
511    /// an actual connection via `client.get_multiplexed_async_connection()`.
512    pub async fn redis_client(&self) -> crate::Result<redis::Client> {
513        let mut client = self.redis_client.write().await;
514
515        if client.is_none() {
516            match redis::Client::open(self.config.redis.url.as_str()) {
517                Ok(new_client) => {
518                    *client = Some(new_client);
519                }
520                Err(e) => {
521                    metrics().redis_connected.set(0.0);
522                    return Err(e.into());
523                }
524            }
525        }
526
527        Ok(client.as_ref().unwrap().clone())
528    }
529
530    /// Probe Redis connectivity and update the `janus_redis_connected`
531    /// Prometheus gauge.
532    ///
533    /// Call once at startup (from `main()`) and optionally from periodic
534    /// health-check loops.  The method never returns an error — it logs
535    /// warnings and sets the gauge to `0` on failure so the process can
536    /// continue booting even if Redis is temporarily unavailable.
537    pub async fn probe_redis(&self) {
538        let client = match self.redis_client().await {
539            Ok(c) => c,
540            Err(e) => {
541                tracing::warn!("Redis probe: failed to create client — {e}");
542                metrics().redis_connected.set(0.0);
543                return;
544            }
545        };
546
547        match client.get_multiplexed_async_connection().await {
548            Ok(mut conn) => {
549                // Actual PING to confirm the server responds.
550                let pong: Result<String, _> = redis::cmd("PING").query_async(&mut conn).await;
551                match pong {
552                    Ok(_) => {
553                        metrics().redis_connected.set(1.0);
554                        tracing::info!("Redis probe: connected ✓");
555                    }
556                    Err(e) => {
557                        metrics().redis_connected.set(0.0);
558                        tracing::warn!("Redis probe: PING failed — {e}");
559                    }
560                }
561            }
562            Err(e) => {
563                metrics().redis_connected.set(0.0);
564                tracing::warn!("Redis probe: connection failed — {e}");
565            }
566        }
567    }
568
569    /// Get comprehensive health status
570    pub async fn health_status(&self) -> HealthStatus {
571        let module_health = self.get_module_health().await;
572        let all_healthy = module_health.iter().all(|h| h.healthy);
573
574        HealthStatus {
575            status: if all_healthy { "healthy" } else { "degraded" }.to_string(),
576            uptime_seconds: self.uptime_seconds(),
577            signals_generated: self.signals_generated(),
578            signals_persisted: self.signals_persisted(),
579            modules: module_health
580                .iter()
581                .map(|h| ModuleHealthSummary {
582                    name: h.name.clone(),
583                    healthy: h.healthy,
584                    message: h.message.clone(),
585                })
586                .collect(),
587            shutdown_requested: self.is_shutdown_requested(),
588            service_state: self.service_state(),
589        }
590    }
591}
592
593/// Health status response
594#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
595pub struct HealthStatus {
596    pub status: String,
597    pub uptime_seconds: u64,
598    pub signals_generated: u64,
599    pub signals_persisted: u64,
600    pub modules: Vec<ModuleHealthSummary>,
601    pub shutdown_requested: bool,
602    pub service_state: ServiceState,
603}
604
605/// Module health summary for API responses
606#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
607pub struct ModuleHealthSummary {
608    pub name: String,
609    pub healthy: bool,
610    pub message: Option<String>,
611}
612
613#[cfg(test)]
614mod tests {
615    use super::*;
616
617    #[tokio::test]
618    async fn test_state_creation() {
619        let config = Config::default();
620        let state = JanusState::new(config).await.unwrap();
621
622        assert!(!state.is_shutdown_requested());
623        assert_eq!(state.signals_generated(), 0);
624        assert_eq!(state.service_state(), ServiceState::Standby);
625        assert!(!state.are_services_active());
626    }
627
628    #[tokio::test]
629    async fn test_service_lifecycle() {
630        let config = Config::default();
631        let state = JanusState::new(config).await.unwrap();
632
633        // Starts in standby
634        assert_eq!(state.service_state(), ServiceState::Standby);
635        assert!(!state.are_services_active());
636
637        // Start services
638        assert!(state.start_services());
639        assert_eq!(state.service_state(), ServiceState::Running);
640        assert!(state.are_services_active());
641
642        // Idempotent
643        assert!(!state.start_services());
644
645        // Stop services
646        assert!(state.stop_services());
647        assert_eq!(state.service_state(), ServiceState::Stopped);
648        assert!(!state.are_services_active());
649
650        // Idempotent
651        assert!(!state.stop_services());
652    }
653
654    #[tokio::test]
655    async fn test_wait_for_services_start() {
656        let config = Config::default();
657        let state = std::sync::Arc::new(JanusState::new(config).await.unwrap());
658
659        let state2 = state.clone();
660        let handle = tokio::spawn(async move { state2.wait_for_services_start().await });
661
662        // Give the waiter a moment to park
663        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
664
665        // Start services — waiter should unblock
666        state.start_services();
667        let result = tokio::time::timeout(tokio::time::Duration::from_secs(2), handle)
668            .await
669            .expect("timed out")
670            .expect("task panicked");
671
672        assert!(result);
673    }
674
675    #[tokio::test]
676    async fn test_module_health_registration() {
677        let config = Config::default();
678        let state = JanusState::new(config).await.unwrap();
679
680        state.register_module_health("forward", true, None).await;
681        state
682            .register_module_health("backward", true, Some("running".to_string()))
683            .await;
684
685        let health = state.get_module_health().await;
686        assert_eq!(health.len(), 2);
687        assert!(state.all_modules_healthy().await);
688    }
689
690    #[tokio::test]
691    async fn test_signal_counters() {
692        let config = Config::default();
693        let state = JanusState::new(config).await.unwrap();
694
695        state.increment_signals_generated();
696        state.increment_signals_generated();
697        state.increment_signals_persisted();
698
699        assert_eq!(state.signals_generated(), 2);
700        assert_eq!(state.signals_persisted(), 1);
701    }
702
703    /// One captured `record_trade` call: (strategy, asset, pnl, is_winner, rr_ratio).
704    type RecordedCall = (String, String, f64, bool, Option<f64>);
705
706    /// Records each call so we can assert the recorder is reached.
707    struct CountingRecorder {
708        calls: std::sync::Arc<std::sync::Mutex<Vec<RecordedCall>>>,
709    }
710
711    #[async_trait::async_trait]
712    impl AffinityRecorder for CountingRecorder {
713        async fn record_trade(
714            &self,
715            strategy: &str,
716            asset: &str,
717            pnl: f64,
718            is_winner: bool,
719            rr_ratio: Option<f64>,
720        ) {
721            self.calls.lock().unwrap().push((
722                strategy.to_string(),
723                asset.to_string(),
724                pnl,
725                is_winner,
726                rr_ratio,
727            ));
728        }
729    }
730
731    #[tokio::test]
732    async fn record_affinity_outcome_no_op_without_recorder() {
733        let state = JanusState::new(Config::default()).await.unwrap();
734        assert!(!state.has_affinity_recorder().await);
735        // No recorder installed → returns false, doesn't panic.
736        let recorded = state
737            .record_affinity_outcome("ema_cross", "BTC", 100.0, true, Some(2.0))
738            .await;
739        assert!(!recorded);
740    }
741
742    #[tokio::test]
743    async fn record_affinity_outcome_reaches_installed_recorder() {
744        let state = JanusState::new(Config::default()).await.unwrap();
745        let calls = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
746        state
747            .set_affinity_recorder(Box::new(CountingRecorder {
748                calls: calls.clone(),
749            }))
750            .await;
751        assert!(state.has_affinity_recorder().await);
752
753        let recorded = state
754            .record_affinity_outcome("ema_cross", "BTC", -25.0, false, None)
755            .await;
756        assert!(recorded);
757
758        let calls = calls.lock().unwrap();
759        assert_eq!(calls.len(), 1);
760        assert_eq!(
761            calls[0],
762            ("ema_cross".to_string(), "BTC".to_string(), -25.0, false, None)
763        );
764    }
765}