Skip to main content

fynd_core/derived/
store.rs

1//! Typed storage for derived data.
2
3use std::{collections::HashMap, str::FromStr, sync::Arc};
4
5use tokio::sync::RwLock;
6use tycho_simulation::tycho_common::models::Address;
7
8use super::{
9    computation::{FailedItem, FailedItemError},
10    types::{
11        PoolDepthKey, PoolDepths, SpotPriceKey, SpotPrices, TokenGasPriceKey, TokenGasPrices,
12        TokenPricesWithDeps,
13    },
14};
15use crate::derived::SharedDerivedDataRef;
16
/// Result of a derived-data computation, tagged with the block it belongs to.
#[derive(Debug)]
struct ComputedValue<T> {
    /// The computed payload.
    data: T,
    /// Block number the payload was computed for.
    block: u64,
}
23
24/// Typed storage for derived data computations.
25///
26/// Provides typed access to previously computed derived data.
27/// Each field is `Option` to indicate whether the computation has run.
28#[derive(Debug, Default)]
29pub struct DerivedData {
30    token_prices: Option<ComputedValue<TokenGasPrices>>,
31    /// Persistent failure map: key → (block, error). Merged on incremental runs, replaced on full.
32    token_prices_failed: HashMap<TokenGasPriceKey, (u64, FailedItemError)>,
33    /// Token prices with path dependency tracking for incremental computation.
34    token_prices_deps: Option<ComputedValue<TokenPricesWithDeps>>,
35    pool_depths: Option<ComputedValue<PoolDepths>>,
36    /// Persistent failure map: key → (block, error). Merged on incremental runs, replaced on full.
37    pool_depths_failed: HashMap<PoolDepthKey, (u64, FailedItemError)>,
38    spot_prices: Option<ComputedValue<SpotPrices>>,
39    /// Persistent failure map: key → (block, error). Merged on incremental runs, replaced on full.
40    spot_prices_failed: HashMap<SpotPriceKey, (u64, FailedItemError)>,
41}
42
43/// Parses `"component_id/token_in/token_out"` into a typed `(ComponentId, Address, Address)` key.
44fn parse_pair_key(s: &str) -> Option<(String, Address, Address)> {
45    let mut parts = s.rsplitn(3, '/');
46    let token_out_str = parts.next()?;
47    let token_in_str = parts.next()?;
48    let component_id = parts.next()?;
49    let token_in = Address::from_str(token_in_str).ok()?;
50    let token_out = Address::from_str(token_out_str).ok()?;
51    Some((component_id.to_string(), token_in, token_out))
52}
53
54impl DerivedData {
55    /// Creates an empty store.
56    pub fn new() -> Self {
57        Self::default()
58    }
59
60    /// Creates a new shared derived data store for async computation tests that is wrapped in an
61    /// `Arc<RwLock<>>`.
62    pub fn new_shared() -> SharedDerivedDataRef {
63        Arc::new(RwLock::new(Self::new()))
64    }
65
66    /// Returns `true` if all derived data types has been computed at least once.
67    pub fn derived_data_ready(&self) -> bool {
68        self.token_prices_block().is_some() &&
69            self.token_prices_deps_block().is_some() &&
70            self.pool_depths_block().is_some() &&
71            self.spot_prices_block().is_some()
72    }
73
74    // -------------------------------------------------------------------------
75    // Token Prices
76    // -------------------------------------------------------------------------
77
78    /// Returns token prices if computed.
79    pub fn token_prices(&self) -> Option<&TokenGasPrices> {
80        self.token_prices
81            .as_ref()
82            .map(|v| &v.data)
83    }
84
85    /// Returns the block at which token prices were last computed.
86    pub fn token_prices_block(&self) -> Option<u64> {
87        self.token_prices
88            .as_ref()
89            .map(|v| v.block)
90    }
91
92    /// Sets token prices, merging failures for incremental runs.
93    ///
94    /// For full recomputes, the failure map is replaced entirely. For incremental runs,
95    /// failures are merged: existing entries for keys that now succeed are removed, new
96    /// failures are inserted, and entries for keys not attempted this run are preserved.
97    pub fn set_token_prices(
98        &mut self,
99        prices: TokenGasPrices,
100        failed_items: Vec<FailedItem>,
101        block: u64,
102        is_full_recompute: bool,
103    ) {
104        let new_failures: HashMap<TokenGasPriceKey, (u64, FailedItemError)> = failed_items
105            .into_iter()
106            .filter_map(|f| {
107                Address::from_str(&f.key)
108                    .ok()
109                    .map(|k| (k, (block, f.error)))
110            })
111            .collect();
112
113        if is_full_recompute {
114            self.token_prices_failed = new_failures;
115        } else {
116            self.token_prices_failed
117                .retain(|k, _| !prices.contains_key(k));
118            self.token_prices_failed
119                .extend(new_failures);
120        }
121
122        self.token_prices = Some(ComputedValue { data: prices, block });
123    }
124
125    /// Returns `(block, error)` for this token address if it failed in a past
126    /// computation, or `None` if it succeeded or was not attempted.
127    pub fn token_price_failure(&self, key: &TokenGasPriceKey) -> Option<(u64, &FailedItemError)> {
128        self.token_prices_failed
129            .get(key)
130            .map(|(block, error)| (*block, error))
131    }
132
133    /// Clears token prices and their failure map.
134    pub fn clear_token_prices(&mut self) {
135        self.token_prices = None;
136        self.token_prices_failed.clear();
137    }
138
139    // -------------------------------------------------------------------------
140    // Token Prices with Dependencies (for incremental computation)
141    // -------------------------------------------------------------------------
142
143    /// Returns token prices with path dependencies if computed.
144    pub fn token_prices_deps(&self) -> Option<&TokenPricesWithDeps> {
145        self.token_prices_deps
146            .as_ref()
147            .map(|v| &v.data)
148    }
149
150    /// Returns the block at which token prices with dependencies were last computed.
151    pub fn token_prices_deps_block(&self) -> Option<u64> {
152        self.token_prices_deps
153            .as_ref()
154            .map(|v| v.block)
155    }
156
157    /// Sets token prices with path dependencies.
158    pub fn set_token_prices_deps(&mut self, prices: TokenPricesWithDeps, block: u64) {
159        self.token_prices_deps = Some(ComputedValue { data: prices, block });
160    }
161
162    /// Clears token prices with dependencies.
163    pub fn clear_token_prices_deps(&mut self) {
164        self.token_prices_deps = None;
165    }
166
167    // -------------------------------------------------------------------------
168    // Pool Depths
169    // -------------------------------------------------------------------------
170
171    /// Returns pool depths if computed.
172    pub fn pool_depths(&self) -> Option<&PoolDepths> {
173        self.pool_depths
174            .as_ref()
175            .map(|v| &v.data)
176    }
177
178    /// Returns the block at which pool depths were last computed.
179    pub fn pool_depths_block(&self) -> Option<u64> {
180        self.pool_depths
181            .as_ref()
182            .map(|v| v.block)
183    }
184
185    /// Sets pool depths, merging failures for incremental runs.
186    ///
187    /// For full recomputes, the failure map is replaced entirely. For incremental runs,
188    /// failures are merged: existing entries for keys that now succeed are removed, new
189    /// failures are inserted, and entries for keys not attempted this run are preserved.
190    pub fn set_pool_depths(
191        &mut self,
192        depths: PoolDepths,
193        failed_items: Vec<FailedItem>,
194        block: u64,
195        is_full_recompute: bool,
196    ) {
197        let new_failures: HashMap<PoolDepthKey, (u64, FailedItemError)> = failed_items
198            .into_iter()
199            .filter_map(|f| parse_pair_key(&f.key).map(|k| (k, (block, f.error))))
200            .collect();
201
202        if is_full_recompute {
203            self.pool_depths_failed = new_failures;
204        } else {
205            self.pool_depths_failed
206                .retain(|k, _| !depths.contains_key(k));
207            self.pool_depths_failed
208                .extend(new_failures);
209        }
210
211        self.pool_depths = Some(ComputedValue { data: depths, block });
212    }
213
214    /// Returns `(block, error)` for this key if it failed in a past pool depth
215    /// computation, or `None` if it succeeded or was not attempted.
216    ///
217    /// Key format: `(component_id, token_in, token_out)`
218    pub fn pool_depth_failure(&self, key: &PoolDepthKey) -> Option<(u64, &FailedItemError)> {
219        self.pool_depths_failed
220            .get(key)
221            .map(|(block, error)| (*block, error))
222    }
223
224    /// Clears pool depths and their failure map.
225    pub fn clear_pool_depths(&mut self) {
226        self.pool_depths = None;
227        self.pool_depths_failed.clear();
228    }
229
230    // -------------------------------------------------------------------------
231    // Spot Prices
232    // -------------------------------------------------------------------------
233
234    /// Returns spot prices if computed.
235    pub fn spot_prices(&self) -> Option<&SpotPrices> {
236        self.spot_prices
237            .as_ref()
238            .map(|v| &v.data)
239    }
240
241    /// Returns the block at which spot prices were last computed.
242    pub fn spot_prices_block(&self) -> Option<u64> {
243        self.spot_prices
244            .as_ref()
245            .map(|v| v.block)
246    }
247
248    /// Sets spot prices, merging failures for incremental runs.
249    ///
250    /// For full recomputes, the failure map is replaced entirely. For incremental runs,
251    /// failures are merged: existing entries for keys that now succeed are removed, new
252    /// failures are inserted, and entries for keys not attempted this run are preserved.
253    pub fn set_spot_prices(
254        &mut self,
255        prices: SpotPrices,
256        failed_items: Vec<FailedItem>,
257        block: u64,
258        is_full_recompute: bool,
259    ) {
260        let new_failures: HashMap<SpotPriceKey, (u64, FailedItemError)> = failed_items
261            .into_iter()
262            .filter_map(|f| parse_pair_key(&f.key).map(|k| (k, (block, f.error))))
263            .collect();
264
265        if is_full_recompute {
266            self.spot_prices_failed = new_failures;
267        } else {
268            self.spot_prices_failed
269                .retain(|k, _| !prices.contains_key(k));
270            self.spot_prices_failed
271                .extend(new_failures);
272        }
273
274        self.spot_prices = Some(ComputedValue { data: prices, block });
275    }
276
277    /// Returns `(block, error)` for this key if it failed in a past spot price
278    /// computation, or `None` if it succeeded or was not attempted.
279    ///
280    /// Key format: `(component_id, token_in, token_out)`
281    pub fn spot_price_failure(&self, key: &SpotPriceKey) -> Option<(u64, &FailedItemError)> {
282        self.spot_prices_failed
283            .get(key)
284            .map(|(block, error)| (*block, error))
285    }
286
287    /// Clears spot prices and their failure map.
288    pub fn clear_spot_prices(&mut self) {
289        self.spot_prices = None;
290        self.spot_prices_failed.clear();
291    }
292
293    // -------------------------------------------------------------------------
294    // Bulk Operations
295    // -------------------------------------------------------------------------
296
297    /// Clears all stored data, including all failure maps.
298    pub fn clear_all(&mut self) {
299        self.token_prices = None;
300        self.token_prices_failed.clear();
301        self.token_prices_deps = None;
302        self.pool_depths = None;
303        self.pool_depths_failed.clear();
304        self.spot_prices = None;
305        self.spot_prices_failed.clear();
306    }
307}
308
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{algorithm::test_utils::addr, derived::types::SpotPrices};

    /// Builds a `FailedItem` from a string key and an error.
    fn failed(key: &str, error: FailedItemError) -> FailedItem {
        FailedItem { key: key.to_string(), error }
    }

    /// Builds a typed `(component_id, token_in, token_out)` key from test address bytes.
    fn pair_key(comp: &str, b_in: u8, b_out: u8) -> SpotPriceKey {
        (comp.to_string(), addr(b_in), addr(b_out))
    }

    #[test]
    fn test_parse_pair_key_keeps_slashes_in_component_id() {
        let key_str = format!("pool/with/slash/{}/{}", addr(0x01), addr(0x02));
        assert_eq!(parse_pair_key(&key_str), Some(pair_key("pool/with/slash", 0x01, 0x02)));
    }

    #[test]
    fn test_parse_pair_key_rejects_too_few_segments() {
        assert_eq!(parse_pair_key(""), None);
        assert_eq!(parse_pair_key("pool1"), None);
        // Two valid addresses but no component id segment.
        assert_eq!(parse_pair_key(&format!("{}/{}", addr(0x01), addr(0x02))), None);
    }

    #[test]
    fn test_token_prices_block_tracks_independently() {
        let mut store = DerivedData::new();
        assert_eq!(store.token_prices_block(), None);

        store.set_token_prices(Default::default(), vec![], 42, true);
        assert_eq!(store.token_prices_block(), Some(42));

        // Other computations not set yet
        assert_eq!(store.spot_prices_block(), None);
        assert_eq!(store.pool_depths_block(), None);
    }

    #[test]
    fn test_token_prices_deps_block_tracks_independently() {
        let mut store = DerivedData::new();
        assert_eq!(store.token_prices_deps_block(), None);

        store.set_token_prices_deps(Default::default(), 3);
        assert!(store.token_prices_deps().is_some());
        assert_eq!(store.token_prices_deps_block(), Some(3));
        assert_eq!(store.token_prices_block(), None);
    }

    #[test]
    fn test_spot_prices_block_tracks_independently() {
        let mut store = DerivedData::new();
        store.set_spot_prices(Default::default(), vec![], 10, true);
        assert_eq!(store.spot_prices_block(), Some(10));
        assert_eq!(store.token_prices_block(), None);
    }

    #[test]
    fn test_pool_depths_block_tracks_independently() {
        let mut store = DerivedData::new();
        store.set_pool_depths(Default::default(), vec![], 7, true);
        assert_eq!(store.pool_depths_block(), Some(7));
        assert_eq!(store.token_prices_block(), None);
    }

    #[test]
    fn test_derived_data_ready() {
        let mut store = DerivedData::new();
        assert!(!store.derived_data_ready());

        store.set_spot_prices(Default::default(), vec![], 5, true);
        assert!(!store.derived_data_ready());

        store.set_token_prices(Default::default(), vec![], 10, true);
        assert!(!store.derived_data_ready());

        store.set_token_prices_deps(Default::default(), 10);
        assert!(!store.derived_data_ready());

        store.set_pool_depths(Default::default(), vec![], 9, true);
        assert!(store.derived_data_ready());
    }

    #[test]
    fn test_clear_all_resets_all_fields() {
        let mut store = DerivedData::new();
        store.set_token_prices(Default::default(), vec![], 1, true);
        store.set_token_prices_deps(Default::default(), 1);
        store.set_spot_prices(Default::default(), vec![], 1, true);
        store.set_pool_depths(Default::default(), vec![], 1, true);
        // Everything is populated, so the readiness assertion after clearing is not vacuous.
        assert!(store.derived_data_ready());

        store.clear_all();

        assert!(store.token_prices().is_none());
        assert!(store.token_prices_deps().is_none());
        assert!(store.spot_prices().is_none());
        assert!(store.pool_depths().is_none());
        assert!(!store.derived_data_ready());
    }

    #[test]
    fn test_token_price_failure_stored_with_block() {
        let token_addr = addr(0xab);
        let key_str = format!("{token_addr}");
        let mut store = DerivedData::new();
        store.set_token_prices(
            Default::default(),
            vec![failed(&key_str, FailedItemError::SimulationFailed("sim error".into()))],
            42,
            true,
        );
        assert_eq!(
            store.token_price_failure(&token_addr),
            Some((42, &FailedItemError::SimulationFailed("sim error".into())))
        );
        assert_eq!(store.token_price_failure(&addr(0xcd)), None);
    }

    #[test]
    fn test_spot_price_failure_stored_with_block() {
        let key = pair_key("pool1", 0x01, 0x02);
        let key_str = format!("pool1/{}/{}", addr(0x01), addr(0x02));
        let mut store = DerivedData::new();
        store.set_spot_prices(
            Default::default(),
            vec![failed(&key_str, FailedItemError::SimulationFailed("sim error".into()))],
            10,
            true,
        );
        assert_eq!(
            store.spot_price_failure(&key),
            Some((10, &FailedItemError::SimulationFailed("sim error".into())))
        );
        assert_eq!(store.spot_price_failure(&pair_key("pool1", 0x01, 0x03)), None);
    }

    #[test]
    fn test_pool_depth_failure_stored_with_block() {
        let key: PoolDepthKey = pair_key("pool1", 0x01, 0x02);
        let key_str = format!("pool1/{}/{}", addr(0x01), addr(0x02));
        let mut store = DerivedData::new();
        store.set_pool_depths(
            Default::default(),
            vec![failed(&key_str, FailedItemError::SimulationFailed("depth error".into()))],
            7,
            true,
        );
        assert_eq!(
            store.pool_depth_failure(&key),
            Some((7, &FailedItemError::SimulationFailed("depth error".into())))
        );
        assert_eq!(store.pool_depth_failure(&pair_key("pool2", 0x01, 0x02)), None);
    }

    #[test]
    fn test_unparsable_failure_keys_are_dropped() {
        let mut store = DerivedData::new();
        // No '/' separators at all, so the key cannot be split into three segments.
        store.set_spot_prices(
            Default::default(),
            vec![failed("missing-separators", FailedItemError::MissingSimulationState)],
            1,
            true,
        );
        assert!(store.spot_prices_failed.is_empty());
        // The computation itself is still recorded.
        assert_eq!(store.spot_prices_block(), Some(1));
    }

    #[test]
    fn test_rerunning_with_empty_failures_clears_old_reasons() {
        let key = pair_key("pool1", 0x01, 0x02);
        let key_str = format!("pool1/{}/{}", addr(0x01), addr(0x02));
        let mut store = DerivedData::new();
        store.set_spot_prices(
            Default::default(),
            vec![failed(&key_str, FailedItemError::MissingSimulationState)],
            1,
            true,
        );
        assert!(store.spot_price_failure(&key).is_some());

        // Full re-run with no failures clears the map
        store.set_spot_prices(Default::default(), vec![], 2, true);
        assert_eq!(store.spot_price_failure(&key), None);
    }

    #[test]
    fn test_clear_token_prices_clears_failure_map() {
        let token_addr = addr(0xab);
        let key_str = format!("{token_addr}");
        let mut store = DerivedData::new();
        store.set_token_prices(
            Default::default(),
            vec![failed(&key_str, FailedItemError::AllSimulationPathsFailed)],
            1,
            true,
        );
        store.clear_token_prices();
        assert_eq!(store.token_price_failure(&token_addr), None);
    }

    #[test]
    fn test_clear_token_prices_deps_resets_value_and_block() {
        let mut store = DerivedData::new();
        store.set_token_prices_deps(Default::default(), 5);
        assert_eq!(store.token_prices_deps_block(), Some(5));

        store.clear_token_prices_deps();
        assert!(store.token_prices_deps().is_none());
        assert_eq!(store.token_prices_deps_block(), None);
    }

    #[test]
    fn test_clear_spot_prices_clears_failure_map() {
        let key = pair_key("pool1", 0x01, 0x02);
        let key_str = format!("pool1/{}/{}", addr(0x01), addr(0x02));
        let mut store = DerivedData::new();
        store.set_spot_prices(
            Default::default(),
            vec![failed(&key_str, FailedItemError::MissingSimulationState)],
            1,
            true,
        );
        store.clear_spot_prices();
        assert_eq!(store.spot_price_failure(&key), None);
    }

    #[test]
    fn test_clear_pool_depths_clears_failure_map() {
        let key: PoolDepthKey = pair_key("pool1", 0x01, 0x02);
        let key_str = format!("pool1/{}/{}", addr(0x01), addr(0x02));
        let mut store = DerivedData::new();
        store.set_pool_depths(
            Default::default(),
            vec![failed(&key_str, FailedItemError::MissingSpotPrice)],
            1,
            true,
        );
        store.clear_pool_depths();
        assert_eq!(store.pool_depth_failure(&key), None);
    }

    #[test]
    fn test_incremental_run_preserves_failures_for_unattempted_items() {
        let key_a = pair_key("pool_a", 0x01, 0x02);
        let key_a_str = format!("pool_a/{}/{}", addr(0x01), addr(0x02));
        let key_b = pair_key("pool_b", 0x03, 0x04);
        let key_b_str = format!("pool_b/{}/{}", addr(0x03), addr(0x04));

        let mut store = DerivedData::new();

        // Full recompute at block 10: both keys fail
        store.set_spot_prices(
            Default::default(),
            vec![
                failed(&key_a_str, FailedItemError::MissingSimulationState),
                failed(&key_b_str, FailedItemError::MissingTokenMetadata),
            ],
            10,
            true,
        );
        assert_eq!(
            store.spot_price_failure(&key_a),
            Some((10, &FailedItemError::MissingSimulationState))
        );
        assert_eq!(
            store.spot_price_failure(&key_b),
            Some((10, &FailedItemError::MissingTokenMetadata))
        );

        // Incremental run at block 11: only pool_b is attempted and succeeds
        let mut prices = SpotPrices::default();
        prices.insert(key_b.clone(), 1.0);
        store.set_spot_prices(prices, vec![], 11, false);

        // pool_a was not attempted — failure is preserved from block 10
        assert_eq!(
            store.spot_price_failure(&key_a),
            Some((10, &FailedItemError::MissingSimulationState))
        );
        // pool_b succeeded — failure is cleared
        assert_eq!(store.spot_price_failure(&key_b), None);
    }

    #[test]
    fn test_incremental_run_updates_block_on_repeated_failure() {
        let key = pair_key("pool_a", 0x01, 0x02);
        let key_str = format!("pool_a/{}/{}", addr(0x01), addr(0x02));

        let mut store = DerivedData::new();

        store.set_spot_prices(
            Default::default(),
            vec![failed(&key_str, FailedItemError::MissingSimulationState)],
            10,
            true,
        );
        assert_eq!(
            store.spot_price_failure(&key),
            Some((10, &FailedItemError::MissingSimulationState))
        );

        // Incremental run at block 11: pool_a fails again with a new error
        store.set_spot_prices(
            Default::default(),
            vec![failed(&key_str, FailedItemError::MissingTokenMetadata)],
            11,
            false,
        );
        assert_eq!(
            store.spot_price_failure(&key),
            Some((11, &FailedItemError::MissingTokenMetadata))
        );
    }

    #[test]
    fn test_clear_all_clears_all_failure_maps() {
        let token_addr = addr(0xab);
        let token_str = format!("{token_addr}");
        let pair = pair_key("pool1", 0x01, 0x02);
        let pair_str = format!("pool1/{}/{}", addr(0x01), addr(0x02));

        let mut store = DerivedData::new();
        store.set_token_prices(
            Default::default(),
            vec![failed(&token_str, FailedItemError::AllSimulationPathsFailed)],
            1,
            true,
        );
        store.set_spot_prices(
            Default::default(),
            vec![failed(&pair_str, FailedItemError::MissingSimulationState)],
            1,
            true,
        );
        store.set_pool_depths(
            Default::default(),
            vec![failed(&pair_str, FailedItemError::MissingSpotPrice)],
            1,
            true,
        );

        store.clear_all();

        assert_eq!(store.token_price_failure(&token_addr), None);
        assert_eq!(store.spot_price_failure(&pair), None);
        assert_eq!(store.pool_depth_failure(&pair), None);
    }
}