newton_prover_transporter/
lib.rs1pub mod calculator;
5pub mod config;
7pub mod listener;
9pub mod updater;
11
12use alloy::{primitives::FixedBytes, providers::Provider};
13use calculator::WeightCalculator;
14use config::TransporterConfig;
15use eigensdk::common::get_provider;
16use eyre::Result;
17use listener::{SourceChainListener, TransportTrigger};
18use newton_prover_core::config::NewtonAvsConfig;
19use std::time::Duration;
20use tokio::{sync::mpsc, time};
21use tokio_util::sync::CancellationToken;
22use tracing::{error, info};
23use updater::TableUpdater;
24
25pub use calculator::{OperatorSetQueryResult, WeightCalculator as TransporterWeightCalculator};
26pub use updater::TableUpdater as TransporterTableUpdater;
27
28struct WeightTransportState<P> {
29 trigger_rx: mpsc::Receiver<TransportTrigger>,
30 calculator: WeightCalculator,
31 updater: TableUpdater,
32 table_calculator: alloy::primitives::Address,
33 operator_set_id: u32,
34 source_provider: P,
35}
36
37pub async fn transport_update(
39 calculator: &WeightCalculator,
40 updater: &TableUpdater,
41 table_calculator: alloy::primitives::Address,
42 operator_set_id: u32,
43 provider: &impl Provider,
44) -> Result<()> {
45 let operator_set_query = calculator.query_operator_set(operator_set_id, table_calculator).await?;
46
47 let tables = calculator.calculate_operator_tables(&[operator_set_query]);
48
49 if tables.is_empty() {
50 info!("no operator tables to update");
51 return Ok(());
52 }
53
54 let latest_block = provider.get_block_number().await?;
55 let reference_block = latest_block.saturating_sub(1);
58 let block = provider
59 .get_block_by_number(reference_block.into())
60 .await?
61 .ok_or_else(|| eyre::eyre!("failed to get latest block"))?;
62
63 let reference_timestamp: u32 = block
64 .header
65 .timestamp
66 .try_into()
67 .map_err(|_| eyre::eyre!("block timestamp {} exceeds u32 max", block.header.timestamp))?;
68 let reference_block_number: u32 = reference_block
69 .try_into()
70 .map_err(|_| eyre::eyre!("block number {} exceeds u32 max", reference_block))?;
71
72 let mut leaves: Vec<FixedBytes<32>> = Vec::with_capacity(tables.len());
73 for table in &tables {
74 let table_bytes = calculator.encode_operator_table(table);
75 let leaf = updater.calculate_operator_table_leaf(table_bytes).await?;
76 leaves.push(leaf);
77 }
78
79 let (root, proofs) = calculator.build_merkle_tree(&leaves);
80
81 updater
82 .confirm_global_root(root, reference_timestamp, reference_block_number)
83 .await?;
84
85 for (i, (table, proof)) in tables.iter().zip(proofs.iter()).enumerate() {
86 let table_bytes = calculator.encode_operator_table(table);
87 updater
88 .update_operator_table(reference_timestamp, root, i as u32, proof.clone(), table_bytes)
89 .await?;
90 }
91
92 Ok(())
93}
94
95fn setup_transport(
97 config: &NewtonAvsConfig<TransporterConfig>,
98) -> Result<(WeightCalculator, TableUpdater, alloy::primitives::Address, String)> {
99 let source_chain_id = config.get_source_chain_id();
100 let source_rpc = config
101 .rpc
102 .get(source_chain_id)
103 .ok_or_else(|| eyre::eyre!("no RPC config for source chain {}", source_chain_id))?;
104 let source_rpc_url = source_rpc.http.clone();
105
106 let source_multichain = if source_chain_id != config.chain_id {
107 use newton_prover_core::config::contracts::SourceChainMultichainContracts;
108 SourceChainMultichainContracts::load(source_chain_id, &config.env)?
109 } else {
110 return Err(eyre::eyre!(
111 "source and destination chain are the same - multichain contracts not applicable"
112 ));
113 };
114
115 let calculator = WeightCalculator::new(
116 source_rpc_url.clone(),
117 config.contracts.avs.newton_prover_service_manager,
118 source_multichain.cross_chain_registry,
119 );
120
121 let dest_rpc = config
122 .rpc
123 .get(config.chain_id)
124 .ok_or_else(|| eyre::eyre!("no RPC config for chain {}", config.chain_id))?;
125
126 let dest_multichain = config
127 .contracts
128 .destination_multichain()
129 .map_err(|e| eyre::eyre!("destination_multichain not configured: {}", e))?;
130
131 let updater = TableUpdater::new(
132 config.service.signer.clone(),
133 dest_multichain.operator_table_updater,
134 dest_rpc.http.clone(),
135 )?;
136
137 Ok((calculator, updater, source_multichain.table_calculator, source_rpc_url))
138}
139
140pub async fn sync_once(config: NewtonAvsConfig<TransporterConfig>) -> Result<()> {
145 let (calculator, updater, table_calculator, source_rpc_url) = setup_transport(&config)?;
146 let source_provider = get_provider(&source_rpc_url);
147
148 let source_chain_id = config.get_source_chain_id();
149 let dest_chain_id = config.chain_id;
150
151 info!(
152 source_chain_id = source_chain_id,
153 dest_chain_id = dest_chain_id,
154 "performing one-shot transport sync"
155 );
156
157 let start = std::time::Instant::now();
158 let result = transport_update(
159 &calculator,
160 &updater,
161 table_calculator,
162 config.service.operator_set_id,
163 &source_provider,
164 )
165 .await;
166
167 let duration = start.elapsed().as_secs_f64();
168 newton_prover_metrics::record_transporter_sync_duration(source_chain_id, dest_chain_id, duration);
169 match &result {
170 Ok(()) => newton_prover_metrics::inc_transporter_sync_success(source_chain_id, dest_chain_id),
171 Err(_) => newton_prover_metrics::inc_transporter_sync_failure(source_chain_id, dest_chain_id),
172 }
173
174 result
175}
176
177pub async fn run(config: NewtonAvsConfig<TransporterConfig>, cancellation_token: CancellationToken) -> Result<()> {
179 let (calculator, updater, table_calculator, source_rpc_url) = setup_transport(&config)?;
180
181 let source_chain_id = config.get_source_chain_id();
182 let dest_chain_id = config.chain_id;
183 let source_rpc = config
184 .rpc
185 .get(source_chain_id)
186 .ok_or_else(|| eyre::eyre!("no RPC config for source chain {}", source_chain_id))?;
187 let source_ws_url = source_rpc.ws.clone();
188
189 let listener = SourceChainListener::new(source_ws_url, config.contracts.eigenlayer.allocation_manager);
190 let trigger_rx = listener.start(cancellation_token.clone()).await;
191
192 let source_provider = get_provider(&source_rpc_url);
193
194 let mut state = WeightTransportState {
195 trigger_rx,
196 calculator,
197 updater,
198 table_calculator,
199 operator_set_id: config.service.operator_set_id,
200 source_provider,
201 };
202
203 info!(
207 source_chain_id = source_chain_id,
208 dest_chain_id = dest_chain_id,
209 "performing initial startup sync"
210 );
211 let start = std::time::Instant::now();
212 let result = transport_update(
213 &state.calculator,
214 &state.updater,
215 state.table_calculator,
216 state.operator_set_id,
217 &state.source_provider,
218 )
219 .await;
220 let duration = start.elapsed().as_secs_f64();
221 newton_prover_metrics::record_transporter_sync_duration(source_chain_id, dest_chain_id, duration);
222 match result {
223 Ok(()) => {
224 info!(
225 source_chain_id = source_chain_id,
226 dest_chain_id = dest_chain_id,
227 duration_secs = duration,
228 "initial startup sync completed"
229 );
230 newton_prover_metrics::inc_transporter_sync_success(source_chain_id, dest_chain_id);
231 }
232 Err(e) => {
233 error!(
234 source_chain_id = source_chain_id,
235 dest_chain_id = dest_chain_id,
236 error = %e,
237 "initial startup sync failed, continuing with event loop"
238 );
239 newton_prover_metrics::inc_transporter_sync_failure(source_chain_id, dest_chain_id);
240 }
241 }
242
243 let mut interval = time::interval(Duration::from_secs(config.service.update_frequency));
244 interval.tick().await;
246
247 loop {
248 tokio::select! {
249 _ = cancellation_token.cancelled() => {
250 tracing::info!("received shutdown signal, stopping transporter");
251 break Ok(());
252 }
253
254 _ = interval.tick() => {
255 info!("updating stake weights according to tempo");
256 let start = std::time::Instant::now();
257 let result = transport_update(
258 &state.calculator,
259 &state.updater,
260 state.table_calculator,
261 state.operator_set_id,
262 &state.source_provider,
263 ).await;
264 let duration = start.elapsed().as_secs_f64();
265 newton_prover_metrics::record_transporter_sync_duration(source_chain_id, dest_chain_id, duration);
266 match result {
267 Ok(()) => newton_prover_metrics::inc_transporter_sync_success(source_chain_id, dest_chain_id),
268 Err(e) => {
269 newton_prover_metrics::inc_transporter_sync_failure(source_chain_id, dest_chain_id);
270 error!("failed to transport update: {}", e);
271 }
272 }
273 }
274
275 Some(trigger) = state.trigger_rx.recv() => {
276 let trigger_name = match &trigger {
277 TransportTrigger::OperatorSlashed { .. } => "operator_slashed",
278 TransportTrigger::OperatorAdded { .. } => "operator_added",
279 TransportTrigger::OperatorRemoved { .. } => "operator_removed",
280 TransportTrigger::Cadence => "cadence",
281 };
282 match trigger {
283 TransportTrigger::OperatorSlashed { .. } |
284 TransportTrigger::OperatorAdded { .. } |
285 TransportTrigger::OperatorRemoved { .. } => {
286 info!("forced update triggered by {}", trigger_name);
287 newton_prover_metrics::inc_transporter_event_triggered_sync(source_chain_id, dest_chain_id, trigger_name);
288 let start = std::time::Instant::now();
289 let result = transport_update(
290 &state.calculator,
291 &state.updater,
292 state.table_calculator,
293 state.operator_set_id,
294 &state.source_provider,
295 ).await;
296 let duration = start.elapsed().as_secs_f64();
297 newton_prover_metrics::record_transporter_sync_duration(source_chain_id, dest_chain_id, duration);
298 match result {
299 Ok(()) => newton_prover_metrics::inc_transporter_sync_success(source_chain_id, dest_chain_id),
300 Err(e) => {
301 newton_prover_metrics::inc_transporter_sync_failure(source_chain_id, dest_chain_id);
302 error!("failed to transport update: {}", e);
303 }
304 }
305 interval.reset();
306 }
307 TransportTrigger::Cadence => {}
308 }
309 }
310 }
311 }
312}