// fynd_core/derived/manager.rs

//! Computation manager for derived data.
//!
//! The `ComputationManager`:
//! - Subscribes to `MarketEvent`s from TychoFeed
//! - Runs derived computations (token prices, spot prices, pool depths)
//! - Updates the shared `DerivedData` store (exclusive write access)
//! - Provides read access to workers via a shared store reference
8
9use std::{
10    collections::{HashMap, HashSet},
11    sync::Arc,
12    time::Instant,
13};
14
15use async_trait::async_trait;
16use tokio::sync::{broadcast, RwLock};
17use tracing::{debug, info, trace, warn};
18use tycho_simulation::tycho_common::models::Address;
19
20use crate::types::ComponentId;
21
22/// Information about which components changed in a market update.
23///
24/// Used to enable incremental computation - only recomputing derived data
25/// for components that actually changed.
26#[derive(Debug, Clone, Default)]
27pub struct ChangedComponents {
28    /// Newly added components with their token addresses.
29    pub added: HashMap<ComponentId, Vec<Address>>,
30    /// Components that were removed.
31    pub removed: Vec<ComponentId>,
32    /// Components whose state was updated (but not added/removed).
33    pub updated: Vec<ComponentId>,
34    /// If true, this represents a full recompute (startup/lag recovery).
35    pub is_full_recompute: bool,
36}
37
38impl ChangedComponents {
39    /// Creates a marker for full recompute where all components are considered changed.
40    ///
41    /// Used for startup and lag recovery scenarios.
42    pub fn all(market: MarketDataView) -> Self {
43        Self {
44            added: market.component_topology().clone(),
45            removed: vec![],
46            updated: vec![],
47            is_full_recompute: true,
48        }
49    }
50
51    /// Returns true if this update changes the graph topology (adds or removes components).
52    pub fn is_topology_change(&self) -> bool {
53        !self.added.is_empty() || !self.removed.is_empty()
54    }
55
56    /// Returns a HashSet of all changed component IDs.
57    pub fn all_changed_ids(&self) -> HashSet<ComponentId> {
58        let mut all = HashSet::new();
59        all.extend(self.added.keys().cloned());
60        all.extend(self.removed.iter().cloned());
61        all.extend(self.updated.iter().cloned());
62        all
63    }
64}
65
66use super::{
67    computation::DerivedComputation,
68    computations::{PoolDepthComputation, SpotPriceComputation, TokenGasPriceComputation},
69    error::ComputationError,
70    events::DerivedDataEvent,
71    store::DerivedData,
72};
73use crate::feed::{
74    events::{EventError, MarketEvent, MarketEventHandler},
75    market_data::{MarketData, MarketDataView},
76};
77
/// Thread-safe handle to shared derived data store.
///
/// An [`Arc`] around a tokio [`RwLock`]: cloning the handle shares the same
/// `DerivedData` instance, and lock acquisition is awaited rather than blocking.
pub type SharedDerivedDataRef = Arc<RwLock<DerivedData>>;
80
81/// Configuration for the ComputationManager.
82///
83/// TODO: Consider making this a registry of computation configs using `Box<dyn ComputationConfig>`
84/// to support dynamic computation registration. This would allow adding new computation types
85/// without modifying this struct. For now, we hardcode the three computation types.
86#[derive(Debug, Clone)]
87pub struct ComputationManagerConfig {
88    /// Gas token address (e.g., WETH) for token price computation.
89    gas_token: Address,
90    /// Max hop count for token gas price computation.
91    max_hop: usize,
92    /// Slippage threshold for pool depth computation (0.0 < threshold < 1.0).
93    depth_slippage_threshold: f64,
94}
95
96impl ComputationManagerConfig {
97    /// Creates a new configuration with the given gas token.
98    pub fn new() -> Self {
99        Self::default()
100    }
101
102    /// Sets the slippage threshold for pool depth computation.
103    pub fn with_depth_slippage_threshold(mut self, threshold: f64) -> Self {
104        self.depth_slippage_threshold = threshold;
105        self
106    }
107
108    /// Sets the max hop count for token gas price computation.
109    pub fn with_max_hop(mut self, hop_count: usize) -> Self {
110        self.max_hop = hop_count;
111        self
112    }
113
114    /// Sets the gas token address.
115    pub fn with_gas_token(mut self, gas_token: Address) -> Self {
116        self.gas_token = gas_token;
117        self
118    }
119
120    /// Returns the gas token address.
121    pub fn gas_token(&self) -> &Address {
122        &self.gas_token
123    }
124
125    /// Returns the max hop count.
126    pub fn max_hop(&self) -> usize {
127        self.max_hop
128    }
129
130    /// Returns the depth slippage threshold.
131    pub fn depth_slippage_threshold(&self) -> f64 {
132        self.depth_slippage_threshold
133    }
134}
135
136impl Default for ComputationManagerConfig {
137    fn default() -> Self {
138        Self { gas_token: Address::zero(20), max_hop: 2, depth_slippage_threshold: 0.01 }
139    }
140}
141
/// Manages derived data computations triggered by market events.
///
/// Holds one instance of each computation, the shared store their results are
/// written to, and the broadcast sender used to announce those results.
pub struct ComputationManager {
    /// Reference to shared market data (read access).
    market_data: MarketData,
    /// Shared derived data store (write access).
    store: SharedDerivedDataRef,
    /// Token gas price computation.
    token_price_computation: TokenGasPriceComputation,
    /// Spot price computation.
    spot_price_computation: SpotPriceComputation,
    /// Pool depth computation.
    pool_depth_computation: PoolDepthComputation,
    /// Event broadcaster for derived data updates.
    event_tx: broadcast::Sender<DerivedDataEvent>,
}
157
158impl ComputationManager {
159    /// Creates a new ComputationManager.
160    ///
161    /// Returns the manager and a receiver for derived data events.
162    /// Workers can subscribe to the event sender via `event_sender()` to track
163    /// computation readiness.
164    pub fn new(
165        config: ComputationManagerConfig,
166        market_data: MarketData,
167    ) -> Result<(Self, broadcast::Receiver<DerivedDataEvent>), ComputationError> {
168        let pool_depth_computation = PoolDepthComputation::new(config.depth_slippage_threshold)?;
169        let (event_tx, event_rx) = broadcast::channel(64);
170
171        Ok((
172            Self {
173                market_data,
174                store: DerivedData::new_shared(),
175                token_price_computation: TokenGasPriceComputation::default()
176                    .with_max_hops(config.max_hop)
177                    .with_gas_token(config.gas_token),
178                spot_price_computation: SpotPriceComputation::new(),
179                pool_depth_computation,
180                event_tx,
181            },
182            event_rx,
183        ))
184    }
185
186    /// Returns a reference to the shared derived data store.
187    pub fn store(&self) -> SharedDerivedDataRef {
188        Arc::clone(&self.store)
189    }
190
191    /// Returns the event sender for workers to subscribe.
192    pub fn event_sender(&self) -> broadcast::Sender<DerivedDataEvent> {
193        self.event_tx.clone()
194    }
195
196    /// Runs the main loop until shutdown or channel close.
197    ///
198    /// **Note:** Consumes `self`. Call [`store()`](Self::store) before `run()` to retain access.
199    pub async fn run(
200        mut self,
201        mut event_rx: broadcast::Receiver<MarketEvent>,
202        mut shutdown_rx: broadcast::Receiver<()>,
203    ) {
204        info!("computation manager started");
205
206        loop {
207            tokio::select! {
208                biased;
209
210                _ = shutdown_rx.recv() => {
211                    info!("computation manager shutting down");
212                    break;
213                }
214
215                event_result = event_rx.recv() => {
216                    match event_result {
217                        Ok(event) => {
218                            if let Err(e) = self.handle_event(&event).await {
219                                warn!(error = ?e, "failed to handle market event");
220                            }
221                        }
222                        Err(broadcast::error::RecvError::Closed) => {
223                            info!("event channel closed, computation manager shutting down");
224                            break;
225                        }
226                        Err(broadcast::error::RecvError::Lagged(skipped)) => {
227                            warn!(
228                                skipped,
229                                "computation manager lagged, skipped {} events. Recomputing from current state.",
230                                skipped
231                            );
232                            let market = self.market_data.read().await;
233                            let changed = ChangedComponents::all(market);
234                            self.compute_all(&changed).await;
235                        }
236                    }
237                }
238            }
239        }
240    }
241
242    /// Runs all computations and updates the store.
243    ///
244    /// This is called on market updates and lag recovery.
245    /// Broadcasts `DerivedDataEvent` for each computation that completes.
246    ///
247    /// **Dependency order**:
248    /// 1. `SpotPriceComputation` - no dependencies
249    /// 2. `TokenGasPriceComputation` - depends on spot_prices in store
250    /// 3. `PoolDepthComputation` - no dependencies (runs in parallel with token prices)
251    async fn compute_all(&self, changed: &ChangedComponents) {
252        let total_start = Instant::now();
253
254        // Get block info for tracking
255        let Some(block) = self
256            .market_data
257            .read()
258            .await
259            .last_updated()
260            .map(|b| b.number())
261        else {
262            warn!("market data has no last updated block, skipping computations");
263            return;
264        };
265
266        // Broadcast new block event
267        let _ = self
268            .event_tx
269            .send(DerivedDataEvent::NewBlock { block });
270
271        // Phase 1: Compute spot prices first (no dependencies)
272        let spot_start = Instant::now();
273        let spot_prices_result = self
274            .spot_price_computation
275            .compute(&self.market_data, &self.store, changed)
276            .await;
277        let spot_elapsed = spot_start.elapsed();
278
279        // Write spot prices to store before dependent computations
280        match spot_prices_result {
281            Ok(output) => {
282                let count = output.data.len();
283                if output.has_failures() {
284                    warn!(
285                        count,
286                        failed = output.failed_items.len(),
287                        "spot prices partial failures"
288                    );
289                    for item in &output.failed_items {
290                        debug!(key = %item.key, error = %item.error, "spot price failed item");
291                    }
292                } else {
293                    info!(count, elapsed_ms = spot_elapsed.as_millis(), "spot prices computed");
294                }
295                self.store
296                    .write()
297                    .await
298                    .set_spot_prices(
299                        output.data,
300                        output.failed_items.clone(),
301                        block,
302                        changed.is_full_recompute,
303                    );
304                let _ = self
305                    .event_tx
306                    .send(DerivedDataEvent::ComputationComplete {
307                        computation_id: SpotPriceComputation::ID,
308                        block,
309                        failed_items: output.failed_items,
310                    });
311            }
312            Err(e) => {
313                warn!(error = ?e, elapsed_ms = spot_elapsed.as_millis(), "spot price computation failed");
314                let _ = self
315                    .event_tx
316                    .send(DerivedDataEvent::ComputationFailed {
317                        computation_id: SpotPriceComputation::ID,
318                        block,
319                    });
320                let _ = self
321                    .event_tx
322                    .send(DerivedDataEvent::ComputationFailed {
323                        computation_id: TokenGasPriceComputation::ID,
324                        block,
325                    });
326                let _ = self
327                    .event_tx
328                    .send(DerivedDataEvent::ComputationFailed {
329                        computation_id: PoolDepthComputation::ID,
330                        block,
331                    });
332                // Cannot proceed with token prices if spot prices failed
333                return;
334            }
335        }
336
337        // Phase 2: Run dependent computations (token gas prices and pool depths need spot prices)
338        let (token_prices_result, pool_depths_result) = tokio::join!(
339            async {
340                let start = Instant::now();
341                let result = self
342                    .token_price_computation
343                    .compute(&self.market_data, &self.store, changed)
344                    .await;
345                (result, start.elapsed())
346            },
347            async {
348                let start = Instant::now();
349                let result = self
350                    .pool_depth_computation
351                    .compute(&self.market_data, &self.store, changed)
352                    .await;
353                (result, start.elapsed())
354            }
355        );
356        let (token_prices_result, token_elapsed) = token_prices_result;
357        let (pool_depths_result, depth_elapsed) = pool_depths_result;
358
359        // Update store with remaining results
360        let mut store_write = self.store.write().await;
361
362        match token_prices_result {
363            Ok(output) => {
364                let count = output.data.len();
365                if output.has_failures() {
366                    warn!(
367                        count,
368                        failed = output.failed_items.len(),
369                        "token prices partial failures"
370                    );
371                    for item in &output.failed_items {
372                        debug!(key = %item.key, error = %item.error, "token price failed item");
373                    }
374                } else {
375                    info!(count, elapsed_ms = token_elapsed.as_millis(), "token prices computed");
376                }
377                store_write.set_token_prices(
378                    output.data,
379                    output.failed_items.clone(),
380                    block,
381                    changed.is_full_recompute,
382                );
383                let _ = self
384                    .event_tx
385                    .send(DerivedDataEvent::ComputationComplete {
386                        computation_id: TokenGasPriceComputation::ID,
387                        block,
388                        failed_items: output.failed_items,
389                    });
390            }
391            Err(e) => {
392                warn!(error = ?e, "token price computation failed");
393                let _ = self
394                    .event_tx
395                    .send(DerivedDataEvent::ComputationFailed {
396                        computation_id: TokenGasPriceComputation::ID,
397                        block,
398                    });
399            }
400        }
401
402        match pool_depths_result {
403            Ok(output) => {
404                let count = output.data.len();
405                if output.has_failures() {
406                    warn!(
407                        count,
408                        failed = output.failed_items.len(),
409                        "pool depths partial failures"
410                    );
411                    for item in &output.failed_items {
412                        debug!(key = %item.key, error = %item.error, "pool depth failed item");
413                    }
414                } else {
415                    info!(count, elapsed_ms = depth_elapsed.as_millis(), "pool depths computed");
416                }
417                store_write.set_pool_depths(
418                    output.data,
419                    output.failed_items.clone(),
420                    block,
421                    changed.is_full_recompute,
422                );
423                let _ = self
424                    .event_tx
425                    .send(DerivedDataEvent::ComputationComplete {
426                        computation_id: PoolDepthComputation::ID,
427                        block,
428                        failed_items: output.failed_items,
429                    });
430            }
431            Err(e) => {
432                warn!(error = ?e, "pool depth computation failed");
433                let _ = self
434                    .event_tx
435                    .send(DerivedDataEvent::ComputationFailed {
436                        computation_id: PoolDepthComputation::ID,
437                        block,
438                    });
439            }
440        }
441
442        let total_elapsed = total_start.elapsed();
443        info!(block, total_ms = total_elapsed.as_millis(), "all derived computations complete");
444    }
445}
446
447#[async_trait]
448impl MarketEventHandler for ComputationManager {
449    async fn handle_event(&mut self, event: &MarketEvent) -> Result<(), EventError> {
450        match event {
451            MarketEvent::MarketUpdated {
452                added_components,
453                removed_components,
454                updated_components,
455            } if !added_components.is_empty() ||
456                !removed_components.is_empty() ||
457                !updated_components.is_empty() =>
458            {
459                trace!(
460                    added = added_components.len(),
461                    removed = removed_components.len(),
462                    updated = updated_components.len(),
463                    "market updated, running incremental computations"
464                );
465
466                let changed = ChangedComponents {
467                    added: added_components.clone(),
468                    removed: removed_components.clone(),
469                    updated: updated_components.clone(),
470                    is_full_recompute: false,
471                };
472                self.compute_all(&changed).await;
473            }
474            _ => {
475                trace!("empty market update, skipping computations");
476            }
477        }
478
479        Ok(())
480    }
481}
482
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tokio::sync::broadcast;

    use super::*;
    use crate::{
        algorithm::test_utils::{component, setup_market_weighted, token, MockProtocolSim},
        feed::market_data::{MarketData, MarketState},
        types::BlockInfo,
    };

    /// Drains all currently-pending events from a broadcast receiver into a Vec.
    fn drain_events(rx: &mut broadcast::Receiver<DerivedDataEvent>) -> Vec<DerivedDataEvent> {
        let mut events = vec![];
        loop {
            match rx.try_recv() {
                Ok(e) => events.push(e),
                Err(broadcast::error::TryRecvError::Empty) => break,
                // Lag only means older events were dropped; keep reading what is left.
                Err(broadcast::error::TryRecvError::Lagged(_)) => continue,
                Err(broadcast::error::TryRecvError::Closed) => break,
            }
        }
        events
    }

    /// Returns true if `events` contains a `ComputationFailed` for computation `id`.
    fn has_failed(events: &[DerivedDataEvent], id: &str) -> bool {
        events.iter().any(|event| {
            matches!(
                event,
                DerivedDataEvent::ComputationFailed { computation_id, .. } if *computation_id == id
            )
        })
    }

    /// Wraps a prepared `MarketState` into the shared `MarketData` handle.
    fn into_market_data(market: MarketState) -> MarketData {
        MarketData::new(std::sync::Arc::new(tokio::sync::RwLock::new(market)))
    }

    #[test]
    fn invalid_slippage_threshold_returns_error() {
        let (market, _) = setup_market_weighted(vec![]);
        let config = ComputationManagerConfig::new().with_depth_slippage_threshold(1.5);

        let result = ComputationManager::new(config, market);
        assert!(matches!(result, Err(ComputationError::InvalidConfiguration(_))));
    }

    #[tokio::test]
    async fn handle_event_runs_computations_on_market_update() {
        let eth = token(1, "ETH");
        let usdc = token(2, "USDC");

        let (market, _) = setup_market_weighted(vec![(
            "eth_usdc",
            &eth,
            &usdc,
            MockProtocolSim::new(2000.0).with_gas(0),
        )]);

        let config = ComputationManagerConfig::new().with_gas_token(eth.address.clone());
        let (mut manager, _event_rx) = ComputationManager::new(config, market).unwrap();

        let event = MarketEvent::MarketUpdated {
            added_components: HashMap::from([(
                "eth_usdc".to_string(),
                vec![eth.address.clone(), usdc.address.clone()],
            )]),
            removed_components: vec![],
            updated_components: vec![],
        };

        manager
            .handle_event(&event)
            .await
            .unwrap();

        let store = manager.store();
        let guard = store.read().await;
        assert!(guard.token_prices().is_some());
        assert!(guard.spot_prices().is_some());
    }

    #[tokio::test]
    async fn handle_event_skips_empty_update() {
        let (market, _) = setup_market_weighted(vec![]);
        let config = ComputationManagerConfig::new();
        let (mut manager, _event_rx) = ComputationManager::new(config, market).unwrap();

        let event = MarketEvent::MarketUpdated {
            added_components: HashMap::new(),
            removed_components: vec![],
            updated_components: vec![],
        };

        manager
            .handle_event(&event)
            .await
            .unwrap();

        let store = manager.store();
        let guard = store.read().await;
        assert!(guard.token_prices().is_none());
    }

    #[tokio::test]
    async fn run_shuts_down_on_signal() {
        let (market, _) = setup_market_weighted(vec![]);
        let config = ComputationManagerConfig::new();
        let (manager, _event_rx) = ComputationManager::new(config, market).unwrap();

        let (_event_tx, event_rx) = broadcast::channel::<MarketEvent>(16);
        let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);

        let handle = tokio::spawn(async move {
            manager.run(event_rx, shutdown_rx).await;
        });

        shutdown_tx.send(()).unwrap();

        tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
            .await
            .expect("manager should shutdown")
            .expect("task should complete successfully");
    }

    /// Creates a market with a component in topology but WITHOUT simulation state.
    ///
    /// Used to trigger `TotalFailure` in spot_price computation (full recompute with
    /// all components missing sim_state → succeeded == 0 → failure).
    fn market_with_component_no_sim_state() -> MarketData {
        let eth = token(1, "ETH");
        let usdc = token(2, "USDC");
        let pool = component("pool", &[eth.clone(), usdc.clone()]);

        let mut market = MarketState::new();
        market.update_last_updated(BlockInfo::new(10, "0xhash".into(), 0));
        market.upsert_components(std::iter::once(pool));
        // Note: no update_states() — simulation state is intentionally absent
        market.upsert_tokens([eth, usdc]);
        into_market_data(market)
    }

    /// Creates a market with two pools: one with sim state (pool succeeds) and one without (pool
    /// fails). Used to trigger partial spot price failure.
    fn market_with_mixed_sim_states() -> MarketData {
        let eth = token(1, "ETH");
        let usdc = token(2, "USDC");
        let dai = token(3, "DAI");

        let pool1 = component("eth_usdc", &[eth.clone(), usdc.clone()]);
        let pool2 = component("eth_dai", &[eth.clone(), dai.clone()]);

        let mut market = MarketState::new();
        market.update_last_updated(BlockInfo::new(10, "0xhash".into(), 0));
        market.upsert_components([pool1, pool2]);
        // Only pool1 has simulation state; pool2 intentionally has none
        market
            .update_states([("eth_usdc".to_string(), Box::new(MockProtocolSim::new(2000.0)) as _)]);
        market.upsert_tokens([eth, usdc, dai]);
        into_market_data(market)
    }

    /// Creates a market WITH sim_state but WITHOUT gas_price.
    ///
    /// Spot price computation succeeds (MockProtocolSim works), but token_price
    /// computation fails with `MissingDependency("gas_price")`.
    fn market_with_sim_state_no_gas_price() -> MarketData {
        let eth = token(1, "ETH");
        let usdc = token(2, "USDC");
        let pool = component("pool", &[eth.clone(), usdc.clone()]);

        let mut market = MarketState::new();
        // Note: no update_gas_price() — gas price is intentionally absent
        market.update_last_updated(BlockInfo::new(10, "0xhash".into(), 0));
        market.upsert_components(std::iter::once(pool));
        market.update_states([("pool".to_string(), Box::new(MockProtocolSim::new(2000.0)) as _)]);
        market.upsert_tokens([eth, usdc]);
        into_market_data(market)
    }

    #[tokio::test]
    async fn test_spot_price_failure_broadcasts_computation_failed() {
        let market = market_with_component_no_sim_state();
        let config = ComputationManagerConfig::new();
        let (manager, mut event_rx) = ComputationManager::new(config, market).unwrap();

        // Full recompute with components that have no sim_state → TotalFailure
        let changed = ChangedComponents { is_full_recompute: true, ..Default::default() };
        manager.compute_all(&changed).await;

        let events = drain_events(&mut event_rx);

        assert!(
            has_failed(&events, "spot_prices"),
            "expected ComputationFailed(spot_prices) in events: {events:?}"
        );

        // A spot price failure skips the remaining computations, which the manager
        // reports as failed too so that subscribers are not left waiting.
        for id in [TokenGasPriceComputation::ID, PoolDepthComputation::ID] {
            assert!(
                has_failed(&events, id),
                "expected ComputationFailed({id}) after spot price failure: {events:?}"
            );
        }
    }

    #[tokio::test]
    async fn test_token_price_failure_broadcasts_computation_failed() {
        let eth = token(1, "ETH");
        let usdc = token(2, "USDC");
        let market = market_with_sim_state_no_gas_price();
        let config = ComputationManagerConfig::new().with_gas_token(eth.address.clone());
        let (mut manager, mut event_rx) = ComputationManager::new(config, market).unwrap();

        // handle_event with added components — spot_price succeeds, token_price fails
        let event = MarketEvent::MarketUpdated {
            added_components: HashMap::from([(
                "pool".to_string(),
                vec![eth.address.clone(), usdc.address.clone()],
            )]),
            removed_components: vec![],
            updated_components: vec![],
        };
        manager
            .handle_event(&event)
            .await
            .unwrap();

        let events = drain_events(&mut event_rx);
        assert!(
            has_failed(&events, "token_prices"),
            "expected ComputationFailed(token_prices) in events: {events:?}"
        );
    }

    #[tokio::test]
    async fn run_shuts_down_on_channel_close() {
        let (market, _) = setup_market_weighted(vec![]);
        let config = ComputationManagerConfig::new();
        let (manager, _event_rx) = ComputationManager::new(config, market).unwrap();

        let (event_tx, event_rx) = broadcast::channel::<MarketEvent>(16);
        let (_shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);

        let handle = tokio::spawn(async move {
            manager.run(event_rx, shutdown_rx).await;
        });

        drop(event_tx);

        tokio::time::timeout(tokio::time::Duration::from_secs(1), handle)
            .await
            .expect("manager should shutdown on channel close")
            .expect("task should complete successfully");
    }

    #[tokio::test]
    async fn partial_spot_price_failure_broadcasts_computation_complete() {
        // market_with_mixed_sim_states has pool1 (with sim state) and pool2 (without)
        // → spot price computation partially succeeds → ComputationComplete with failed_items
        let market = market_with_mixed_sim_states();
        let config = ComputationManagerConfig::new();
        let (manager, mut event_rx) = ComputationManager::new(config, market).unwrap();

        let changed = ChangedComponents { is_full_recompute: true, ..Default::default() };
        manager.compute_all(&changed).await;

        let events = drain_events(&mut event_rx);

        // Should broadcast ComputationComplete (not ComputationFailed) because pool1 succeeds.
        // Extracting the payload with `expect` makes a missing event a hard failure instead
        // of silently skipping the `failed_items` check.
        let failed_items = events
            .iter()
            .find_map(|e| match e {
                DerivedDataEvent::ComputationComplete {
                    computation_id: "spot_prices",
                    failed_items,
                    ..
                } => Some(failed_items),
                _ => None,
            })
            .expect("expected ComputationComplete(spot_prices) in events");
        assert!(
            !has_failed(&events, "spot_prices"),
            "should not broadcast ComputationFailed for partial failure, got: {events:?}"
        );

        // The ComputationComplete event should carry the failed item for pool2
        assert!(
            !failed_items.is_empty(),
            "ComputationComplete should carry failed_items for pool2"
        );

        // The store should persist the failure reason for the failed pool.
        // market_with_mixed_sim_states uses token(1, "ETH") and token(3, "DAI") for pool2.
        let eth = token(1, "ETH");
        let dai = token(3, "DAI");
        let store = manager.store();
        let guard = store.read().await;
        let key_eth_dai = ("eth_dai".to_string(), eth.address.clone(), dai.address.clone());
        let key_dai_eth = ("eth_dai".to_string(), dai.address.clone(), eth.address.clone());
        assert!(
            guard
                .spot_price_failure(&key_eth_dai)
                .is_some() ||
                guard
                    .spot_price_failure(&key_dai_eth)
                    .is_some(),
            "store should persist failure reason for eth_dai (missing sim state)"
        );
    }
}