Skip to main content

fynd_core/graph/
petgraph.rs

1//! Petgraph's StableDiGraph implementation of GraphManager.
2//!
3//! This module provides PetgraphStableDiGraphManager, which implements GraphManager for
4//! petgraph::stable_graph::StableDiGraph, providing a reusable implementation for algorithms that
5//! use petgraph.
6//!
7//! A stable graph is a graph that maintains the indices of its edges even after removals. This is
8//! useful for optimising the graph manager's performance by allowing for O(1) edge and node
9//! lookups.
10
11use std::collections::{HashMap, HashSet};
12
13use async_trait::async_trait;
14pub use petgraph::graph::EdgeIndex;
15use petgraph::{graph::NodeIndex, stable_graph};
16use tracing::{debug, trace};
17use tycho_simulation::tycho_common::models::Address;
18
19use super::GraphManager;
20use crate::{
21    feed::{
22        events::{EventError, MarketEvent, MarketEventHandler},
23        market_data::MarketDataView,
24    },
25    graph::GraphError,
26    types::ComponentId,
27};
28
29/// Data stored on each edge of the graph.
30///
31/// Contains the component ID (which pool this edge represents) and
32/// optional algorithm-specific data. The type `D` is generic to allow
33/// different algorithms to store their own scoring data.
34///
35/// # Type Parameters
36/// - `D`: Algorithm-specific data type. Defaults to `()` for no extra data.
37///
38/// # Examples
39/// ```ignore
40/// // For MostLiquid algorithm with depth/price data:
41/// use crate::algorithm::most_liquid::DepthAndPrice;
42/// type MostLiquidEdge = EdgeData<DepthAndPrice>;
43///
44/// // For algorithms that don't need extra data:
45/// type SimpleEdge = EdgeData<()>;
46/// ```
47#[derive(Debug, Clone, Default)]
48pub struct EdgeData<D = ()> {
49    /// The component ID that enables this swap.
50    pub component_id: ComponentId,
51    /// Algorithm-specific data. None if not yet computed.
52    pub data: Option<D>,
53}
54
55impl<M> EdgeData<M> {
56    /// Creates a new EdgeData with the given component ID and no data set.
57    pub fn new(component_id: ComponentId) -> Self {
58        Self { component_id, data: None }
59    }
60
61    /// Creates a new EdgeData with the given component ID and data.
62    pub fn with_data(component_id: ComponentId, data: M) -> Self {
63        Self { component_id, data: Some(data) }
64    }
65}
66
67/// A stable directed graph with token addresses as nodes and [`EdgeData`] as edge weights.
68pub type StableDiGraph<D> = stable_graph::StableDiGraph<Address, EdgeData<D>>;
69
70/// Petgraph implementation of GraphManager.
71///
72/// This struct implements GraphManager for petgraph::stable_graph::StableDiGraph.
73///
74/// The graph manager maintains the graph internally and updates it based on market events.
75/// Using StableDiGraph ensures edge indices remain valid after removals, making edge_map viable.
76pub struct PetgraphStableDiGraphManager<D: Clone> {
77    // Stable directed graph with token addresses as nodes and edge data (component id + weight) as
78    // edges. Using StableDiGraph ensures edge indices remain valid after removals, making
79    // edge_map viable.
80    graph: StableDiGraph<D>,
81    // Map from ComponentId to edge indices for fast removal and weight updates.
82    edge_map: HashMap<ComponentId, Vec<EdgeIndex>>,
83    // Map from token address to node index for fast node lookups.
84    node_map: HashMap<Address, NodeIndex>,
85}
86
87impl<D: Clone> PetgraphStableDiGraphManager<D> {
88    /// Creates a new empty graph manager.
89    pub fn new() -> Self {
90        Self { graph: StableDiGraph::default(), edge_map: HashMap::new(), node_map: HashMap::new() }
91    }
92
93    /// Helper function to find a node index by address
94    pub(crate) fn find_node(&self, addr: &Address) -> Result<NodeIndex, GraphError> {
95        self.node_map
96            .get(addr)
97            .copied()
98            .ok_or_else(|| GraphError::TokenNotFound(addr.clone()))
99    }
100
101    /// Helper function to get or create a node for the given address.
102    /// Returns the node index, creating the node if it doesn't exist.
103    fn get_or_create_node(&mut self, addr: &Address) -> NodeIndex {
104        // Check if node already exists
105        match self.find_node(addr) {
106            Ok(node_idx) => node_idx,
107            Err(_) => {
108                let node_idx = self.graph.add_node(addr.clone());
109                self.node_map
110                    .insert(addr.clone(), node_idx);
111                node_idx
112            }
113        }
114    }
115
116    /// Helper function to add an edge to the graph.
117    ///
118    /// # Arguments
119    ///
120    /// * `from_idx` - The index of the from node.
121    /// * `to_idx` - The index of the to node.
122    /// * `component_id` - The ID of the component represented by this edge.
123    fn add_edge(&mut self, from_idx: NodeIndex, to_idx: NodeIndex, component_id: &ComponentId) {
124        let edge_idx = self
125            .graph
126            .add_edge(from_idx, to_idx, EdgeData::new(component_id.clone()));
127        self.edge_map
128            .entry(component_id.clone())
129            .or_default()
130            .push(edge_idx);
131    }
132
133    /// Helper function to add edges for all token pairs in a component.
134    /// Takes a slice of node indices corresponding to the tokens.
135    fn add_component_edges(&mut self, component_id: &ComponentId, node_indices: &[NodeIndex]) {
136        // Create bidirectional edges for each token pair
137        node_indices
138            .iter()
139            .enumerate()
140            .flat_map(|(i, &from_idx)| {
141                node_indices
142                    .iter()
143                    .skip(i + 1)
144                    .map(move |&to_idx| (from_idx, to_idx))
145            })
146            .for_each(|(from_idx, to_idx)| {
147                // Create bidirectional edges A -> B and B -> A
148                self.add_edge(from_idx, to_idx, component_id);
149                self.add_edge(to_idx, from_idx, component_id);
150            });
151    }
152
153    /// Adds components to the graph.
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if any components have too few tokens (components must have at least 2
158    /// tokens). All components not included in the error were successfully added.
159    ///
160    /// Arguments:
161    /// - components: A map of component IDs to their tokens.
162    fn add_components(
163        &mut self,
164        components: &HashMap<ComponentId, Vec<Address>>,
165    ) -> Result<(), GraphError> {
166        let mut invalid_components = Vec::new();
167        let mut skipped_duplicates = 0usize;
168
169        // Sort components for deterministic node/edge insertion order.
170        let mut sorted_components: Vec<_> = components.iter().collect();
171        sorted_components.sort_by_key(|(id, _)| *id);
172
173        for (comp_id, tokens) in sorted_components {
174            if self.edge_map.contains_key(comp_id) {
175                trace!(component_id = %comp_id, "skipping already-tracked component");
176                skipped_duplicates += 1;
177                continue;
178            }
179
180            if tokens.len() < 2 {
181                invalid_components.push(comp_id.clone());
182                continue;
183            }
184            let mut sorted_tokens: Vec<&Address> = tokens.iter().collect();
185            sorted_tokens.sort();
186            let node_indices: Vec<NodeIndex> = sorted_tokens
187                .iter()
188                .map(|token| self.get_or_create_node(token))
189                .collect();
190            self.add_component_edges(comp_id, &node_indices);
191        }
192
193        if skipped_duplicates > 0 {
194            debug!(skipped_duplicates, "skipped duplicate components during add");
195        }
196
197        // Return error if any components had too few tokens (less than 2)
198        if !invalid_components.is_empty() {
199            return Err(GraphError::InvalidComponents(invalid_components));
200        }
201
202        Ok(())
203    }
204
205    /// Removes components from the graph.
206    ///
207    /// # Errors
208    ///
209    /// Returns an error if any components are not found in the graph. All components not included
210    /// in the error were successfully removed.
211    ///
212    /// Arguments:
213    /// - components: A vector of component IDs to remove.
214    fn remove_components(&mut self, components: &[ComponentId]) -> Result<(), GraphError> {
215        let mut missing_components = Vec::new();
216
217        for comp_id in components {
218            // Use the edge_map for O(1) lookup instead of iterating all edges
219            if let Some(edge_indices) = self.edge_map.remove(comp_id) {
220                for edge_idx in edge_indices {
221                    self.graph.remove_edge(edge_idx);
222                }
223            } else {
224                // Component not found in edge_map
225                missing_components.push(comp_id.clone());
226            }
227        }
228
229        // Return error if any components were not found
230        if !missing_components.is_empty() {
231            return Err(GraphError::ComponentsNotFound(missing_components));
232        }
233
234        Ok(())
235    }
236
237    /// Sets the weight for edges between the specified tokens with the given component ID.
238    ///
239    /// # Errors
240    ///
241    /// Returns an error if the component is not found in the graph for the given token pair.
242    ///
243    /// Arguments:
244    /// - component_id: The ID of the component to update.
245    /// - token_in: The input token.
246    /// - token_out: The output token.
247    /// - weight: The weight to set.
248    /// - If `bidirectional` is `true`, updates edges in both directions (token_in -> token_out and
249    ///   token_out -> token_in).
250    /// - If `bidirectional` is `false`, updates only the forward direction (token_in -> token_out).
251    #[cfg(test)]
252    pub(crate) fn set_edge_weight(
253        &mut self,
254        component_id: &ComponentId,
255        token_in: &Address,
256        token_out: &Address,
257        data: D,
258        bidirectional: bool,
259    ) -> Result<(), GraphError> {
260        let from_idx = self.find_node(token_in)?;
261        let to_idx = self.find_node(token_out)?;
262
263        // Get all edges for this component
264        let edge_indices = self
265            .edge_map
266            .get(component_id)
267            .ok_or_else(|| GraphError::ComponentsNotFound(vec![component_id.clone()]))?;
268
269        let mut updated = false;
270        for &edge_idx in edge_indices {
271            // Skip current edge if not found in graph, continue checking next edge
272            let (edge_from, edge_to) = match self.graph.edge_endpoints(edge_idx) {
273                Some(endpoints) => endpoints,
274                None => continue,
275            };
276
277            // Determine if we should update this edge based on edge tokens and bidirectional flag
278            let should_update = if bidirectional {
279                // Update both directions
280                (edge_from == from_idx && edge_to == to_idx) ||
281                    (edge_from == to_idx && edge_to == from_idx)
282            } else {
283                // Update only forward direction
284                edge_from == from_idx && edge_to == to_idx
285            };
286
287            if should_update {
288                // Error if edge weight is not found (edge is not in graph)
289                let edge_data = self
290                    .graph
291                    .edge_weight_mut(edge_idx)
292                    .ok_or_else(|| GraphError::ComponentsNotFound(vec![component_id.clone()]))?;
293                // Verify the component ID matches
294                if edge_data.component_id == *component_id {
295                    edge_data.data = Some(data.clone());
296                    updated = true;
297                }
298            }
299        }
300
301        if !updated {
302            return Err(GraphError::MissingComponentBetweenTokens(
303                token_in.clone(),
304                token_out.clone(),
305                component_id.clone(),
306            ));
307        }
308
309        Ok(())
310    }
311}
312
313impl<D: Clone + super::EdgeWeightFromSimAndDerived> PetgraphStableDiGraphManager<D> {
314    /// Updates edge weights using simulation states and pre-computed derived data.
315    ///
316    /// Uses pre-computed derived data (spot prices, pool depths, etc.) to update
317    /// edge weights. This is more accurate than computing from scratch as it uses
318    /// data computed with slippage thresholds via `query_pool_swap` or binary search.
319    ///
320    /// # Arguments
321    ///
322    /// * `market` - The market data containing simulation states and tokens
323    /// * `derived` - Pre-computed derived data (pool depths, spot prices, etc.)
324    ///
325    /// # Returns
326    ///
327    /// The number of edges successfully updated.
328    pub fn update_edge_weights_with_derived(
329        &mut self,
330        market: MarketDataView<'_>,
331        derived: &crate::derived::DerivedData,
332    ) -> usize {
333        let tokens = market.token_registry_ref();
334
335        // First pass: collect edge info and compute weights (immutable borrow).
336        // `None` in the inner Option means derived data is unavailable — the edge weight will be
337        // cleared to prevent stale data from influencing routing scores.
338        let updates: Vec<(EdgeIndex, Option<D>)> = self
339            .graph
340            .edge_indices()
341            .filter_map(|edge_idx| {
342                let edge_data = self.graph.edge_weight(edge_idx)?;
343                let component_id = &edge_data.component_id;
344
345                let sim_state = market.get_simulation_state(component_id)?;
346
347                let (source_idx, target_idx) = self.graph.edge_endpoints(edge_idx)?;
348                let source_addr = &self.graph[source_idx];
349                let target_addr = &self.graph[target_idx];
350
351                let token_in = tokens.get(source_addr)?;
352                let token_out = tokens.get(target_addr)?;
353
354                Some((
355                    edge_idx,
356                    D::from_sim_and_derived(sim_state, component_id, token_in, token_out, derived),
357                ))
358            })
359            .collect();
360
361        // Second pass: apply updates and clears (mutable borrow).
362        let updated = updates
363            .iter()
364            .filter(|(_, w)| w.is_some())
365            .count();
366        for (edge_idx, weight) in updates {
367            if let Some(edge_data) = self.graph.edge_weight_mut(edge_idx) {
368                edge_data.data = weight;
369            }
370        }
371
372        updated
373    }
374}
375
376impl<D: Clone + super::EdgeWeightFromSimAndDerived> super::EdgeWeightUpdaterWithDerived
377    for PetgraphStableDiGraphManager<D>
378{
379    fn update_edge_weights_with_derived(
380        &mut self,
381        market: MarketDataView<'_>,
382        derived: &crate::derived::DerivedData,
383    ) -> usize {
384        self.update_edge_weights_with_derived(market, derived)
385    }
386}
387
388impl<D: Clone> Default for PetgraphStableDiGraphManager<D> {
389    fn default() -> Self {
390        Self::new()
391    }
392}
393
394impl<D: Clone + Send + Sync> GraphManager<StableDiGraph<D>> for PetgraphStableDiGraphManager<D> {
395    fn initialize_graph(&mut self, component_topology: &HashMap<ComponentId, Vec<Address>>) {
396        // Clear existing graph and component map
397        self.graph = StableDiGraph::default();
398        self.edge_map.clear();
399        self.node_map.clear();
400
401        // Sort tokens for deterministic NodeIndex assignment across processes
402        // given the same input. HashMap/HashSet iteration order varies per
403        // process (random SipHash seeds), which would otherwise give different
404        // graph structure each run.
405        let mut unique_tokens: Vec<Address> = component_topology
406            .values()
407            .flat_map(|v| v.iter())
408            .cloned()
409            .collect::<HashSet<_>>()
410            .into_iter()
411            .collect();
412        unique_tokens.sort();
413
414        for token in unique_tokens {
415            let node_idx = self.graph.add_node(token.clone());
416            self.node_map.insert(token, node_idx);
417        }
418
419        // Sort components for deterministic edge insertion order.
420        let mut sorted_components: Vec<_> = component_topology.iter().collect();
421        sorted_components.sort_by_key(|(id, _)| *id);
422
423        for (comp_id, tokens) in sorted_components {
424            let mut sorted_tokens: Vec<&Address> = tokens.iter().collect();
425            sorted_tokens.sort();
426            let node_indices: Vec<NodeIndex> = sorted_tokens
427                .iter()
428                .map(|token| self.node_map[*token])
429                .collect();
430            self.add_component_edges(comp_id, &node_indices);
431        }
432    }
433
434    fn graph(&self) -> &StableDiGraph<D> {
435        &self.graph
436    }
437}
438
439#[async_trait]
440impl<D: Clone + Send> MarketEventHandler for PetgraphStableDiGraphManager<D> {
441    async fn handle_event(&mut self, event: &MarketEvent) -> Result<(), EventError> {
442        match event {
443            MarketEvent::MarketUpdated { added_components, removed_components, .. } => {
444                // Process both operations and collect all errors
445                let mut errors = Vec::new();
446
447                // Try to add components, collect error if it fails
448                if let Err(e) = self.add_components(added_components) {
449                    errors.push(e);
450                }
451
452                // Try to remove components, collect error if it fails
453                if let Err(e) = self.remove_components(removed_components) {
454                    errors.push(e);
455                }
456
457                // Return errors if any occurred
458                match errors.len() {
459                    0 => Ok(()),
460                    _ => Err(EventError::GraphErrors(errors)),
461                }
462            }
463        }
464    }
465}
466
467#[cfg(test)]
468mod tests {
469    use std::str::FromStr;
470
471    use super::*;
472
473    /// Helper function to create a test address from a hex string.
474    fn addr(s: &str) -> Address {
475        Address::from_str(s).expect("Invalid address hex string")
476    }
477
478    #[test]
479    fn test_initialize_graph_empty() {
480        let mut manager = PetgraphStableDiGraphManager::<()>::new();
481        let topology = HashMap::new();
482
483        manager.initialize_graph(&topology);
484
485        let graph = manager.graph();
486        assert_eq!(graph.node_count(), 0);
487        assert_eq!(graph.edge_count(), 0);
488    }
489
490    #[test]
491    fn test_initialize_graph_comprehensive() {
492        let mut manager = PetgraphStableDiGraphManager::<()>::new();
493        let mut topology = HashMap::new();
494        let token_a = addr("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); // WETH
495        let token_b = addr("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); // USDC
496        let token_c = addr("0x6B175474E89094C44Da98b954EedeAC495271d0F"); // DAI
497        let token_d = addr("0xdAC17F958D2ee523a2206206994597C13D831ec7"); // USDT
498
499        // Pool 1: A-B-C (3-token pool, fully connected)
500        topology
501            .insert("pool1".to_string(), vec![token_a.clone(), token_b.clone(), token_c.clone()]);
502        // Pool 2: C-D (2-token pool, overlapping with pool 1)
503        topology.insert("pool2".to_string(), vec![token_c.clone(), token_d.clone()]);
504
505        manager.initialize_graph(&topology);
506
507        let graph = manager.graph();
508        // 4 unique tokens
509        assert_eq!(graph.node_count(), 4);
510        // Pool 1: 3 pairs × 2 directions = 6 edges (A-B, B-A, A-C, C-A, B-C, C-B)
511        // Pool 2: 1 pair × 2 directions = 2 edges (C-D, D-C)
512        // Total: 8 edges
513        assert_eq!(graph.edge_count(), 8);
514
515        // Verify edge labels are correct by checking specific token pairs
516        let node_a = manager.find_node(&token_a).unwrap();
517        let node_b = manager.find_node(&token_b).unwrap();
518        let node_c = manager.find_node(&token_c).unwrap();
519        let node_d = manager.find_node(&token_d).unwrap();
520
521        // Pool 1 edges: A-B, B-A, A-C, C-A, B-C, C-B (bidirectional)
522        assert_eq!(
523            graph
524                .edge_weight(graph.find_edge(node_a, node_b).unwrap())
525                .unwrap()
526                .component_id,
527            "pool1".to_string()
528        );
529        assert_eq!(
530            graph
531                .edge_weight(graph.find_edge(node_b, node_a).unwrap())
532                .unwrap()
533                .component_id,
534            "pool1".to_string()
535        );
536        assert_eq!(
537            graph
538                .edge_weight(graph.find_edge(node_a, node_c).unwrap())
539                .unwrap()
540                .component_id,
541            "pool1".to_string()
542        );
543        assert_eq!(
544            graph
545                .edge_weight(graph.find_edge(node_c, node_a).unwrap())
546                .unwrap()
547                .component_id,
548            "pool1".to_string()
549        );
550        assert_eq!(
551            graph
552                .edge_weight(graph.find_edge(node_b, node_c).unwrap())
553                .unwrap()
554                .component_id,
555            "pool1".to_string()
556        );
557        assert_eq!(
558            graph
559                .edge_weight(graph.find_edge(node_c, node_b).unwrap())
560                .unwrap()
561                .component_id,
562            "pool1".to_string()
563        );
564
565        // Pool 2 edges: C-D, D-C (bidirectional)
566        assert_eq!(
567            graph
568                .edge_weight(graph.find_edge(node_c, node_d).unwrap())
569                .unwrap()
570                .component_id,
571            "pool2".to_string()
572        );
573        assert_eq!(
574            graph
575                .edge_weight(graph.find_edge(node_d, node_c).unwrap())
576                .unwrap()
577                .component_id,
578            "pool2".to_string()
579        );
580    }
581
582    #[test]
583    fn test_initialize_graph_multiple_edges_same_pair() {
584        let mut manager = PetgraphStableDiGraphManager::<()>::new();
585        let mut topology = HashMap::new();
586        let token_a = addr("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); // WETH
587        let token_b = addr("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); // USDC
588
589        // Multiple components connecting the same token pair
590        topology.insert("pool1".to_string(), vec![token_a.clone(), token_b.clone()]);
591        topology.insert("pool2".to_string(), vec![token_a.clone(), token_b.clone()]);
592        topology.insert("pool3".to_string(), vec![token_a.clone(), token_b.clone()]);
593
594        manager.initialize_graph(&topology);
595
596        let graph = manager.graph();
597        // 2 unique tokens
598        assert_eq!(graph.node_count(), 2);
599        // 3 components × 1 pair × 2 directions = 6 edges between A-B
600        assert_eq!(graph.edge_count(), 6);
601
602        let node_a = manager.find_node(&token_a).unwrap();
603        let node_b = manager.find_node(&token_b).unwrap();
604
605        // Verify all three edges exist with correct component IDs
606        let edges: Vec<_> = graph
607            .edges_connecting(node_a, node_b)
608            .collect();
609        assert_eq!(edges.len(), 3);
610
611        let component_ids: Vec<_> = edges
612            .iter()
613            .map(|e| &e.weight().component_id)
614            .collect();
615
616        // Verify all three component IDs are present
617        assert!(component_ids.contains(&&"pool1".to_string()));
618        assert!(component_ids.contains(&&"pool2".to_string()));
619        assert!(component_ids.contains(&&"pool3".to_string()));
620    }
621
622    #[test]
623    fn test_add_components_shared_tokens() {
624        let mut manager = PetgraphStableDiGraphManager::<()>::new();
625        let mut components = HashMap::new();
626        let token_a = addr("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); // WETH
627        let token_b = addr("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); // USDC
628
629        // Add first component with token A and B
630        components.insert("pool1".to_string(), vec![token_a.clone(), token_b.clone()]);
631        manager
632            .add_components(&components)
633            .unwrap();
634
635        let initial_node_count = manager.graph().node_count();
636        assert_eq!(initial_node_count, 2);
637
638        // Add second component with overlapping token A
639        components.clear();
640        components.insert("pool2".to_string(), vec![token_a.clone(), token_b.clone()]);
641        manager
642            .add_components(&components)
643            .unwrap();
644
645        // Should still have only 2 nodes, not 3
646        assert_eq!(manager.graph().node_count(), 2, "Should not create duplicate nodes");
647    }
648
649    #[test]
650    fn test_add_tokenless_components_error() {
651        let mut manager = PetgraphStableDiGraphManager::<()>::new();
652        let mut components = HashMap::new();
653        let token_a = addr("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); // WETH
654        let token_b = addr("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); // USDC
655
656        // Mix valid and invalid components
657        components.insert("pool1".to_string(), vec![token_a.clone(), token_b.clone()]);
658        components.insert("pool2".to_string(), vec![]);
659        components.insert("pool3".to_string(), vec![]);
660        let result = manager.add_components(&components);
661
662        assert!(result.is_err());
663        match result.unwrap_err() {
664            GraphError::InvalidComponents(ids) => {
665                assert_eq!(ids.len(), 2);
666                assert!(ids.contains(&"pool2".to_string()));
667                assert!(ids.contains(&"pool3".to_string()));
668            }
669            _ => panic!("Expected InvalidComponents error"),
670        }
671
672        // Verify valid component was still added
673        assert_eq!(manager.graph().node_count(), 2);
674        assert_eq!(manager.graph().edge_count(), 2); // A-B and B-A
675    }
676
677    #[test]
678    fn test_remove_components_not_found_error() {
679        let mut manager = PetgraphStableDiGraphManager::<()>::new();
680        let mut components = HashMap::new();
681        let token_a = addr("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); // WETH
682        let token_b = addr("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); // USDC
683
684        // Add components first
685        components.insert("pool1".to_string(), vec![token_a.clone(), token_b.clone()]);
686        components.insert("pool2".to_string(), vec![token_a.clone(), token_b.clone()]);
687        manager
688            .add_components(&components)
689            .unwrap();
690
691        // Try to remove mix of existing and non-existing components
692        let result = manager.remove_components(&[
693            "pool1".to_string(),
694            "pool3".to_string(),
695            "pool4".to_string(),
696        ]);
697
698        assert!(result.is_err());
699        match result.unwrap_err() {
700            GraphError::ComponentsNotFound(ids) => {
701                assert_eq!(ids.len(), 2, "Expected 2 missing components");
702                assert!(ids.contains(&"pool3".to_string()));
703                assert!(ids.contains(&"pool4".to_string()));
704            }
705            _ => panic!("Expected ComponentsNotFound error"),
706        }
707
708        // Verify only pool2 edges remain
709        for edge in manager.graph().edge_indices() {
710            assert_eq!(
711                manager
712                    .graph()
713                    .edge_weight(edge)
714                    .unwrap()
715                    .component_id,
716                "pool2".to_string()
717            );
718        }
719    }
720
721    #[test]
722    fn test_set_edge_weight_errors() {
723        let mut manager = PetgraphStableDiGraphManager::<()>::new();
724        let mut topology = HashMap::new();
725        let token_a = addr("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); // WETH
726        let token_b = addr("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); // USDC
727        let token_c = addr("0x6B175474E89094C44Da98b954EedeAC495271d0F"); // DAI
728
729        // Initialize with pool1 connecting A-B, and pool2 connecting B-C
730        topology.insert("pool1".to_string(), vec![token_a.clone(), token_b.clone()]);
731        topology.insert("pool2".to_string(), vec![token_b.clone(), token_c.clone()]);
732        manager.initialize_graph(&topology);
733
734        // Test 1: Component not found
735        let result = manager.set_edge_weight(&"pool3".to_string(), &token_a, &token_b, (), true);
736        assert!(result.is_err());
737        match result.unwrap_err() {
738            GraphError::ComponentsNotFound(ids) => {
739                assert_eq!(ids, vec!["pool3".to_string()]);
740            }
741            _ => panic!("Expected ComponentsNotFound error"),
742        }
743
744        // Test 2: Token not found
745        let non_existent_token = addr("0x0000000000000000000000000000000000000000");
746        let result = manager.set_edge_weight(
747            &"pool1".to_string(),
748            &token_a,
749            &non_existent_token, // Non-existent token
750            (),
751            true,
752        );
753        assert!(result.is_err());
754        match result.unwrap_err() {
755            GraphError::TokenNotFound(found_addr) => {
756                assert_eq!(found_addr, non_existent_token);
757            }
758            _ => panic!("Expected TokenNotFound error"),
759        }
760
761        // Test 3: Component doesn't connect the specified tokens
762        let result = manager.set_edge_weight(
763            &"pool1".to_string(),
764            &token_a,
765            &token_c, // pool1 doesn't connect A-C, only A-B
766            (),
767            true,
768        );
769        assert!(result.is_err());
770        match result.unwrap_err() {
771            GraphError::MissingComponentBetweenTokens(in_token, out_token, comp_id) => {
772                assert_eq!(in_token, token_a);
773                assert_eq!(out_token, token_c);
774                assert_eq!(comp_id, "pool1".to_string());
775            }
776            _ => panic!("Expected MissingComponentBetweenTokens error"),
777        }
778    }
779
780    #[tokio::test]
781    async fn test_handle_event_propagates_errors() {
782        let mut manager = PetgraphStableDiGraphManager::<()>::new();
783        use std::collections::HashMap;
784
785        use crate::feed::events::{EventError, MarketEvent};
786
787        // Create an event with both add and remove operations that will fail
788        let event = MarketEvent::MarketUpdated {
789            added_components: HashMap::from([("pool1".to_string(), vec![])]),
790            removed_components: vec!["pool2".to_string()],
791            updated_components: vec![],
792        };
793
794        let result = manager.handle_event(&event).await;
795
796        // Should return multiple errors
797        assert!(result.is_err());
798        match result.unwrap_err() {
799            EventError::GraphErrors(errors) => {
800                assert_eq!(errors.len(), 2);
801                // Check that we have both error types
802                let has_add_error = errors
803                    .iter()
804                    .any(|e| matches!(e, GraphError::InvalidComponents(_)));
805                let has_remove_error = errors
806                    .iter()
807                    .any(|e| matches!(e, GraphError::ComponentsNotFound(_)));
808                assert!(has_add_error, "Should have InvalidComponents error");
809                assert!(has_remove_error, "Should have ComponentsNotFound error");
810            }
811        }
812    }
813
814    #[test]
815    fn test_add_components_skips_duplicates() {
816        let mut manager = PetgraphStableDiGraphManager::<()>::new();
817        let token_a = addr("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2");
818        let token_b = addr("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48");
819
820        let mut components = HashMap::new();
821        components.insert("pool1".to_string(), vec![token_a.clone(), token_b.clone()]);
822
823        manager
824            .add_components(&components)
825            .unwrap();
826        let edge_count_after_first = manager.graph().edge_count();
827        assert_eq!(edge_count_after_first, 2); // A->B and B->A
828
829        // Add the same component again
830        manager
831            .add_components(&components)
832            .unwrap();
833        let edge_count_after_second = manager.graph().edge_count();
834        assert_eq!(
835            edge_count_after_first, edge_count_after_second,
836            "Edge count should not change when re-adding the same component"
837        );
838    }
839
840    #[test]
841    fn edge_weight_cleared_on_spot_price_miss() {
842        // Regression: when spot price computation fails, stale edge weights must be cleared so the
843        // pool is excluded from path scoring rather than routed with an outdated price.
844        use num_bigint::BigUint;
845        use num_traits::One;
846        use tycho_simulation::tycho_core::simulation::protocol_sim::Price;
847
848        use crate::{
849            algorithm::test_utils::{market_read, setup_market_weighted, token, MockProtocolSim},
850            derived::{types::TokenGasPrices, DerivedData},
851        };
852
853        let token_a = token(0x01, "A");
854        let token_b = token(0x02, "B");
855        let (market, mut manager) =
856            setup_market_weighted(vec![("pool1", &token_a, &token_b, MockProtocolSim::new(2.0))]);
857
858        assert!(
859            manager
860                .graph()
861                .edge_indices()
862                .all(|e| manager
863                    .graph()
864                    .edge_weight(e)
865                    .unwrap()
866                    .data
867                    .is_some()),
868            "edges should have weight data after setup"
869        );
870
871        let mut token_prices = TokenGasPrices::new();
872        for addr in [&token_a.address, &token_b.address] {
873            token_prices.insert(
874                addr.clone(),
875                Price { numerator: BigUint::one(), denominator: BigUint::one() },
876            );
877        }
878        let mut derived = DerivedData::new();
879        derived.set_spot_prices(Default::default(), vec![], 10, true);
880        derived.set_pool_depths(Default::default(), vec![], 10, true);
881        derived.set_token_prices(token_prices, vec![], 10, true);
882
883        manager.update_edge_weights_with_derived(market_read(&market), &derived);
884
885        assert!(
886            manager
887                .graph()
888                .edge_indices()
889                .all(|e| manager
890                    .graph()
891                    .edge_weight(e)
892                    .unwrap()
893                    .data
894                    .is_none()),
895            "stale edge weights must be cleared when spot price is unavailable"
896        );
897    }
898}