Skip to main content

fynd_core/graph/
petgraph.rs

1//! Petgraph's StableDiGraph implementation of GraphManager.
2//!
3//! This module provides PetgraphStableDiGraphManager, which implements GraphManager for
4//! petgraph::stable_graph::StableDiGraph, providing a reusable implementation for algorithms that
5//! use petgraph.
6//!
7//! A stable graph is a graph that maintains the indices of its edges even after removals. This is
8//! useful for optimising the graph manager's performance by allowing for O(1) edge and node
9//! lookups.
10
11use std::collections::{HashMap, HashSet};
12
13use async_trait::async_trait;
14pub use petgraph::graph::EdgeIndex;
15use petgraph::{graph::NodeIndex, stable_graph};
16use tracing::{debug, trace};
17use tycho_simulation::tycho_common::models::Address;
18
19use super::GraphManager;
20use crate::{
21    feed::{
22        events::{EventError, MarketEvent, MarketEventHandler},
23        market_data::SharedMarketData,
24    },
25    graph::GraphError,
26    types::ComponentId,
27};
28
29/// Data stored on each edge of the graph.
30///
31/// Contains the component ID (which pool this edge represents) and
32/// optional algorithm-specific data. The type `D` is generic to allow
33/// different algorithms to store their own scoring data.
34///
35/// # Type Parameters
36/// - `D`: Algorithm-specific data type. Defaults to `()` for no extra data.
37///
38/// # Examples
39/// ```ignore
40/// // For MostLiquid algorithm with depth/price data:
41/// use crate::algorithm::most_liquid::DepthAndPrice;
42/// type MostLiquidEdge = EdgeData<DepthAndPrice>;
43///
44/// // For algorithms that don't need extra data:
45/// type SimpleEdge = EdgeData<()>;
46/// ```
47#[derive(Debug, Clone, Default)]
48pub struct EdgeData<D = ()> {
49    /// The component ID that enables this swap.
50    pub component_id: ComponentId,
51    /// Algorithm-specific data. None if not yet computed.
52    pub data: Option<D>,
53}
54
55impl<M> EdgeData<M> {
56    /// Creates a new EdgeData with the given component ID and no data set.
57    pub fn new(component_id: ComponentId) -> Self {
58        Self { component_id, data: None }
59    }
60
61    /// Creates a new EdgeData with the given component ID and data.
62    pub fn with_data(component_id: ComponentId, data: M) -> Self {
63        Self { component_id, data: Some(data) }
64    }
65}
66
/// A stable directed graph with token addresses as nodes and [`EdgeData`] as edge weights.
///
/// `D` is the algorithm-specific payload type carried inside each edge's [`EdgeData`].
pub type StableDiGraph<D> = stable_graph::StableDiGraph<Address, EdgeData<D>>;
69
70/// Petgraph implementation of GraphManager.
71///
72/// This struct implements GraphManager for petgraph::stable_graph::StableDiGraph.
73///
74/// The graph manager maintains the graph internally and updates it based on market events.
75/// Using StableDiGraph ensures edge indices remain valid after removals, making edge_map viable.
76pub struct PetgraphStableDiGraphManager<D: Clone> {
77    // Stable directed graph with token addresses as nodes and edge data (component id + weight) as
78    // edges. Using StableDiGraph ensures edge indices remain valid after removals, making
79    // edge_map viable.
80    graph: StableDiGraph<D>,
81    // Map from ComponentId to edge indices for fast removal and weight updates.
82    edge_map: HashMap<ComponentId, Vec<EdgeIndex>>,
83    // Map from token address to node index for fast node lookups.
84    node_map: HashMap<Address, NodeIndex>,
85}
86
87impl<D: Clone> PetgraphStableDiGraphManager<D> {
88    /// Creates a new empty graph manager.
89    pub fn new() -> Self {
90        Self { graph: StableDiGraph::default(), edge_map: HashMap::new(), node_map: HashMap::new() }
91    }
92
93    /// Helper function to find a node index by address
94    pub(crate) fn find_node(&self, addr: &Address) -> Result<NodeIndex, GraphError> {
95        self.node_map
96            .get(addr)
97            .copied()
98            .ok_or_else(|| GraphError::TokenNotFound(addr.clone()))
99    }
100
101    /// Helper function to get or create a node for the given address.
102    /// Returns the node index, creating the node if it doesn't exist.
103    fn get_or_create_node(&mut self, addr: &Address) -> NodeIndex {
104        // Check if node already exists
105        match self.find_node(addr) {
106            Ok(node_idx) => node_idx,
107            Err(_) => {
108                let node_idx = self.graph.add_node(addr.clone());
109                self.node_map
110                    .insert(addr.clone(), node_idx);
111                node_idx
112            }
113        }
114    }
115
116    /// Helper function to add an edge to the graph.
117    ///
118    /// # Arguments
119    ///
120    /// * `from_idx` - The index of the from node.
121    /// * `to_idx` - The index of the to node.
122    /// * `component_id` - The ID of the component represented by this edge.
123    fn add_edge(&mut self, from_idx: NodeIndex, to_idx: NodeIndex, component_id: &ComponentId) {
124        let edge_idx = self
125            .graph
126            .add_edge(from_idx, to_idx, EdgeData::new(component_id.clone()));
127        self.edge_map
128            .entry(component_id.clone())
129            .or_default()
130            .push(edge_idx);
131    }
132
133    /// Helper function to add edges for all token pairs in a component.
134    /// Takes a slice of node indices corresponding to the tokens.
135    fn add_component_edges(&mut self, component_id: &ComponentId, node_indices: &[NodeIndex]) {
136        // Create bidirectional edges for each token pair
137        node_indices
138            .iter()
139            .enumerate()
140            .flat_map(|(i, &from_idx)| {
141                node_indices
142                    .iter()
143                    .skip(i + 1)
144                    .map(move |&to_idx| (from_idx, to_idx))
145            })
146            .for_each(|(from_idx, to_idx)| {
147                // Create bidirectional edges A -> B and B -> A
148                self.add_edge(from_idx, to_idx, component_id);
149                self.add_edge(to_idx, from_idx, component_id);
150            });
151    }
152
153    /// Adds components to the graph.
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if any components have too few tokens (components must have at least 2
158    /// tokens). All components not included in the error were successfully added.
159    ///
160    /// Arguments:
161    /// - components: A map of component IDs to their tokens.
162    fn add_components(
163        &mut self,
164        components: &HashMap<ComponentId, Vec<Address>>,
165    ) -> Result<(), GraphError> {
166        let mut invalid_components = Vec::new();
167        let mut skipped_duplicates = 0usize;
168
169        for (comp_id, tokens) in components {
170            if self.edge_map.contains_key(comp_id) {
171                trace!(component_id = %comp_id, "skipping already-tracked component");
172                skipped_duplicates += 1;
173                continue;
174            }
175
176            if tokens.len() < 2 {
177                invalid_components.push(comp_id.clone());
178                continue;
179            }
180            // Ensure all tokens are added as nodes (or get existing ones) and collect their indices
181            let node_indices: Vec<NodeIndex> = tokens
182                .iter()
183                .map(|token| self.get_or_create_node(token))
184                .collect();
185            // Add edges for all token pairs in this component
186            self.add_component_edges(comp_id, &node_indices);
187        }
188
189        if skipped_duplicates > 0 {
190            debug!(skipped_duplicates, "skipped duplicate components during add");
191        }
192
193        // Return error if any components had too few tokens (less than 2)
194        if !invalid_components.is_empty() {
195            return Err(GraphError::InvalidComponents(invalid_components));
196        }
197
198        Ok(())
199    }
200
201    /// Removes components from the graph.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if any components are not found in the graph. All components not included
206    /// in the error were successfully removed.
207    ///
208    /// Arguments:
209    /// - components: A vector of component IDs to remove.
210    fn remove_components(&mut self, components: &[ComponentId]) -> Result<(), GraphError> {
211        let mut missing_components = Vec::new();
212
213        for comp_id in components {
214            // Use the edge_map for O(1) lookup instead of iterating all edges
215            if let Some(edge_indices) = self.edge_map.remove(comp_id) {
216                for edge_idx in edge_indices {
217                    self.graph.remove_edge(edge_idx);
218                }
219            } else {
220                // Component not found in edge_map
221                missing_components.push(comp_id.clone());
222            }
223        }
224
225        // Return error if any components were not found
226        if !missing_components.is_empty() {
227            return Err(GraphError::ComponentsNotFound(missing_components));
228        }
229
230        Ok(())
231    }
232
233    /// Sets the weight for edges between the specified tokens with the given component ID.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the component is not found in the graph for the given token pair.
238    ///
239    /// Arguments:
240    /// - component_id: The ID of the component to update.
241    /// - token_in: The input token.
242    /// - token_out: The output token.
243    /// - weight: The weight to set.
244    /// - If `bidirectional` is `true`, updates edges in both directions (token_in -> token_out and
245    ///   token_out -> token_in).
246    /// - If `bidirectional` is `false`, updates only the forward direction (token_in -> token_out).
247    #[cfg(test)]
248    pub(crate) fn set_edge_weight(
249        &mut self,
250        component_id: &ComponentId,
251        token_in: &Address,
252        token_out: &Address,
253        data: D,
254        bidirectional: bool,
255    ) -> Result<(), GraphError> {
256        let from_idx = self.find_node(token_in)?;
257        let to_idx = self.find_node(token_out)?;
258
259        // Get all edges for this component
260        let edge_indices = self
261            .edge_map
262            .get(component_id)
263            .ok_or_else(|| GraphError::ComponentsNotFound(vec![component_id.clone()]))?;
264
265        let mut updated = false;
266        for &edge_idx in edge_indices {
267            // Skip current edge if not found in graph, continue checking next edge
268            let (edge_from, edge_to) = match self.graph.edge_endpoints(edge_idx) {
269                Some(endpoints) => endpoints,
270                None => continue,
271            };
272
273            // Determine if we should update this edge based on edge tokens and bidirectional flag
274            let should_update = if bidirectional {
275                // Update both directions
276                (edge_from == from_idx && edge_to == to_idx) ||
277                    (edge_from == to_idx && edge_to == from_idx)
278            } else {
279                // Update only forward direction
280                edge_from == from_idx && edge_to == to_idx
281            };
282
283            if should_update {
284                // Error if edge weight is not found (edge is not in graph)
285                let edge_data = self
286                    .graph
287                    .edge_weight_mut(edge_idx)
288                    .ok_or_else(|| GraphError::ComponentsNotFound(vec![component_id.clone()]))?;
289                // Verify the component ID matches
290                if edge_data.component_id == *component_id {
291                    edge_data.data = Some(data.clone());
292                    updated = true;
293                }
294            }
295        }
296
297        if !updated {
298            return Err(GraphError::MissingComponentBetweenTokens(
299                token_in.clone(),
300                token_out.clone(),
301                component_id.clone(),
302            ));
303        }
304
305        Ok(())
306    }
307}
308
309impl<D: Clone + super::EdgeWeightFromSimAndDerived> PetgraphStableDiGraphManager<D> {
310    /// Updates edge weights using simulation states and pre-computed derived data.
311    ///
312    /// Uses pre-computed derived data (spot prices, pool depths, etc.) to update
313    /// edge weights. This is more accurate than computing from scratch as it uses
314    /// data computed with slippage thresholds via `query_pool_swap` or binary search.
315    ///
316    /// # Arguments
317    ///
318    /// * `market` - The market data containing simulation states and tokens
319    /// * `derived` - Pre-computed derived data (pool depths, spot prices, etc.)
320    ///
321    /// # Returns
322    ///
323    /// The number of edges successfully updated.
324    pub fn update_edge_weights_with_derived(
325        &mut self,
326        market: &SharedMarketData,
327        derived: &crate::derived::DerivedData,
328    ) -> usize {
329        let tokens = market.token_registry_ref();
330
331        // First pass: collect edge info and compute weights (immutable borrow)
332        let updates: Vec<_> = self
333            .graph
334            .edge_indices()
335            .filter_map(|edge_idx| {
336                let edge_data = self.graph.edge_weight(edge_idx)?;
337                let component_id = &edge_data.component_id;
338
339                let sim_state = market.get_simulation_state(component_id)?;
340
341                let (source_idx, target_idx) = self.graph.edge_endpoints(edge_idx)?;
342                let source_addr = &self.graph[source_idx];
343                let target_addr = &self.graph[target_idx];
344
345                let token_in = tokens.get(source_addr)?;
346                let token_out = tokens.get(target_addr)?;
347
348                let weight =
349                    D::from_sim_and_derived(sim_state, component_id, token_in, token_out, derived)?;
350                Some((edge_idx, weight))
351            })
352            .collect();
353
354        // Second pass: apply updates (mutable borrow)
355        let updated = updates.len();
356        for (edge_idx, weight) in updates {
357            if let Some(edge_data) = self.graph.edge_weight_mut(edge_idx) {
358                edge_data.data = Some(weight);
359            }
360        }
361
362        updated
363    }
364}
365
366impl<D: Clone + super::EdgeWeightFromSimAndDerived> super::EdgeWeightUpdaterWithDerived
367    for PetgraphStableDiGraphManager<D>
368{
369    fn update_edge_weights_with_derived(
370        &mut self,
371        market: &SharedMarketData,
372        derived: &crate::derived::DerivedData,
373    ) -> usize {
374        self.update_edge_weights_with_derived(market, derived)
375    }
376}
377
378impl<D: Clone> Default for PetgraphStableDiGraphManager<D> {
379    fn default() -> Self {
380        Self::new()
381    }
382}
383
384impl<D: Clone + Send + Sync> GraphManager<StableDiGraph<D>> for PetgraphStableDiGraphManager<D> {
385    fn initialize_graph(&mut self, component_topology: &HashMap<ComponentId, Vec<Address>>) {
386        // Clear existing graph and component map
387        self.graph = StableDiGraph::default();
388        self.edge_map.clear();
389        self.node_map.clear();
390
391        let unique_tokens: HashSet<Address> = component_topology
392            .values()
393            .flat_map(|v| v.iter())
394            .cloned()
395            .collect();
396
397        // Add all nodes (tokens) to the graph
398        for token in unique_tokens {
399            let node_idx = self.graph.add_node(token.clone());
400            self.node_map.insert(token, node_idx);
401        }
402
403        // Add edges between all tokens in each component
404        for (comp_id, tokens) in component_topology {
405            let node_indices: Vec<NodeIndex> = tokens
406                .iter()
407                .map(|token| self.node_map[token])
408                .collect();
409            self.add_component_edges(comp_id, &node_indices);
410        }
411    }
412
413    fn graph(&self) -> &StableDiGraph<D> {
414        &self.graph
415    }
416}
417
418#[async_trait]
419impl<D: Clone + Send> MarketEventHandler for PetgraphStableDiGraphManager<D> {
420    async fn handle_event(&mut self, event: &MarketEvent) -> Result<(), EventError> {
421        match event {
422            MarketEvent::MarketUpdated { added_components, removed_components, .. } => {
423                // Process both operations and collect all errors
424                let mut errors = Vec::new();
425
426                // Try to add components, collect error if it fails
427                if let Err(e) = self.add_components(added_components) {
428                    errors.push(e);
429                }
430
431                // Try to remove components, collect error if it fails
432                if let Err(e) = self.remove_components(removed_components) {
433                    errors.push(e);
434                }
435
436                // Return errors if any occurred
437                match errors.len() {
438                    0 => Ok(()),
439                    _ => Err(EventError::GraphErrors(errors)),
440                }
441            }
442        }
443    }
444}
445
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;

    // Well-known mainnet token addresses used as fixtures.
    const WETH: &str = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2";
    const USDC: &str = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48";
    const DAI: &str = "0x6B175474E89094C44Da98b954EedeAC495271d0F";
    const USDT: &str = "0xdAC17F958D2ee523a2206206994597C13D831ec7";

    /// Parses a hex string into a test address, panicking on invalid input.
    fn addr(s: &str) -> Address {
        Address::from_str(s).expect("Invalid address hex string")
    }

    /// Builds a component ID from a string literal.
    fn pool(name: &str) -> ComponentId {
        name.to_string()
    }

    /// Returns the component ID stored on an edge `from -> to`, panicking if there is none.
    fn component_between(
        graph: &StableDiGraph<()>,
        from: NodeIndex,
        to: NodeIndex,
    ) -> ComponentId {
        let edge_idx = graph
            .find_edge(from, to)
            .expect("edge should exist");
        graph
            .edge_weight(edge_idx)
            .expect("edge weight should exist")
            .component_id
            .clone()
    }

    #[test]
    fn test_initialize_graph_empty() {
        let mut manager = PetgraphStableDiGraphManager::<()>::new();

        manager.initialize_graph(&HashMap::new());

        assert_eq!(manager.graph().node_count(), 0);
        assert_eq!(manager.graph().edge_count(), 0);
    }

    #[test]
    fn test_initialize_graph_comprehensive() {
        let mut manager = PetgraphStableDiGraphManager::<()>::new();
        let (token_a, token_b, token_c, token_d) = (addr(WETH), addr(USDC), addr(DAI), addr(USDT));

        let mut topology = HashMap::new();
        // Pool 1: A-B-C (3-token pool, fully connected)
        topology.insert(pool("pool1"), vec![token_a.clone(), token_b.clone(), token_c.clone()]);
        // Pool 2: C-D (2-token pool, overlapping with pool 1)
        topology.insert(pool("pool2"), vec![token_c.clone(), token_d.clone()]);

        manager.initialize_graph(&topology);

        let graph = manager.graph();
        // 4 unique tokens
        assert_eq!(graph.node_count(), 4);
        // Pool 1: 3 pairs x 2 directions = 6 edges, Pool 2: 1 pair x 2 directions = 2 edges
        assert_eq!(graph.edge_count(), 8);

        let node_a = manager.find_node(&token_a).unwrap();
        let node_b = manager.find_node(&token_b).unwrap();
        let node_c = manager.find_node(&token_c).unwrap();
        let node_d = manager.find_node(&token_d).unwrap();

        // Every directed edge together with the component it must be labelled with.
        let expected_edges = [
            (node_a, node_b, "pool1"),
            (node_b, node_a, "pool1"),
            (node_a, node_c, "pool1"),
            (node_c, node_a, "pool1"),
            (node_b, node_c, "pool1"),
            (node_c, node_b, "pool1"),
            (node_c, node_d, "pool2"),
            (node_d, node_c, "pool2"),
        ];
        for &(from, to, component) in expected_edges.iter() {
            assert_eq!(component_between(graph, from, to), pool(component));
        }
    }

    #[test]
    fn test_initialize_graph_multiple_edges_same_pair() {
        let mut manager = PetgraphStableDiGraphManager::<()>::new();
        let (token_a, token_b) = (addr(WETH), addr(USDC));
        let pool_names = ["pool1", "pool2", "pool3"];

        // Multiple components connecting the same token pair
        let topology: HashMap<ComponentId, Vec<Address>> = pool_names
            .iter()
            .map(|name| (pool(name), vec![token_a.clone(), token_b.clone()]))
            .collect();

        manager.initialize_graph(&topology);

        let graph = manager.graph();
        // 2 unique tokens
        assert_eq!(graph.node_count(), 2);
        // 3 components x 1 pair x 2 directions = 6 edges between A-B
        assert_eq!(graph.edge_count(), 6);

        let node_a = manager.find_node(&token_a).unwrap();
        let node_b = manager.find_node(&token_b).unwrap();

        // One A -> B edge per component
        let edges: Vec<_> = graph
            .edges_connecting(node_a, node_b)
            .collect();
        assert_eq!(edges.len(), 3);

        let component_ids: Vec<_> = edges
            .iter()
            .map(|e| &e.weight().component_id)
            .collect();
        for name in pool_names.iter() {
            assert!(component_ids.contains(&&pool(name)));
        }
    }

    #[test]
    fn test_add_components_shared_tokens() {
        let mut manager = PetgraphStableDiGraphManager::<()>::new();
        let (token_a, token_b) = (addr(WETH), addr(USDC));

        // First component introduces both tokens
        let first = HashMap::from([(pool("pool1"), vec![token_a.clone(), token_b.clone()])]);
        manager.add_components(&first).unwrap();
        assert_eq!(manager.graph().node_count(), 2);

        // Second component reuses the same tokens
        let second = HashMap::from([(pool("pool2"), vec![token_a.clone(), token_b.clone()])]);
        manager.add_components(&second).unwrap();

        // Existing nodes must be reused
        assert_eq!(manager.graph().node_count(), 2, "Should not create duplicate nodes");
    }

    #[test]
    fn test_add_tokenless_components_error() {
        let mut manager = PetgraphStableDiGraphManager::<()>::new();
        let (token_a, token_b) = (addr(WETH), addr(USDC));

        // Mix valid and invalid components
        let components = HashMap::from([
            (pool("pool1"), vec![token_a.clone(), token_b.clone()]),
            (pool("pool2"), vec![]),
            (pool("pool3"), vec![]),
        ]);

        match manager.add_components(&components) {
            Err(GraphError::InvalidComponents(ids)) => {
                assert_eq!(ids.len(), 2);
                assert!(ids.contains(&pool("pool2")));
                assert!(ids.contains(&pool("pool3")));
            }
            _ => panic!("Expected InvalidComponents error"),
        }

        // Verify valid component was still added
        assert_eq!(manager.graph().node_count(), 2);
        assert_eq!(manager.graph().edge_count(), 2); // A-B and B-A
    }

    #[test]
    fn test_remove_components_not_found_error() {
        let mut manager = PetgraphStableDiGraphManager::<()>::new();
        let (token_a, token_b) = (addr(WETH), addr(USDC));

        // Add components first
        let components = HashMap::from([
            (pool("pool1"), vec![token_a.clone(), token_b.clone()]),
            (pool("pool2"), vec![token_a.clone(), token_b.clone()]),
        ]);
        manager
            .add_components(&components)
            .unwrap();

        // Try to remove mix of existing and non-existing components
        let to_remove = [pool("pool1"), pool("pool3"), pool("pool4")];
        match manager.remove_components(&to_remove) {
            Err(GraphError::ComponentsNotFound(ids)) => {
                assert_eq!(ids.len(), 2, "Expected 2 missing components");
                assert!(ids.contains(&pool("pool3")));
                assert!(ids.contains(&pool("pool4")));
            }
            _ => panic!("Expected ComponentsNotFound error"),
        }

        // Verify only pool2 edges remain
        let graph = manager.graph();
        for edge_idx in graph.edge_indices() {
            assert_eq!(graph.edge_weight(edge_idx).unwrap().component_id, pool("pool2"));
        }
    }

    #[test]
    fn test_set_edge_weight_errors() {
        let mut manager = PetgraphStableDiGraphManager::<()>::new();
        let (token_a, token_b, token_c) = (addr(WETH), addr(USDC), addr(DAI));

        // Initialize with pool1 connecting A-B, and pool2 connecting B-C
        let topology = HashMap::from([
            (pool("pool1"), vec![token_a.clone(), token_b.clone()]),
            (pool("pool2"), vec![token_b.clone(), token_c.clone()]),
        ]);
        manager.initialize_graph(&topology);

        // Case 1: Component not found
        match manager.set_edge_weight(&pool("pool3"), &token_a, &token_b, (), true) {
            Err(GraphError::ComponentsNotFound(ids)) => {
                assert_eq!(ids, vec![pool("pool3")]);
            }
            _ => panic!("Expected ComponentsNotFound error"),
        }

        // Case 2: Token not found
        let non_existent_token = addr("0x0000000000000000000000000000000000000000");
        match manager.set_edge_weight(&pool("pool1"), &token_a, &non_existent_token, (), true) {
            Err(GraphError::TokenNotFound(found_addr)) => {
                assert_eq!(found_addr, non_existent_token);
            }
            _ => panic!("Expected TokenNotFound error"),
        }

        // Case 3: Component doesn't connect the specified tokens (pool1 is A-B only)
        match manager.set_edge_weight(&pool("pool1"), &token_a, &token_c, (), true) {
            Err(GraphError::MissingComponentBetweenTokens(in_token, out_token, comp_id)) => {
                assert_eq!(in_token, token_a);
                assert_eq!(out_token, token_c);
                assert_eq!(comp_id, pool("pool1"));
            }
            _ => panic!("Expected MissingComponentBetweenTokens error"),
        }
    }

    #[tokio::test]
    async fn test_handle_event_propagates_errors() {
        use crate::feed::events::{EventError, MarketEvent};

        let mut manager = PetgraphStableDiGraphManager::<()>::new();

        // Both the add (token-less component) and the remove (unknown component) must fail
        let event = MarketEvent::MarketUpdated {
            added_components: HashMap::from([(pool("pool1"), vec![])]),
            removed_components: vec![pool("pool2")],
            updated_components: vec![],
        };

        let result = manager.handle_event(&event).await;

        // Should return multiple errors
        assert!(result.is_err());
        let EventError::GraphErrors(errors) = result.unwrap_err();
        assert_eq!(errors.len(), 2);

        // Check that we have both error types
        let has_add_error = errors
            .iter()
            .any(|e| matches!(e, GraphError::InvalidComponents(_)));
        let has_remove_error = errors
            .iter()
            .any(|e| matches!(e, GraphError::ComponentsNotFound(_)));
        assert!(has_add_error, "Should have InvalidComponents error");
        assert!(has_remove_error, "Should have ComponentsNotFound error");
    }

    #[test]
    fn test_add_components_skips_duplicates() {
        let mut manager = PetgraphStableDiGraphManager::<()>::new();
        let components = HashMap::from([(pool("pool1"), vec![addr(WETH), addr(USDC)])]);

        manager
            .add_components(&components)
            .unwrap();
        let edge_count_after_first = manager.graph().edge_count();
        assert_eq!(edge_count_after_first, 2); // A->B and B->A

        // Add the same component again
        manager
            .add_components(&components)
            .unwrap();
        let edge_count_after_second = manager.graph().edge_count();
        assert_eq!(
            edge_count_after_first, edge_count_after_second,
            "Edge count should not change when re-adding the same component"
        );
    }
}