fynd_core/derived/computation.rs
1//! Core computation trait and types.
2
3use std::collections::HashSet;
4
5use async_trait::async_trait;
6
7use super::{
8 error::ComputationError,
9 manager::{ChangedComponents, SharedDerivedDataRef},
10};
11use crate::feed::market_data::SharedMarketDataRef;
12
13/// Unique identifier for a computation type.
14///
15/// Used for event discrimination, storage keys, and readiness tracking.
16pub type ComputationId = &'static str;
17
18/// Error when building computation requirements.
19#[derive(Debug, Clone, thiserror::Error)]
20#[error("conflicting requirement: '{id}' cannot be both fresh and stale")]
21pub struct RequirementConflict {
22 /// The computation ID that was added with conflicting freshness.
23 pub(crate) id: ComputationId,
24}
25
26impl RequirementConflict {
27 /// Returns the conflicting computation ID.
28 pub fn id(&self) -> ComputationId {
29 self.id
30 }
31}
32
33/// Requirements for derived data computations.
34///
35/// Each algorithm declares which computations it needs and their freshness requirements:
36///
37/// - `require_fresh`: Data must be from the current block (same block as SharedMarketData). Workers
38/// wait for these computations to complete for the current block before solving.
39///
40/// - `allow_stale`: Data can be from any past block, as long as it has been computed at least once.
41/// Workers only check that the data exists, not that it's from the current block.
42///
43///
44/// # Example
45///
46/// ```ignore
47/// // Token prices don't change much block-to-block, stale is fine
48/// ComputationRequirements::none()
49/// .expect_stale("token_prices")?
50///
51/// // Spot prices must be fresh for accurate routing
52/// ComputationRequirements::none()
53/// .expect_fresh("spot_prices")?
54/// ```
55#[derive(Debug, Clone, Default)]
56pub struct ComputationRequirements {
57 /// Computations that must be from the current block.
58 pub(crate) require_fresh: HashSet<ComputationId>,
59 /// Computations that can use data from any past block.
60 ///
61 /// TODO: Stale data can be dangerous if stale for too long. In the future, associate staleness
62 /// to a block limit might be implemented.
63 pub(crate) allow_stale: HashSet<ComputationId>,
64}
65
66impl ComputationRequirements {
67 /// Returns the set of computations that require fresh data.
68 pub fn fresh_requirements(&self) -> &HashSet<ComputationId> {
69 &self.require_fresh
70 }
71
72 /// Returns the set of computations that allow stale data.
73 pub fn stale_requirements(&self) -> &HashSet<ComputationId> {
74 &self.allow_stale
75 }
76
77 /// Creates empty requirements (no derived data needed).
78 pub fn none() -> Self {
79 Self::default()
80 }
81
82 /// Builder method to add a computation that requires fresh data (current block).
83 ///
84 /// # Errors
85 ///
86 /// Returns `RequirementConflict` if the same ID is already in `allow_stale`.
87 pub fn require_fresh(mut self, id: ComputationId) -> Result<Self, RequirementConflict> {
88 if self.allow_stale.contains(&id) {
89 return Err(RequirementConflict { id });
90 }
91 self.require_fresh.insert(id);
92 Ok(self)
93 }
94
95 /// Builder method to add a computation that allows stale data (any past block).
96 ///
97 /// # Errors
98 ///
99 /// Returns `RequirementConflict` if the same ID is already in `require_fresh`.
100 pub fn allow_stale(mut self, id: ComputationId) -> Result<Self, RequirementConflict> {
101 if self.require_fresh.contains(&id) {
102 return Err(RequirementConflict { id });
103 }
104 self.allow_stale.insert(id);
105 Ok(self)
106 }
107
108 /// Returns true if there are any requirements.
109 pub fn has_requirements(&self) -> bool {
110 !self.require_fresh.is_empty() || !self.allow_stale.is_empty()
111 }
112
113 /// Returns true if the given computation is required (fresh or stale).
114 pub fn is_required(&self, id: ComputationId) -> bool {
115 self.require_fresh.contains(&id) || self.allow_stale.contains(&id)
116 }
117}
118
119/// Typed error for a failed computation item.
120#[derive(Debug, Clone, PartialEq, thiserror::Error)]
121pub enum FailedItemError {
122 #[error("missing simulation state")]
123 MissingSimulationState,
124
125 #[error("missing token metadata")]
126 MissingTokenMetadata,
127
128 #[error("missing spot price")]
129 MissingSpotPrice,
130
131 #[error("extreme decimal mismatch ({from}\u{2192}{to})")]
132 ExtremeDecimalMismatch { from: u32, to: u32 },
133
134 #[error("spot price too small: {0}")]
135 SpotPriceTooSmall(f64),
136
137 #[error("simulation failed: {0}")]
138 SimulationFailed(String),
139
140 #[error("all simulation paths failed")]
141 AllSimulationPathsFailed,
142}
143
144/// A single item that failed during a computation.
145#[derive(Debug, Clone)]
146pub struct FailedItem {
147 /// Human-readable key for the failed item.
148 /// - spot_prices/pool_depths: "component_id/token_in/token_out"
149 /// - token_prices: "token_address"
150 pub key: String,
151 /// Typed error describing the failure.
152 pub error: FailedItemError,
153}
154
155/// Computation result with optional partial failure details.
156///
157/// `Err(...)` = total failure (no usable data).
158/// `Ok(output)` = some data produced; `output.failed_items` may be non-empty.
159#[derive(Debug, Clone)]
160pub struct ComputationOutput<T> {
161 pub data: T,
162 pub failed_items: Vec<FailedItem>,
163}
164
165impl<T> ComputationOutput<T> {
166 pub fn success(data: T) -> Self {
167 Self { data, failed_items: vec![] }
168 }
169
170 pub fn with_failures(data: T, failed_items: Vec<FailedItem>) -> Self {
171 Self { data, failed_items }
172 }
173
174 pub fn has_failures(&self) -> bool {
175 !self.failed_items.is_empty()
176 }
177}
178
179/// Trait for derived data computations.
180///
181/// Implement this trait to define a new type of derived data that can be
182/// computed from market data.
183///
184/// # Design
185///
186/// - No `dependencies()` method - execution order is hardcoded in `ComputationManager`
187/// - Typed `DerivedDataStore` - access previous results via `store.token_prices()` etc.
188/// - Each computation is explicitly added to `ComputationManager`
189/// - Computations receive `Arc<RwLock<>>` references and acquire locks as needed, allowing early
190/// release and granular locking strategies
191///
192/// # Example
193///
194/// ```ignore
195/// pub struct TokenPriceComputation {
196/// gas_token: Address,
197/// }
198///
199/// #[async_trait]
200/// impl DerivedComputation for TokenPriceComputation {
201/// type Output = TokenPrices;
202/// const ID: ComputationId = "token_prices";
203///
204/// async fn compute(
205/// &self,
206/// market: &SharedMarketDataRef,
207/// store: &SharedDerivedDataRef,
208/// changed: &ChangedComponents,
209/// ) -> Result<Self::Output, ComputationError> {
210/// if changed.is_full_recompute {
211/// // Full recompute: process all components
212/// } else {
213/// // Incremental: only process changed components
214/// }
215/// }
216/// }
217/// ```
218#[async_trait]
219pub trait DerivedComputation: Send + Sync + 'static {
220 /// The output type produced by this computation.
221 ///
222 /// Must be `Clone` for storage retrieval and `Send + Sync` for thread safety.
223 type Output: Clone + Send + Sync + 'static;
224
225 /// Unique identifier for this computation.
226 ///
227 /// Used for event discrimination, storage keys, and readiness tracking.
228 const ID: ComputationId;
229
230 /// Computes the derived data from market state.
231 ///
232 /// # Arguments
233 ///
234 /// * `market` - Reference to shared market data (computation acquires lock as needed)
235 /// * `store` - Reference to derived data store (computation acquires lock as needed)
236 /// * `changed` - Information about which components changed, enabling incremental computation
237 ///
238 /// # Returns
239 ///
240 /// The computed output, or an error if computation failed.
241 ///
242 /// # Incremental Computation
243 ///
244 /// Implementations should use `changed` to only recompute data affected by the changes:
245 /// - `changed.is_full_recompute` - If true, recompute everything (startup/lag recovery)
246 /// - `changed.added` - New components to compute
247 /// - `changed.removed` - Components to remove from results
248 /// - `changed.updated` - Components whose state changed
249 ///
250 /// # Lock Management
251 ///
252 /// Computations should acquire locks only when needed and release them as early
253 /// as possible to minimize contention. Use `.read().await` for async lock acquisition.
254 async fn compute(
255 &self,
256 market: &SharedMarketDataRef,
257 store: &SharedDerivedDataRef,
258 changed: &ChangedComponents,
259 ) -> Result<ComputationOutput<Self::Output>, ComputationError>;
260}