tycho_simulation/evm/pending.rs
1use std::{
2 collections::{HashMap, HashSet},
3 sync::Arc,
4};
5
6use thiserror::Error;
7use tokio::sync::{mpsc::UnboundedReceiver, watch};
8use tycho_client::feed::{synchronizer::Snapshot, BlockHeader, FeedMessage};
9use tycho_common::{
10 models::{
11 blockchain::{Block, BlockAggregatedChanges, TxInput},
12 protocol::{ComponentBalance, ProtocolComponent, ProtocolComponentStateDelta},
13 Chain,
14 },
15 traits::TxDeltaIndexer,
16 Bytes,
17};
18
19use crate::{
20 evm::decoder::{StreamDecodeError, TychoStreamDecoder},
21 protocol::models::Update,
22};
23
24/// An ephemeral [`Update`] tagged with a caller-supplied label.
25///
26/// The label is an opaque string chosen by the caller to distinguish parallel bundle evaluations
27/// (e.g. bundle ID, strategy name). It is separate from `update.block_number_or_timestamp`,
28/// which carries the target block the bundle was evaluated against.
29pub struct PendingUpdate {
30 pub label: String,
31 pub update: Update,
32}
33
34#[derive(Debug, Error)]
35pub enum PendingError {
36 /// Returned when `generate_pending_update` is called before the parent block of
37 /// `target_block` has been confirmed. Use
38 /// [`subscribe_confirmed_block`](PendingBlockProcessor::subscribe_confirmed_block) to wait
39 /// for the right block before calling.
40 #[error("parent block {needed} not yet confirmed (current: {current})")]
41 ParentNotYetConfirmed { needed: u64, current: u64 },
42 #[error("decoder error: {0}")]
43 Decoder(#[from] StreamDecodeError),
44 #[error("indexer error for extractor '{extractor}': {message}")]
45 Indexer { extractor: String, message: String },
46}
47
48/// Wires one or more [`TxDeltaIndexer`]s to an existing [`TychoStreamDecoder`], enabling
49/// ephemeral simulation of candidate transaction bundles against the correct parent state
50/// for a specific target block.
51///
52/// # Block targeting
53///
54/// Call [`subscribe_confirmed_block`](Self::subscribe_confirmed_block) to obtain a
55/// [`watch::Receiver<u64>`] that fires on every confirmed block. Use it to wait for the
56/// right parent before submitting a bundle:
57///
58/// ```no_run
59/// # async fn example(
60/// # mut pending: tycho_simulation::evm::pending::PendingBlockProcessor,
61/// # txs: &[tycho_common::models::blockchain::TxInput],
62/// # target_header: tycho_client::feed::BlockHeader,
63/// # ) {
64/// pending
65/// .subscribe_confirmed_block()
66/// .wait_for(|&n| n >= target_header.number - 1)
67/// .await
68/// .expect("stream closed");
69/// let update = pending
70/// .generate_pending_update(txs, target_header, "bundle-1".to_string())
71/// .await
72/// .expect("pending update failed");
73/// # }
74/// ```
75///
76/// # Concurrency
77///
78/// `PendingBlockProcessor` is intentionally **not** wrapped in a `Mutex` at construction
79/// time. The confirmed stream forwards blocks via an unbounded channel — it never blocks
80/// waiting for the consumer. Multiple callers can each hold a watch receiver and
81/// independently decide when to acquire whatever external lock they use around
82/// `generate_pending_update`.
83pub struct PendingBlockProcessor {
84 indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
85 decoder: Arc<TychoStreamDecoder<BlockHeader>>,
86 chain: Chain,
87 /// Block number of the most recently confirmed block applied to `indexers`.
88 current_confirmed_block: u64,
89 /// Notified on every `advance_inner` call; drives `subscribe_confirmed_block`.
90 confirmed_block_tx: watch::Sender<u64>,
91 /// Confirmed blocks forwarded by the stream pipeline.
92 block_rx: UnboundedReceiver<FeedMessage<BlockHeader>>,
93}
94
95impl PendingBlockProcessor {
96 pub(crate) fn new(
97 indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
98 decoder: Arc<TychoStreamDecoder<BlockHeader>>,
99 chain: Chain,
100 block_rx: UnboundedReceiver<FeedMessage<BlockHeader>>,
101 ) -> Self {
102 let (confirmed_block_tx, _) = watch::channel(0u64);
103 Self { indexers, decoder, chain, current_confirmed_block: 0, confirmed_block_tx, block_rx }
104 }
105
106 /// Returns a receiver that is notified with the latest confirmed block number every time
107 /// a new block is applied.
108 ///
109 /// Typical usage: `.wait_for(|&n| n >= target_block - 1).await` before calling
110 /// [`generate_pending_update`](Self::generate_pending_update).
111 pub fn subscribe_confirmed_block(&self) -> watch::Receiver<u64> {
112 self.confirmed_block_tx.subscribe()
113 }
114
115 /// Returns the block number of the last confirmed block applied to the indexers.
116 pub fn current_confirmed_block(&self) -> u64 {
117 self.current_confirmed_block
118 }
119
120 /// Advances each registered indexer by applying one confirmed block.
121 ///
122 /// Only needed when using the processor standalone (without
123 /// [`ProtocolStreamBuilder::build_with_pending`](crate::evm::stream::ProtocolStreamBuilder::build_with_pending)).
124 /// When using `build_with_pending`, confirmed blocks are forwarded automatically.
125 pub fn advance(&mut self, msg: &FeedMessage<BlockHeader>) -> Result<(), PendingError> {
126 self.advance_inner(msg)
127 }
128
129 /// Simulates `txs` against the confirmed parent state of `target_block` and returns an
130 /// ephemeral [`Update`].
131 ///
132 /// Drains any confirmed blocks that have arrived since the last call, then immediately
133 /// checks whether the parent block (`target_block - 1`) is available. If not, returns
134 /// [`PendingError::ParentNotYetConfirmed`] — **no blocking**. Use
135 /// [`subscribe_confirmed_block`](Self::subscribe_confirmed_block) to wait for the right
136 /// block before calling.
137 ///
138 /// Neither the indexers' internal state nor the decoder's confirmed pool states are
139 /// mutated. Calling this twice with the same arguments returns identical results.
140 ///
141 /// # Parameters
142 /// * `txs` — candidate bundle in execution order; failed transactions are skipped.
143 /// * `target_header` — header of the block being built. Its `number` is used for the
144 /// parent-block guard; the full header is forwarded to `apply_deltas_ephemeral` so that block
145 /// number and timestamp are injected into each state delta.
146 /// * `label` — opaque caller-supplied tag stamped onto the returned [`PendingUpdate`]. Use it
147 /// to associate the result with a specific bundle or evaluation context.
148 pub async fn generate_pending_update(
149 &mut self,
150 txs: &[TxInput],
151 target_header: BlockHeader,
152 label: String,
153 ) -> Result<PendingUpdate, PendingError> {
154 // Drain any confirmed blocks that have arrived since our last call.
155 while let Ok(msg) = self.block_rx.try_recv() {
156 self.advance_inner(&msg)?;
157 }
158
159 let parent = target_header.number.saturating_sub(1);
160 if self.current_confirmed_block < parent {
161 return Err(PendingError::ParentNotYetConfirmed {
162 needed: parent,
163 current: self.current_confirmed_block,
164 });
165 }
166
167 let mut pending_deltas: HashMap<String, BlockAggregatedChanges> = HashMap::new();
168 for (extractor, indexer) in &mut self.indexers {
169 let changes = indexer.generate_deltas(txs);
170 pending_deltas.insert(extractor.clone(), changes);
171 }
172
173 let update = self
174 .decoder
175 .apply_deltas_ephemeral(&pending_deltas, target_header)
176 .await?;
177 Ok(PendingUpdate { label, update })
178 }
179
180 fn advance_inner(&mut self, msg: &FeedMessage<BlockHeader>) -> Result<(), PendingError> {
181 let msg_block = msg
182 .state_msgs
183 .values()
184 .map(|s| s.header.number)
185 .max()
186 .unwrap_or(0);
187
188 for (extractor, state_msg) in &msg.state_msgs {
189 let Some(indexer) = self.indexers.get_mut(extractor) else {
190 continue;
191 };
192
193 if !state_msg.snapshots.states.is_empty() {
194 let block_changes = snapshot_to_block_changes(
195 extractor,
196 &state_msg.snapshots,
197 &state_msg.header,
198 self.chain,
199 );
200 indexer
201 .apply_block(&block_changes)
202 .map_err(|e| PendingError::Indexer {
203 extractor: extractor.clone(),
204 message: format!("{e:#}"),
205 })?;
206 }
207
208 if let Some(deltas) = &state_msg.deltas {
209 indexer
210 .apply_block(deltas)
211 .map_err(|e| PendingError::Indexer {
212 extractor: extractor.clone(),
213 message: format!("{e:#}"),
214 })?;
215 }
216 }
217
218 if msg_block > self.current_confirmed_block {
219 self.current_confirmed_block = msg_block;
220 // Receivers that have been dropped are silently ignored.
221 let _ = self.confirmed_block_tx.send(msg_block);
222 }
223 Ok(())
224 }
225}
226
227/// Converts a startup snapshot into a `BlockAggregatedChanges` suitable for
228/// [`TxDeltaIndexer::apply_block`].
229fn snapshot_to_block_changes(
230 extractor: &str,
231 snapshot: &Snapshot,
232 header: &BlockHeader,
233 chain: Chain,
234) -> BlockAggregatedChanges {
235 let ts = chrono::DateTime::from_timestamp(header.timestamp as i64, 0)
236 .unwrap_or_default()
237 .naive_utc();
238 let block = Block {
239 number: header.number,
240 chain,
241 hash: header.hash.clone(),
242 parent_hash: header.parent_hash.clone(),
243 ts,
244 };
245
246 let mut new_protocol_components: HashMap<String, ProtocolComponent> = HashMap::new();
247 let mut state_deltas: HashMap<String, ProtocolComponentStateDelta> = HashMap::new();
248 let mut component_balances: HashMap<String, HashMap<Bytes, ComponentBalance>> = HashMap::new();
249
250 for (id, comp_with_state) in &snapshot.states {
251 new_protocol_components.insert(id.clone(), comp_with_state.component.clone());
252
253 state_deltas.insert(
254 id.clone(),
255 ProtocolComponentStateDelta {
256 component_id: id.clone(),
257 updated_attributes: comp_with_state.state.attributes.clone(),
258 deleted_attributes: HashSet::new(),
259 created_attributes: HashSet::new(),
260 },
261 );
262
263 let token_balances: HashMap<Bytes, ComponentBalance> = comp_with_state
264 .state
265 .balances
266 .iter()
267 .map(|(token, balance)| {
268 (
269 token.clone(),
270 ComponentBalance {
271 token: token.clone(),
272 balance: balance.clone(),
273 balance_float: 0.0,
274 modify_tx: Bytes::default(),
275 component_id: id.clone(),
276 },
277 )
278 })
279 .collect();
280 component_balances.insert(id.clone(), token_balances);
281 }
282
283 BlockAggregatedChanges {
284 extractor: extractor.to_string(),
285 chain,
286 block,
287 finalized_block_height: header.number,
288 new_protocol_components,
289 state_deltas,
290 component_balances,
291 ..Default::default()
292 }
293}