// fynd_core/derived/computation.rs

1//! Core computation trait and types.
2
3use std::collections::HashSet;
4
5use async_trait::async_trait;
6
7use super::{
8    error::ComputationError,
9    manager::{ChangedComponents, SharedDerivedDataRef},
10};
11use crate::feed::market_data::SharedMarketDataRef;
12
/// Unique identifier for a computation type.
///
/// Used for event discrimination, storage keys, and readiness tracking.
/// A `&'static str` so IDs are `Copy`, hashable, and usable as associated consts.
pub type ComputationId = &'static str;
17
/// Error when building computation requirements.
///
/// Returned by [`ComputationRequirements::require_fresh`] and
/// [`ComputationRequirements::allow_stale`] when the same ID is added with both
/// freshness levels, which would be contradictory.
#[derive(Debug, Clone, thiserror::Error)]
#[error("conflicting requirement: '{id}' cannot be both fresh and stale")]
pub struct RequirementConflict {
    /// The computation ID that was added with conflicting freshness.
    pub(crate) id: ComputationId,
}
25
26impl RequirementConflict {
27    /// Returns the conflicting computation ID.
28    pub fn id(&self) -> ComputationId {
29        self.id
30    }
31}
32
/// Requirements for derived data computations.
///
/// Each algorithm declares which computations it needs and their freshness requirements:
///
/// - `require_fresh`: Data must be from the current block (same block as SharedMarketData). Workers
///   wait for these computations to complete for the current block before solving.
///
/// - `allow_stale`: Data can be from any past block, as long as it has been computed at least once.
///   Workers only check that the data exists, not that it's from the current block.
///
/// # Example
///
/// ```ignore
/// // Token prices don't change much block-to-block, stale is fine
/// ComputationRequirements::none()
///     .allow_stale("token_prices")?;
///
/// // Spot prices must be fresh for accurate routing
/// ComputationRequirements::none()
///     .require_fresh("spot_prices")?;
/// ```
#[derive(Debug, Clone, Default)]
pub struct ComputationRequirements {
    /// Computations that must be from the current block.
    pub(crate) require_fresh: HashSet<ComputationId>,
    /// Computations that can use data from any past block.
    ///
    /// TODO: Stale data can be dangerous if stale for too long. In the future, a maximum
    /// block-age limit per stale requirement might be implemented.
    pub(crate) allow_stale: HashSet<ComputationId>,
}
65
66impl ComputationRequirements {
67    /// Returns the set of computations that require fresh data.
68    pub fn fresh_requirements(&self) -> &HashSet<ComputationId> {
69        &self.require_fresh
70    }
71
72    /// Returns the set of computations that allow stale data.
73    pub fn stale_requirements(&self) -> &HashSet<ComputationId> {
74        &self.allow_stale
75    }
76
77    /// Creates empty requirements (no derived data needed).
78    pub fn none() -> Self {
79        Self::default()
80    }
81
82    /// Builder method to add a computation that requires fresh data (current block).
83    ///
84    /// # Errors
85    ///
86    /// Returns `RequirementConflict` if the same ID is already in `allow_stale`.
87    pub fn require_fresh(mut self, id: ComputationId) -> Result<Self, RequirementConflict> {
88        if self.allow_stale.contains(&id) {
89            return Err(RequirementConflict { id });
90        }
91        self.require_fresh.insert(id);
92        Ok(self)
93    }
94
95    /// Builder method to add a computation that allows stale data (any past block).
96    ///
97    /// # Errors
98    ///
99    /// Returns `RequirementConflict` if the same ID is already in `require_fresh`.
100    pub fn allow_stale(mut self, id: ComputationId) -> Result<Self, RequirementConflict> {
101        if self.require_fresh.contains(&id) {
102            return Err(RequirementConflict { id });
103        }
104        self.allow_stale.insert(id);
105        Ok(self)
106    }
107
108    /// Returns true if there are any requirements.
109    pub fn has_requirements(&self) -> bool {
110        !self.require_fresh.is_empty() || !self.allow_stale.is_empty()
111    }
112
113    /// Returns true if the given computation is required (fresh or stale).
114    pub fn is_required(&self, id: ComputationId) -> bool {
115        self.require_fresh.contains(&id) || self.allow_stale.contains(&id)
116    }
117}
118
/// Typed error for a failed computation item.
///
/// Attached to a [`FailedItem`] to explain why a single item failed while the
/// overall computation may still have produced usable data.
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum FailedItemError {
    /// The pool's simulation state was not available in shared market data.
    #[error("missing simulation state")]
    MissingSimulationState,

    /// Token metadata (decimals, symbol) was not found for the pool's tokens.
    #[error("missing token metadata")]
    MissingTokenMetadata,

    /// A required spot price was not yet computed for this edge.
    #[error("missing spot price")]
    MissingSpotPrice,

    /// The decimal difference between two tokens is too large for a meaningful price.
    #[error("extreme decimal mismatch ({from}\u{2192}{to})")]
    ExtremeDecimalMismatch {
        /// Source token decimals.
        from: u32,
        /// Target token decimals.
        to: u32,
    },

    /// The computed spot price is below the minimum threshold.
    #[error("spot price too small: {0}")]
    SpotPriceTooSmall(f64),

    /// Protocol simulation returned an error.
    #[error("simulation failed: {0}")]
    SimulationFailed(String),

    /// Every simulation path for this pool failed.
    #[error("all simulation paths failed")]
    AllSimulationPathsFailed,
}
155
/// A single item that failed during a computation.
///
/// Carried in [`ComputationOutput::failed_items`] to report partial failures
/// without discarding the successfully computed data.
#[derive(Debug, Clone)]
pub struct FailedItem {
    /// Human-readable key for the failed item.
    /// - spot_prices/pool_depths: "component_id/token_in/token_out"
    /// - token_prices: "token_address"
    pub key: String,
    /// Typed error describing the failure.
    pub error: FailedItemError,
}
166
/// Computation result with optional partial failure details.
///
/// `Err(...)` = total failure (no usable data).
/// `Ok(output)` = some data produced; `output.failed_items` may be non-empty.
#[derive(Debug, Clone)]
pub struct ComputationOutput<T> {
    /// The successfully computed data (possibly partial).
    pub data: T,
    /// Items that failed during the computation; empty on full success.
    pub failed_items: Vec<FailedItem>,
}
176
177impl<T> ComputationOutput<T> {
178    pub fn success(data: T) -> Self {
179        Self { data, failed_items: vec![] }
180    }
181
182    pub fn with_failures(data: T, failed_items: Vec<FailedItem>) -> Self {
183        Self { data, failed_items }
184    }
185
186    pub fn has_failures(&self) -> bool {
187        !self.failed_items.is_empty()
188    }
189}
190
/// Trait for derived data computations.
///
/// Implement this trait to define a new type of derived data that can be
/// computed from market data.
///
/// # Design
///
/// - No `dependencies()` method - execution order is hardcoded in `ComputationManager`
/// - Typed `DerivedDataStore` - access previous results via `store.token_prices()` etc.
/// - Each computation is explicitly added to `ComputationManager`
/// - Computations receive `Arc<RwLock<>>` references and acquire locks as needed, allowing early
///   release and granular locking strategies
///
/// # Example
///
/// ```ignore
/// pub struct TokenPriceComputation {
///     gas_token: Address,
/// }
///
/// #[async_trait]
/// impl DerivedComputation for TokenPriceComputation {
///     type Output = TokenPrices;
///     const ID: ComputationId = "token_prices";
///
///     async fn compute(
///         &self,
///         market: &SharedMarketDataRef,
///         store: &SharedDerivedDataRef,
///         changed: &ChangedComponents,
///     ) -> Result<Self::Output, ComputationError> {
///         if changed.is_full_recompute {
///             // Full recompute: process all components
///         } else {
///             // Incremental: only process changed components
///         }
///     }
/// }
/// ```
#[async_trait]
pub trait DerivedComputation: Send + Sync + 'static {
    /// The output type produced by this computation.
    ///
    /// Must be `Clone` for storage retrieval and `Send + Sync` for thread safety.
    type Output: Clone + Send + Sync + 'static;

    /// Unique identifier for this computation.
    ///
    /// Used for event discrimination, storage keys, and readiness tracking.
    /// Must be unique across all computations registered with the manager.
    const ID: ComputationId;

    /// Computes the derived data from market state.
    ///
    /// # Arguments
    ///
    /// * `market` - Reference to shared market data (computation acquires lock as needed)
    /// * `store` - Reference to derived data store (computation acquires lock as needed)
    /// * `changed` - Information about which components changed, enabling incremental computation
    ///
    /// # Returns
    ///
    /// The computed output, or an error if computation failed. Partial failures are
    /// reported via the `failed_items` field of [`ComputationOutput`] rather than
    /// as an `Err`.
    ///
    /// # Errors
    ///
    /// Returns `ComputationError` only on total failure (no usable data produced).
    ///
    /// # Incremental Computation
    ///
    /// Implementations should use `changed` to only recompute data affected by the changes:
    /// - `changed.is_full_recompute` - If true, recompute everything (startup/lag recovery)
    /// - `changed.added` - New components to compute
    /// - `changed.removed` - Components to remove from results
    /// - `changed.updated` - Components whose state changed
    ///
    /// # Lock Management
    ///
    /// Computations should acquire locks only when needed and release them as early
    /// as possible to minimize contention. Use `.read().await` for async lock acquisition.
    async fn compute(
        &self,
        market: &SharedMarketDataRef,
        store: &SharedDerivedDataRef,
        changed: &ChangedComponents,
    ) -> Result<ComputationOutput<Self::Output>, ComputationError>;
}