Skip to main content

tycho_simulation/evm/
pending.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::Arc,
4};
5
6use thiserror::Error;
7use tokio::sync::{mpsc::UnboundedReceiver, watch};
8use tycho_client::feed::{synchronizer::Snapshot, BlockHeader, FeedMessage};
9use tycho_common::{
10    models::{
11        blockchain::{Block, BlockAggregatedChanges, TxInput},
12        protocol::{ComponentBalance, ProtocolComponent, ProtocolComponentStateDelta},
13        Chain,
14    },
15    traits::TxDeltaIndexer,
16    Bytes,
17};
18
19use crate::{
20    evm::decoder::{StreamDecodeError, TychoStreamDecoder},
21    protocol::models::Update,
22};
23
/// An ephemeral [`Update`] tagged with a caller-supplied label.
///
/// The label is an opaque string chosen by the caller to distinguish parallel bundle evaluations
/// (e.g. bundle ID, strategy name). It is separate from `update.block_number_or_timestamp`,
/// which carries the target block the bundle was evaluated against.
pub struct PendingUpdate {
    /// Caller-supplied tag, passed through unchanged from the `label` argument of
    /// [`PendingBlockProcessor::generate_pending_update`].
    pub label: String,
    /// The update returned by the decoder's `apply_deltas_ephemeral` call for the simulated
    /// bundle.
    pub update: Update,
}
33
/// Errors returned by [`PendingBlockProcessor`].
#[derive(Debug, Error)]
pub enum PendingError {
    /// Returned when `generate_pending_update` is called before the parent block of
    /// `target_block` has been confirmed. Use
    /// [`subscribe_confirmed_block`](PendingBlockProcessor::subscribe_confirmed_block) to wait
    /// for the right block before calling.
    ///
    /// `needed` is the parent block number (target block number minus one, saturating at zero)
    /// and `current` is the processor's latest confirmed block number at the time of the call.
    #[error("parent block {needed} not yet confirmed (current: {current})")]
    ParentNotYetConfirmed { needed: u64, current: u64 },
    /// The decoder failed while applying the pending deltas. Produced automatically from a
    /// [`StreamDecodeError`] through the generated `From` impl.
    #[error("decoder error: {0}")]
    Decoder(#[from] StreamDecodeError),
    /// An indexer's `apply_block` call failed while a confirmed block was being applied.
    ///
    /// `extractor` is the key under which the failing indexer is registered; `message` is the
    /// underlying error rendered with the alternate (`{:#}`) format.
    #[error("indexer error for extractor '{extractor}': {message}")]
    Indexer { extractor: String, message: String },
}
47
/// Wires one or more [`TxDeltaIndexer`]s to an existing [`TychoStreamDecoder`], enabling
/// ephemeral simulation of candidate transaction bundles against the correct parent state
/// for a specific target block.
///
/// # Block targeting
///
/// Call [`subscribe_confirmed_block`](Self::subscribe_confirmed_block) to obtain a
/// [`watch::Receiver<u64>`] that reports the latest confirmed block number. Use it to wait for
/// the right parent before submitting a bundle:
///
/// ```no_run
/// # async fn example(
/// #     mut pending: tycho_simulation::evm::pending::PendingBlockProcessor,
/// #     txs: &[tycho_common::models::blockchain::TxInput],
/// #     target_header: tycho_client::feed::BlockHeader,
/// # ) {
/// pending
///     .subscribe_confirmed_block()
///     .wait_for(|&n| n >= target_header.number - 1)
///     .await
///     .expect("stream closed");
/// let update = pending
///     .generate_pending_update(txs, target_header, "bundle-1".to_string())
///     .await
///     .expect("pending update failed");
/// # }
/// ```
///
/// NOTE(review): within this file the watch value is only written while
/// [`advance`](Self::advance) or [`generate_pending_update`](Self::generate_pending_update)
/// runs (both go through `advance_inner`). A task that only awaits the receiver therefore
/// relies on some other code path driving one of those methods — confirm how callers are
/// expected to share the processor.
///
/// # Concurrency
///
/// `PendingBlockProcessor` is intentionally **not** wrapped in a `Mutex` at construction
/// time. The confirmed stream forwards blocks via an unbounded channel — it never blocks
/// waiting for the consumer. Multiple callers can each hold a watch receiver and
/// independently decide when to acquire whatever external lock they use around
/// `generate_pending_update`.
pub struct PendingBlockProcessor {
    /// Registered indexers, keyed by extractor name. The key is matched against the keys of
    /// `FeedMessage::state_msgs` when confirmed blocks are applied.
    indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
    /// Shared decoder used to turn pending deltas into an ephemeral [`Update`].
    decoder: Arc<TychoStreamDecoder<BlockHeader>>,
    /// Chain stamped onto the `BlockAggregatedChanges` synthesised from snapshots.
    chain: Chain,
    /// Block number of the most recently confirmed block applied to `indexers`.
    current_confirmed_block: u64,
    /// `advance_inner` publishes here whenever a message raises `current_confirmed_block`;
    /// backs `subscribe_confirmed_block`.
    confirmed_block_tx: watch::Sender<u64>,
    /// Confirmed blocks forwarded by the stream pipeline.
    block_rx: UnboundedReceiver<FeedMessage<BlockHeader>>,
}
94
95impl PendingBlockProcessor {
96    pub(crate) fn new(
97        indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
98        decoder: Arc<TychoStreamDecoder<BlockHeader>>,
99        chain: Chain,
100        block_rx: UnboundedReceiver<FeedMessage<BlockHeader>>,
101    ) -> Self {
102        let (confirmed_block_tx, _) = watch::channel(0u64);
103        Self { indexers, decoder, chain, current_confirmed_block: 0, confirmed_block_tx, block_rx }
104    }
105
106    /// Returns a receiver that is notified with the latest confirmed block number every time
107    /// a new block is applied.
108    ///
109    /// Typical usage: `.wait_for(|&n| n >= target_block - 1).await` before calling
110    /// [`generate_pending_update`](Self::generate_pending_update).
111    pub fn subscribe_confirmed_block(&self) -> watch::Receiver<u64> {
112        self.confirmed_block_tx.subscribe()
113    }
114
115    /// Returns the block number of the last confirmed block applied to the indexers.
116    pub fn current_confirmed_block(&self) -> u64 {
117        self.current_confirmed_block
118    }
119
120    /// Advances each registered indexer by applying one confirmed block.
121    ///
122    /// Only needed when using the processor standalone (without
123    /// [`ProtocolStreamBuilder::build_with_pending`](crate::evm::stream::ProtocolStreamBuilder::build_with_pending)).
124    /// When using `build_with_pending`, confirmed blocks are forwarded automatically.
125    pub fn advance(&mut self, msg: &FeedMessage<BlockHeader>) -> Result<(), PendingError> {
126        self.advance_inner(msg)
127    }
128
129    /// Simulates `txs` against the confirmed parent state of `target_block` and returns an
130    /// ephemeral [`Update`].
131    ///
132    /// Drains any confirmed blocks that have arrived since the last call, then immediately
133    /// checks whether the parent block (`target_block - 1`) is available. If not, returns
134    /// [`PendingError::ParentNotYetConfirmed`] — **no blocking**. Use
135    /// [`subscribe_confirmed_block`](Self::subscribe_confirmed_block) to wait for the right
136    /// block before calling.
137    ///
138    /// Neither the indexers' internal state nor the decoder's confirmed pool states are
139    /// mutated. Calling this twice with the same arguments returns identical results.
140    ///
141    /// # Parameters
142    /// * `txs` — candidate bundle in execution order; failed transactions are skipped.
143    /// * `target_header` — header of the block being built. Its `number` is used for the
144    ///   parent-block guard; the full header is forwarded to `apply_deltas_ephemeral` so that block
145    ///   number and timestamp are injected into each state delta.
146    /// * `label` — opaque caller-supplied tag stamped onto the returned [`PendingUpdate`]. Use it
147    ///   to associate the result with a specific bundle or evaluation context.
148    pub async fn generate_pending_update(
149        &mut self,
150        txs: &[TxInput],
151        target_header: BlockHeader,
152        label: String,
153    ) -> Result<PendingUpdate, PendingError> {
154        // Drain any confirmed blocks that have arrived since our last call.
155        while let Ok(msg) = self.block_rx.try_recv() {
156            self.advance_inner(&msg)?;
157        }
158
159        let parent = target_header.number.saturating_sub(1);
160        if self.current_confirmed_block < parent {
161            return Err(PendingError::ParentNotYetConfirmed {
162                needed: parent,
163                current: self.current_confirmed_block,
164            });
165        }
166
167        let mut pending_deltas: HashMap<String, BlockAggregatedChanges> = HashMap::new();
168        for (extractor, indexer) in &mut self.indexers {
169            let changes = indexer.generate_deltas(txs);
170            pending_deltas.insert(extractor.clone(), changes);
171        }
172
173        let update = self
174            .decoder
175            .apply_deltas_ephemeral(&pending_deltas, target_header)
176            .await?;
177        Ok(PendingUpdate { label, update })
178    }
179
180    fn advance_inner(&mut self, msg: &FeedMessage<BlockHeader>) -> Result<(), PendingError> {
181        let msg_block = msg
182            .state_msgs
183            .values()
184            .map(|s| s.header.number)
185            .max()
186            .unwrap_or(0);
187
188        for (extractor, state_msg) in &msg.state_msgs {
189            let Some(indexer) = self.indexers.get_mut(extractor) else {
190                continue;
191            };
192
193            if !state_msg.snapshots.states.is_empty() {
194                let block_changes = snapshot_to_block_changes(
195                    extractor,
196                    &state_msg.snapshots,
197                    &state_msg.header,
198                    self.chain,
199                );
200                indexer
201                    .apply_block(&block_changes)
202                    .map_err(|e| PendingError::Indexer {
203                        extractor: extractor.clone(),
204                        message: format!("{e:#}"),
205                    })?;
206            }
207
208            if let Some(deltas) = &state_msg.deltas {
209                indexer
210                    .apply_block(deltas)
211                    .map_err(|e| PendingError::Indexer {
212                        extractor: extractor.clone(),
213                        message: format!("{e:#}"),
214                    })?;
215            }
216        }
217
218        if msg_block > self.current_confirmed_block {
219            self.current_confirmed_block = msg_block;
220            // Receivers that have been dropped are silently ignored.
221            let _ = self.confirmed_block_tx.send(msg_block);
222        }
223        Ok(())
224    }
225}
226
227/// Converts a startup snapshot into a `BlockAggregatedChanges` suitable for
228/// [`TxDeltaIndexer::apply_block`].
229fn snapshot_to_block_changes(
230    extractor: &str,
231    snapshot: &Snapshot,
232    header: &BlockHeader,
233    chain: Chain,
234) -> BlockAggregatedChanges {
235    let ts = chrono::DateTime::from_timestamp(header.timestamp as i64, 0)
236        .unwrap_or_default()
237        .naive_utc();
238    let block = Block {
239        number: header.number,
240        chain,
241        hash: header.hash.clone(),
242        parent_hash: header.parent_hash.clone(),
243        ts,
244    };
245
246    let mut new_protocol_components: HashMap<String, ProtocolComponent> = HashMap::new();
247    let mut state_deltas: HashMap<String, ProtocolComponentStateDelta> = HashMap::new();
248    let mut component_balances: HashMap<String, HashMap<Bytes, ComponentBalance>> = HashMap::new();
249
250    for (id, comp_with_state) in &snapshot.states {
251        new_protocol_components.insert(id.clone(), comp_with_state.component.clone());
252
253        state_deltas.insert(
254            id.clone(),
255            ProtocolComponentStateDelta {
256                component_id: id.clone(),
257                updated_attributes: comp_with_state.state.attributes.clone(),
258                deleted_attributes: HashSet::new(),
259                created_attributes: HashSet::new(),
260            },
261        );
262
263        let token_balances: HashMap<Bytes, ComponentBalance> = comp_with_state
264            .state
265            .balances
266            .iter()
267            .map(|(token, balance)| {
268                (
269                    token.clone(),
270                    ComponentBalance {
271                        token: token.clone(),
272                        balance: balance.clone(),
273                        balance_float: 0.0,
274                        modify_tx: Bytes::default(),
275                        component_id: id.clone(),
276                    },
277                )
278            })
279            .collect();
280        component_balances.insert(id.clone(), token_balances);
281    }
282
283    BlockAggregatedChanges {
284        extractor: extractor.to_string(),
285        chain,
286        block,
287        finalized_block_height: header.number,
288        new_protocol_components,
289        state_deltas,
290        component_balances,
291        ..Default::default()
292    }
293}