Skip to main content

newton_prover_transporter/
lib.rs

1//! transporter crate for cross-chain operator table updates
2
3/// operator table calculation
4pub mod calculator;
5/// transporter configuration
6pub mod config;
7/// source chain event listener
8pub mod listener;
9/// operator table updater
10pub mod updater;
11
12use alloy::{primitives::FixedBytes, providers::Provider};
13use calculator::WeightCalculator;
14use config::TransporterConfig;
15use eigensdk::common::get_provider;
16use eyre::Result;
17use listener::{SourceChainListener, TransportTrigger};
18use newton_prover_core::config::NewtonAvsConfig;
19use std::time::Duration;
20use tokio::{sync::mpsc, time};
21use tokio_util::sync::CancellationToken;
22use tracing::{error, info};
23use updater::TableUpdater;
24
25pub use calculator::{OperatorSetQueryResult, WeightCalculator as TransporterWeightCalculator};
26pub use updater::TableUpdater as TransporterTableUpdater;
27
28struct WeightTransportState<P> {
29    trigger_rx: mpsc::Receiver<TransportTrigger>,
30    calculator: WeightCalculator,
31    updater: TableUpdater,
32    table_calculator: alloy::primitives::Address,
33    operator_set_id: u32,
34    source_provider: P,
35}
36
37/// transport operator table updates to the destination chain
38pub async fn transport_update(
39    calculator: &WeightCalculator,
40    updater: &TableUpdater,
41    table_calculator: alloy::primitives::Address,
42    operator_set_id: u32,
43    provider: &impl Provider,
44) -> Result<()> {
45    let operator_set_query = calculator.query_operator_set(operator_set_id, table_calculator).await?;
46
47    let tables = calculator.calculate_operator_tables(&[operator_set_query]);
48
49    if tables.is_empty() {
50        info!("no operator tables to update");
51        return Ok(());
52    }
53
54    let latest_block = provider.get_block_number().await?;
55    // use latest - 1 to avoid GlobalTableRootInFuture when destination chain's clock
56    // lags the source chain's latest block timestamp
57    let reference_block = latest_block.saturating_sub(1);
58    let block = provider
59        .get_block_by_number(reference_block.into())
60        .await?
61        .ok_or_else(|| eyre::eyre!("failed to get latest block"))?;
62
63    let reference_timestamp: u32 = block
64        .header
65        .timestamp
66        .try_into()
67        .map_err(|_| eyre::eyre!("block timestamp {} exceeds u32 max", block.header.timestamp))?;
68    let reference_block_number: u32 = reference_block
69        .try_into()
70        .map_err(|_| eyre::eyre!("block number {} exceeds u32 max", reference_block))?;
71
72    let mut leaves: Vec<FixedBytes<32>> = Vec::with_capacity(tables.len());
73    for table in &tables {
74        let table_bytes = calculator.encode_operator_table(table);
75        let leaf = updater.calculate_operator_table_leaf(table_bytes).await?;
76        leaves.push(leaf);
77    }
78
79    let (root, proofs) = calculator.build_merkle_tree(&leaves);
80
81    updater
82        .confirm_global_root(root, reference_timestamp, reference_block_number)
83        .await?;
84
85    for (i, (table, proof)) in tables.iter().zip(proofs.iter()).enumerate() {
86        let table_bytes = calculator.encode_operator_table(table);
87        updater
88            .update_operator_table(reference_timestamp, root, i as u32, proof.clone(), table_bytes)
89            .await?;
90    }
91
92    Ok(())
93}
94
95/// Shared setup for transporter operations: builds calculator, updater, and loads source chain state.
96fn setup_transport(
97    config: &NewtonAvsConfig<TransporterConfig>,
98) -> Result<(WeightCalculator, TableUpdater, alloy::primitives::Address, String)> {
99    let source_chain_id = config.get_source_chain_id();
100    let source_rpc = config
101        .rpc
102        .get(source_chain_id)
103        .ok_or_else(|| eyre::eyre!("no RPC config for source chain {}", source_chain_id))?;
104    let source_rpc_url = source_rpc.http.clone();
105
106    let source_multichain = if source_chain_id != config.chain_id {
107        use newton_prover_core::config::contracts::SourceChainMultichainContracts;
108        SourceChainMultichainContracts::load(source_chain_id, &config.env)?
109    } else {
110        return Err(eyre::eyre!(
111            "source and destination chain are the same - multichain contracts not applicable"
112        ));
113    };
114
115    let calculator = WeightCalculator::new(
116        source_rpc_url.clone(),
117        config.contracts.avs.newton_prover_service_manager,
118        source_multichain.cross_chain_registry,
119    );
120
121    let dest_rpc = config
122        .rpc
123        .get(config.chain_id)
124        .ok_or_else(|| eyre::eyre!("no RPC config for chain {}", config.chain_id))?;
125
126    let dest_multichain = config
127        .contracts
128        .destination_multichain()
129        .map_err(|e| eyre::eyre!("destination_multichain not configured: {}", e))?;
130
131    let updater = TableUpdater::new(
132        config.service.signer.clone(),
133        dest_multichain.operator_table_updater,
134        dest_rpc.http.clone(),
135    )?;
136
137    Ok((calculator, updater, source_multichain.table_calculator, source_rpc_url))
138}
139
140/// Perform a single one-shot transport update for a given destination chain config.
141///
142/// This extracts the setup from `run()` and calls `transport_update()` exactly once,
143/// then returns. Useful for ad-hoc syncs triggered by CLI commands.
144pub async fn sync_once(config: NewtonAvsConfig<TransporterConfig>) -> Result<()> {
145    let (calculator, updater, table_calculator, source_rpc_url) = setup_transport(&config)?;
146    let source_provider = get_provider(&source_rpc_url);
147
148    let source_chain_id = config.get_source_chain_id();
149    let dest_chain_id = config.chain_id;
150
151    info!(
152        source_chain_id = source_chain_id,
153        dest_chain_id = dest_chain_id,
154        "performing one-shot transport sync"
155    );
156
157    let start = std::time::Instant::now();
158    let result = transport_update(
159        &calculator,
160        &updater,
161        table_calculator,
162        config.service.operator_set_id,
163        &source_provider,
164    )
165    .await;
166
167    let duration = start.elapsed().as_secs_f64();
168    newton_prover_metrics::record_transporter_sync_duration(source_chain_id, dest_chain_id, duration);
169    match &result {
170        Ok(()) => newton_prover_metrics::inc_transporter_sync_success(source_chain_id, dest_chain_id),
171        Err(_) => newton_prover_metrics::inc_transporter_sync_failure(source_chain_id, dest_chain_id),
172    }
173
174    result
175}
176
177/// run the transporter service
178pub async fn run(config: NewtonAvsConfig<TransporterConfig>, cancellation_token: CancellationToken) -> Result<()> {
179    let (calculator, updater, table_calculator, source_rpc_url) = setup_transport(&config)?;
180
181    let source_chain_id = config.get_source_chain_id();
182    let dest_chain_id = config.chain_id;
183    let source_rpc = config
184        .rpc
185        .get(source_chain_id)
186        .ok_or_else(|| eyre::eyre!("no RPC config for source chain {}", source_chain_id))?;
187    let source_ws_url = source_rpc.ws.clone();
188
189    let listener = SourceChainListener::new(source_ws_url, config.contracts.eigenlayer.allocation_manager);
190    let trigger_rx = listener.start(cancellation_token.clone()).await;
191
192    let source_provider = get_provider(&source_rpc_url);
193
194    let mut state = WeightTransportState {
195        trigger_rx,
196        calculator,
197        updater,
198        table_calculator,
199        operator_set_id: config.service.operator_set_id,
200        source_provider,
201    };
202
203    // Perform an explicit initial sync on startup to ensure destination chains
204    // have up-to-date operator state immediately after deploy, rather than
205    // waiting for the first periodic interval tick.
206    info!(
207        source_chain_id = source_chain_id,
208        dest_chain_id = dest_chain_id,
209        "performing initial startup sync"
210    );
211    let start = std::time::Instant::now();
212    let result = transport_update(
213        &state.calculator,
214        &state.updater,
215        state.table_calculator,
216        state.operator_set_id,
217        &state.source_provider,
218    )
219    .await;
220    let duration = start.elapsed().as_secs_f64();
221    newton_prover_metrics::record_transporter_sync_duration(source_chain_id, dest_chain_id, duration);
222    match result {
223        Ok(()) => {
224            info!(
225                source_chain_id = source_chain_id,
226                dest_chain_id = dest_chain_id,
227                duration_secs = duration,
228                "initial startup sync completed"
229            );
230            newton_prover_metrics::inc_transporter_sync_success(source_chain_id, dest_chain_id);
231        }
232        Err(e) => {
233            error!(
234                source_chain_id = source_chain_id,
235                dest_chain_id = dest_chain_id,
236                error = %e,
237                "initial startup sync failed, continuing with event loop"
238            );
239            newton_prover_metrics::inc_transporter_sync_failure(source_chain_id, dest_chain_id);
240        }
241    }
242
243    let mut interval = time::interval(Duration::from_secs(config.service.update_frequency));
244    // Skip the first immediate tick since we already performed the initial sync above
245    interval.tick().await;
246
247    loop {
248        tokio::select! {
249            _ = cancellation_token.cancelled() => {
250                tracing::info!("received shutdown signal, stopping transporter");
251                break Ok(());
252            }
253
254            _ = interval.tick() => {
255                info!("updating stake weights according to tempo");
256                let start = std::time::Instant::now();
257                let result = transport_update(
258                    &state.calculator,
259                    &state.updater,
260                    state.table_calculator,
261                    state.operator_set_id,
262                    &state.source_provider,
263                ).await;
264                let duration = start.elapsed().as_secs_f64();
265                newton_prover_metrics::record_transporter_sync_duration(source_chain_id, dest_chain_id, duration);
266                match result {
267                    Ok(()) => newton_prover_metrics::inc_transporter_sync_success(source_chain_id, dest_chain_id),
268                    Err(e) => {
269                        newton_prover_metrics::inc_transporter_sync_failure(source_chain_id, dest_chain_id);
270                        error!("failed to transport update: {}", e);
271                    }
272                }
273            }
274
275            Some(trigger) = state.trigger_rx.recv() => {
276                let trigger_name = match &trigger {
277                    TransportTrigger::OperatorSlashed { .. } => "operator_slashed",
278                    TransportTrigger::OperatorAdded { .. } => "operator_added",
279                    TransportTrigger::OperatorRemoved { .. } => "operator_removed",
280                    TransportTrigger::Cadence => "cadence",
281                };
282                match trigger {
283                    TransportTrigger::OperatorSlashed { .. } |
284                    TransportTrigger::OperatorAdded { .. } |
285                    TransportTrigger::OperatorRemoved { .. } => {
286                        info!("forced update triggered by {}", trigger_name);
287                        newton_prover_metrics::inc_transporter_event_triggered_sync(source_chain_id, dest_chain_id, trigger_name);
288                        let start = std::time::Instant::now();
289                        let result = transport_update(
290                            &state.calculator,
291                            &state.updater,
292                            state.table_calculator,
293                            state.operator_set_id,
294                            &state.source_provider,
295                        ).await;
296                        let duration = start.elapsed().as_secs_f64();
297                        newton_prover_metrics::record_transporter_sync_duration(source_chain_id, dest_chain_id, duration);
298                        match result {
299                            Ok(()) => newton_prover_metrics::inc_transporter_sync_success(source_chain_id, dest_chain_id),
300                            Err(e) => {
301                                newton_prover_metrics::inc_transporter_sync_failure(source_chain_id, dest_chain_id);
302                                error!("failed to transport update: {}", e);
303                            }
304                        }
305                        interval.reset();
306                    }
307                    TransportTrigger::Cadence => {}
308                }
309            }
310        }
311    }
312}