newton-aggregator 0.4.18

newton prover aggregator utils
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
//! State-commit orchestrator: 120s tick loop driving the per-chain BLS-gated
//! state root commit.
//!
//! Per `.claude/rules/lessons.md` "State-commit registry reverts classify as
//! poison": every typed `IStateRootCommittable` revert means our local view is
//! stale. On poison the orchestrator aborts the tick and lets the next tick
//! re-read the view fresh — never blind-retry against the same proposal.
//!
//! ## Tick structure
//!
//! 1. Read fresh [`RegistryView`] from [`RegistryReader`], then read fresh
//!    `(operator_set, reference_timestamp)` from
//!    [`OperatorSetSnapshotReader::snapshot`]. Per-tick re-read — never cached
//!    across ticks. This snapshot is the input to the `empty_set` check: the
//!    prepare phase aborts with that flavor when the snapshot lists no
//!    operators or their total stake is zero.
//! 2. Prepare phase: fan out [`StateCommitOperatorClient::get_state_commit_proposal`]
//!    to all operators in parallel and compute the stake-weighted majority
//!    `(new_state_root, da_cert_hash)` tuple using the same
//!    `quorum_threshold_bps` the commit phase enforces. Abort if no majority
//!    forms (`all_unreachable` or `no_majority` flavors).
//! 3. Fetch PCR0 commitment from [`crate::state_commit::Pcr0Provider`].
//! 4. Stamp wall-clock timestamp for the commit's `timestamp` field. Read
//!    here (not earlier) so a stalled prepare phase doesn't ship a stale
//!    timestamp that would trip `TimestampRegression` against a racing
//!    commit.
//! 5. Build [`StateCommit`] via [`build_state_commit`] using the majority root +
//!    DA cert + orchestrator PCR0 + wall-clock timestamp.
//! 6. Compute `keccak256(abi.encode(StateCommit))` digest.
//! 7. Commit phase: call [`StateCommitOperatorClient::sign_state_commit`] per
//!    operator in parallel. Operators that disagree with the digest (e.g.,
//!    PCR0 mismatch against their own enclave) refuse to sign.
//! 8. Aggregate G1 partial signatures into the certificate sigma via
//!    [`StateCommitAggregator`].
//! 9. Submit via [`AvsWriter::commit_state_root`].
//!
//! The orchestrator does not hold its own opinion about
//! `(new_state_root, da_cert_hash)` — the operator majority is authoritative.
//! In a 2f+1 honest-majority Byzantine model the majority IS the system view;
//! a separate orchestrator-private state tree adds trust without buying safety.
//!
//! ## Operational invariants
//!
//! - **Per-tick snapshot re-read.** Every tick calls
//!   [`OperatorSetSnapshotReader::snapshot`] at step 1 to re-read
//!   `(operators, reference_timestamp)`. The snapshot is never cached across
//!   ticks. This makes `empty_set` (snapshot returned zero operators) reflect
//!   the *current* registry view, never a stale one. See `lessons.md`
//!   "State-commit operator-majority disagreement is transient — three flavors".
//! - **Dual-layer panic isolation.** The tick body is wrapped in
//!   `AssertUnwindSafe(...).catch_unwind()` so a panic in one tick aborts that
//!   tick without poisoning the loop; the loop itself runs under
//!   `task_supervisor::spawn_monitored` so any panic that escapes
//!   `catch_unwind` (extremely rare — a panic in `Drop` mid-unwind) surfaces in
//!   tracing instead of disappearing into stderr.
//! - **`MissedTickBehavior::Skip`.** The tick interval is intentionally
//!   non-coalescing: if a tick takes longer than 120s the orchestrator skips
//!   missed ticks rather than firing them back-to-back. Catch-up bursts would
//!   stack against a registry view that has already advanced.
//! - **Single-signer routing.** All `commit_state_root` submissions for a
//!   given chain flow through one [`AvsWriter`] (one ECDSA signer). Multiple
//!   writers sharing one key would race on `eth_getTransactionCount("pending")`
//!   and produce duplicate-nonce reverts. See `lessons.md` "Transaction
//!   submission has multiple silent failure modes".
//!
//! ## Timing budgets
//!
//! | Stage                           | Budget        |
//! |---------------------------------|---------------|
//! | Tick interval                   | 120 s         |
//! | Per-operator HTTP fanout        | 5 s           |
//! | PCR0 lookup (Track 1 stub)      | 5 s           |
//! | Receipt timeout (post-submit)   | 60 s          |
//!
//! The 60s receipt timeout is intentionally half the tick interval: a stuck
//! transaction must release the prepare-phase before the next tick fires,
//! otherwise prepare/commit drift past the registry's view window and every
//! subsequent tick poisons.
//!
//! See `docs/PRIVATE_DATA_STORAGE.md` §6 (Commit Protocol).

use alloy::{
    primitives::{keccak256, Bytes, U256},
    sol_types::SolValue,
};
use async_trait::async_trait;
use eigensdk::types::operator::OperatorId;
use futures_util::{
    stream::{FuturesUnordered, StreamExt},
    FutureExt,
};
use newton_core::state_commit_registry::IStateRootCommittable::StateCommit;
use std::{
    collections::HashMap,
    sync::Arc,
    time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use newton_chainio::avs::writer::AvsWriter;
use newton_core::pcr0_provider::Pcr0Provider;

use crate::state_commit::{
    aggregator::{AggregateRequest, OperatorRecord, StateCommitAggregator},
    error::{from_chainio, StateCommitError},
    operator_client::{OperatorClientError, StateCommitOperatorClient},
    operator_set_reader::{OperatorSetSnapshot, OperatorSetSnapshotReader},
    proposal::build_state_commit,
    registry_view::{RegistryReader, RegistryView},
};

/// Thin submission facade over [`AvsWriter`].
///
/// Extracted as a trait so the orchestrator can be tested without a live RPC
/// provider. The production impl is [`AvsWriterCommitter`]; tests use a
/// hand-rolled fake.
///
/// The `Send + Sync + 'static` supertraits allow an implementation to be held
/// as `Arc<dyn StateCommitWriter>`, which is how [`StateCommitOrchestrator`]
/// stores its writer.
#[async_trait]
pub trait StateCommitWriter: Send + Sync + 'static {
    /// Submit a BLS-certified `StateCommit` to the on-chain `StateCommitRegistry`.
    ///
    /// `commit` is the commit to record; `bls_certificate` is the certificate
    /// byte string that accompanies it (the orchestrator passes the output of
    /// the aggregator here).
    ///
    /// # Errors
    ///
    /// Returns [`StateCommitError`]. The production implementation obtains it
    /// by passing the chain-IO layer's error (`ChainIoError`) through
    /// [`from_chainio`]. Typed PDS reverts are poison; transport failures are not.
    async fn commit_state_root(&self, commit: StateCommit, bls_certificate: Bytes) -> Result<(), StateCommitError>;
}

/// Production [`StateCommitWriter`] backed by [`AvsWriter`].
///
/// Newtype around a shared `Arc<AvsWriter>`. Its [`StateCommitWriter`] impl
/// forwards `commit_state_root` to the wrapped writer and converts the error
/// with [`from_chainio`].
#[derive(Debug)]
pub struct AvsWriterCommitter(pub Arc<AvsWriter>);

#[async_trait]
impl StateCommitWriter for AvsWriterCommitter {
    async fn commit_state_root(&self, commit: StateCommit, bls_certificate: Bytes) -> Result<(), StateCommitError> {
        self.0
            .commit_state_root(commit, bls_certificate)
            .await
            .map(|_receipt| ())
            .map_err(from_chainio)
    }
}

/// Strict-majority subset of [`crate::state_commit::operator_client::OperatorProposal`].
///
/// PCR0 is intentionally omitted — operators contribute their own PCR0 in
/// Prepare for visibility, but the orchestrator's [`Pcr0Provider`] is the
/// authoritative source for the committed `pcr0Commitment`. Operators verify
/// the full `keccak256(abi.encode(StateCommit))` digest in commit phase, so a
/// PCR0 mismatch surfaces as digest refusal there.
///
/// `Eq` + `Hash` are derived because this type is the key of the
/// stake tally map built in the prepare phase; `Copy` lets the winning entry
/// be returned out of that map by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct MajorityProposal {
    /// State root taken from an operator's proposal; the winning value is
    /// handed to `build_state_commit` as the commit's new state root.
    new_state_root: alloy::primitives::B256,
    /// DA certificate hash taken from the same operator proposal; tallied
    /// together with `new_state_root` as one unit.
    da_cert_hash: alloy::primitives::B256,
}

/// An operator identity exposed to the orchestrator for fanout RPC calls.
///
/// `Debug` is implemented by hand for this type and prints only
/// `operator_id` (hex-encoded) and `stake`; the two public keys are left out
/// of the debug output.
#[derive(Clone)]
pub struct OperatorEntry {
    /// EigenLayer 32-byte operator identifier.
    pub operator_id: OperatorId,
    /// Stake weight for quorum threshold calculation. The prepare phase sums
    /// this over all entries to obtain the total stake.
    pub stake: alloy::primitives::U256,
    /// BLS G1 public key (APK aggregation and individual sig association).
    pub g1_pubkey: eigensdk::crypto_bls::BlsG1Point,
    /// BLS G2 public key (G2 APK aggregation in the certificate).
    pub g2_pubkey: eigensdk::crypto_bls::BlsG2Point,
}

impl std::fmt::Debug for OperatorEntry {
    /// Render the entry as `OperatorEntry { operator_id, stake }`.
    ///
    /// The operator id is shown hex-encoded; the G1/G2 public keys are not
    /// printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let operator_id_hex = hex::encode(self.operator_id);

        let mut out = f.debug_struct("OperatorEntry");
        out.field("operator_id", &operator_id_hex);
        out.field("stake", &self.stake);
        out.finish()
    }
}

/// State-commit orchestrator.
///
/// Drives the 120s tick loop for one chain. Holds `Arc` references to all
/// dependencies so it can be spawned behind `tokio::spawn(Arc::clone(&orch).run(…))`.
/// The loop period is whatever `interval` was given at construction.
///
/// The operator set and `referenceTimestamp` are NOT pinned at construction.
/// Each tick re-reads them through [`OperatorSetSnapshotReader::snapshot`],
/// so transporter syncs and operator (de)registrations take effect within
/// one 120s cycle without restarting the orchestrator.
pub struct StateCommitOrchestrator {
    /// Chain this orchestrator commits for. Attached to the log events and to
    /// the disagreement metric emitted by the orchestrator.
    chain_id: u64,
    /// Period of the tick loop; handed to `tokio::time::interval` in `run`.
    interval: Duration,
    /// Source of the [`RegistryView`] read at the start of every tick.
    registry_reader: Arc<dyn RegistryReader>,
    /// Supplies the PCR0 commitment that is placed into each commit.
    pcr0_provider: Arc<dyn Pcr0Provider>,
    /// Client used to fan out `get_state_commit_proposal` (prepare phase) and
    /// `sign_state_commit` (commit phase) to the operators.
    operator_client: Arc<dyn StateCommitOperatorClient>,
    /// Aggregates the collected signatures into the certificate bytes. Also
    /// the source of `quorum_threshold_bps` used by the prepare phase.
    aggregator: Arc<StateCommitAggregator>,
    /// Submits the finished commit together with its certificate.
    writer: Arc<dyn StateCommitWriter>,
    /// Per-tick atomic snapshot of `(operator_set, reference_timestamp)`.
    /// Production impl (NEWT-1116) composes
    /// `ViewBN254CertificateVerifier::latestReferenceTimestamp` with
    /// `AvsRegistryServiceCaller`. Until then, the dev-stub binary wires
    /// [`crate::state_commit::StubOperatorSetSnapshotReader`] from config.
    operator_set_reader: Arc<dyn OperatorSetSnapshotReader>,
    /// Provider for the canonical on-chain operator-table snapshot
    /// (`BN254TableCalculator.getOperatorInfos` + `getOperatorSetInfo` +
    /// cert-verifier `latestReferenceTimestamp`). The aggregator builds
    /// non-signer Merkle witnesses against this snapshot's `operators` list
    /// for ELIP-008 partial-attendance certificates.
    ///
    /// Today fetched per-tick; reactive refresh on `NewGlobalTableRoot` is
    /// follow-up work.
    operator_table_provider: Arc<dyn crate::state_commit::operator_table::OperatorTableProvider>,
    /// Operator set the orchestrator commits state for. Threaded through to
    /// `OperatorTableProvider::fetch_snapshot` per tick.
    operator_set: newton_core::bn254_certificate_verifier::ViewBN254CertificateVerifier::OperatorSet,
}

impl std::fmt::Debug for StateCommitOrchestrator {
    /// Render only `chain_id` and `interval`; the injected collaborators are
    /// not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("StateCommitOrchestrator");
        out.field("chain_id", &self.chain_id);
        out.field("interval", &self.interval);
        out.finish()
    }
}

impl StateCommitOrchestrator {
    /// Assemble an orchestrator from its collaborators.
    ///
    /// Each argument is moved into the field of the same name. No call is
    /// made on any collaborator here.
    // Collaborators are injected one by one, hence the long argument list.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        chain_id: u64,
        interval: Duration,
        registry_reader: Arc<dyn RegistryReader>,
        pcr0_provider: Arc<dyn Pcr0Provider>,
        operator_client: Arc<dyn StateCommitOperatorClient>,
        aggregator: Arc<StateCommitAggregator>,
        writer: Arc<dyn StateCommitWriter>,
        operator_set_reader: Arc<dyn OperatorSetSnapshotReader>,
        operator_table_provider: Arc<dyn crate::state_commit::operator_table::OperatorTableProvider>,
        operator_set: newton_core::bn254_certificate_verifier::ViewBN254CertificateVerifier::OperatorSet,
    ) -> Self {
        Self {
            // Identity and cadence.
            chain_id,
            interval,
            // Operator-set inputs.
            operator_set,
            operator_set_reader,
            operator_table_provider,
            // Per-tick read side.
            registry_reader,
            pcr0_provider,
            operator_client,
            // Certificate production and submission.
            aggregator,
            writer,
        }
    }

    /// Run the 120s tick loop until `shutdown` is cancelled.
    ///
    /// Each tick is isolated: panics are caught via `catch_unwind` and the loop
    /// continues. Poison errors abort the tick without retry per
    /// `.claude/rules/lessons.md`; the next tick re-reads the view.
    pub async fn run(self: Arc<Self>, shutdown: CancellationToken) {
        let mut ticker = tokio::time::interval(self.interval);
        // Skip missed ticks rather than firing multiple ticks to catch up.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                biased;
                _ = shutdown.cancelled() => {
                    info!(chain_id = self.chain_id, "state-commit orchestrator shutting down");
                    return;
                }
                _ = ticker.tick() => {
                    let this = Arc::clone(&self);
                    // catch_unwind to isolate panics per .claude/rules/rust.md
                    let result = std::panic::AssertUnwindSafe(this.tick())
                        .catch_unwind()
                        .await;
                    match result {
                        Ok(Ok(())) => {}
                        Ok(Err(ref e)) if e.is_poison() => {
                            warn!(
                                chain_id = self.chain_id,
                                error = %e,
                                "state-commit tick aborted on poison error; next tick will re-read view"
                            );
                        }
                        Ok(Err(e)) => {
                            error!(chain_id = self.chain_id, error = %e, "state-commit tick failed");
                        }
                        Err(_) => {
                            error!(chain_id = self.chain_id, "state-commit tick panicked; loop continues");
                        }
                    }
                }
            }
        }
    }

    /// Perform a single prepare → commit → submit cycle.
    ///
    /// In order: read a fresh registry view and operator-set snapshot, obtain
    /// the stake-weighted majority `(state_root, da_cert_hash)` from the
    /// operators, fetch PCR0, stamp the wall clock, build and hash the
    /// `StateCommit`, collect partial signatures, aggregate them into a
    /// certificate and hand commit + certificate to the writer.
    ///
    /// # Errors
    ///
    /// The first stage that fails ends the tick and its [`StateCommitError`]
    /// is returned. Nothing is retried inside a tick.
    // Large error size: same allow as proposal.rs — fires ~once/120s.
    #[allow(clippy::result_large_err)]
    async fn tick(&self) -> Result<(), StateCommitError> {
        // Step 1: fresh registry view. The commit targets the sequence number
        // that follows the one the view reports.
        let view: RegistryView = self.registry_reader.read_view().await?;
        let next_seq = view.sequence_no + 1;

        // Step 1b: fresh operator-set snapshot. Operators and
        // `reference_timestamp` come from one read on purpose: the BN254
        // certificate references operators *as of* `referenceTimestamp`, so
        // two unsynchronized reads could pair an APK with a timestamp the
        // cert verifier does not associate with it.
        let snapshot = self.operator_set_reader.snapshot().await?;
        let operators = snapshot.operators;
        let reference_timestamp = snapshot.reference_timestamp;

        debug!(
            chain_id = self.chain_id,
            sequence_no = view.sequence_no,
            next_seq,
            operator_count = operators.len(),
            reference_timestamp,
            "state-commit tick: view + operator-set read"
        );

        // Step 2: prepare phase. The operator quorum decides the next
        // (state_root, da_cert_hash); the orchestrator holds no opinion of its
        // own. The threshold is the same one the commit phase enforces.
        let majority = self.gather_majority_proposal(&operators, next_seq).await?;

        // Step 3: PCR0 comes from the orchestrator's own provider (NEWT-1116
        // will wire `EnclavePcr0Provider`). Operators check the full commit
        // digest in commit phase, so a wrong PCR0 shows up as a refusal to
        // sign instead of being committed silently.
        let pcr0 = match self.pcr0_provider.pcr0_commitment().await {
            Ok(commitment) => commitment,
            Err(e) => return Err(StateCommitError::Pcr0Lookup(e.to_string())),
        };

        // Step 4: wall clock, read only now that the prepare phase is done.
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_err(|e| StateCommitError::ClockSkew(e.to_string()))?;
        let now_ts = since_epoch.as_secs();

        // Step 5: canonical commit from the majority root + DA cert.
        let commit = build_state_commit(&view, majority.new_state_root, majority.da_cert_hash, pcr0, now_ts)?;

        // Step 6: digest the operators are asked to sign.
        let encoded_commit = commit.abi_encode();
        let digest = keccak256(encoded_commit);

        // Step 7: commit phase. `reference_timestamp` is passed along so each
        // operator can derive the EIP-712-typed `signableDigest` that the
        // on-chain `BN254CertificateVerifier` pairing-checks against.
        let signatures = self
            .commit_phase(&operators, digest, &commit, next_seq, reference_timestamp)
            .await?;

        // Step 8: aggregate. The aggregator wants its own record type, one per
        // operator in the snapshot.
        let mut operator_records: Vec<OperatorRecord> = Vec::with_capacity(operators.len());
        for entry in operators.iter() {
            operator_records.push(OperatorRecord {
                operator_id: entry.operator_id,
                g1_pubkey: entry.g1_pubkey.clone(),
                g2_pubkey: entry.g2_pubkey.clone(),
                stake: entry.stake,
            });
        }

        // Canonical operator-table snapshot for non-signer Merkle witnesses.
        // Fetched on every tick (no cache yet; reactive refresh on
        // `NewGlobalTableRoot` is a follow-up). A failed fetch is treated as
        // transient: log, abort this tick, try again next interval.
        let operator_table_snapshot = self
            .operator_table_provider
            .fetch_snapshot(self.operator_set.clone())
            .await
            .map(Arc::new)
            .map_err(|e| {
                warn!(
                    chain_id = self.chain_id,
                    sequence_no = next_seq,
                    error = %e,
                    "state-commit: operator-table snapshot fetch failed; aborting tick"
                );
                StateCommitError::AggregationFailed(format!("operator-table snapshot fetch: {e}"))
            })?;

        let request = AggregateRequest {
            digest,
            signatures,
            operator_set: operator_records,
            reference_timestamp,
            operator_table_snapshot,
        };
        let cert_bytes = self.aggregator.aggregate(request)?;

        // Step 9: submit. The commit is cloned because it is logged afterwards.
        self.writer.commit_state_root(commit.clone(), cert_bytes).await?;

        info!(
            chain_id = self.chain_id,
            sequence_no = commit.sequenceNo,
            new_state_root = %commit.newStateRoot,
            "state commit submitted"
        );

        Ok(())
    }

    /// Prepare phase: fan out `get_state_commit_proposal` to all operators in
    /// parallel and return the stake-weighted majority
    /// `(new_state_root, da_cert_hash)`.
    ///
    /// Operators report a `(new_state_root, da_cert_hash, pcr0_commitment)`
    /// triple, but only `(new_state_root, da_cert_hash)` participates in the
    /// majority calculation here — PCR0 is contributed by the orchestrator's
    /// own [`Pcr0Provider`] and verified end-to-end by every operator in
    /// commit phase via the digest match.
    ///
    /// The threshold predicate is **stake-weighted**, not operator-count, and
    /// uses the same `quorum_threshold_bps` configured on
    /// [`StateCommitAggregator`] — a single source of truth so a candidate
    /// that clears the prepare-phase threshold also clears the commit-phase
    /// verifier and the on-chain
    /// `BN254CertificateVerifier::isCertificateValidByStakeThreshold`. Per the
    /// `lessons.md` "Per-chain resources must flow from the task's chain"
    /// rule, both phases must observe the same threshold value to avoid the
    /// false-positive class where a count-majority of low-stake operators
    /// admits a candidate that fails on-chain stake quorum.
    ///
    /// Aborts the tick with [`StateCommitError::OperatorDisagreement`] in
    /// three distinct failure modes:
    /// 1. The operator set is empty OR `total_stake == 0` (caller
    ///    misconfiguration; logged as `empty_set` flavor).
    /// 2. Every operator response errored — transport, timeout, malformed
    ///    proposal (network availability issue; logged as `all_unreachable`
    ///    flavor).
    /// 3. Operators responded but no `(state_root, da_cert_hash)` tuple
    ///    accumulates enough stake to clear the threshold (genuine
    ///    state-tree divergence; logged with `matched_stake`,
    ///    `total_stake`, and `distinct_proposals`; flavor `no_majority`).
    ///
    /// None of these are poison — the next tick re-reads the registry
    /// view and tries again. See `log_commit_state_root_error` in
    /// `crates/chainio/src/avs/writer.rs` for the poison classification
    /// of typed `IStateRootCommittable` reverts (which is a different
    /// failure surface).
    #[allow(clippy::result_large_err)]
    async fn gather_majority_proposal(
        &self,
        operators: &[OperatorEntry],
        next_seq: u64,
    ) -> Result<MajorityProposal, StateCommitError> {
        // Build per-operator stake lookup and total stake. `total_stake.is_zero()`
        // is functionally indistinguishable from `operators.is_empty()` — both
        // mean no operator-set weight to majority over — so they share the
        // `empty_set` flavor.
        let mut op_stake: HashMap<OperatorId, U256> = HashMap::with_capacity(operators.len());
        let mut total_stake = U256::ZERO;
        for entry in operators {
            op_stake.insert(entry.operator_id, entry.stake);
            total_stake += entry.stake;
        }

        if operators.is_empty() || total_stake.is_zero() {
            warn!(
                chain_id = self.chain_id,
                sequence_no = next_seq,
                operator_count = operators.len(),
                total_stake = %total_stake,
                "state-commit prepare: no operator stake to majority over"
            );
            newton_metric::inc_state_commit_disagreement_total(
                self.chain_id,
                newton_metric::state_commit_disagreement_flavor::EMPTY_SET,
            );
            return Err(StateCommitError::OperatorDisagreement { sequence_no: next_seq });
        }

        let mut futs = FuturesUnordered::new();
        for entry in operators {
            let client = Arc::clone(&self.operator_client);
            let op_id = entry.operator_id;
            let seq = next_seq;
            futs.push(async move { (op_id, client.get_state_commit_proposal(&op_id, seq).await) });
        }

        // Tally `(state_root, da_cert_hash)` tuples by **cumulative stake**, not
        // operator count. This matches the on-chain
        // `BN254CertificateVerifier::isCertificateValidByStakeThreshold`
        // predicate the commit phase will face, and uses the same
        // `quorum_threshold_bps` configured on `StateCommitAggregator` —
        // single source of truth keeps prepare-phase admission and
        // commit-phase verification aligned. A count-based predicate could
        // pick a candidate that fails on-chain stake quorum (e.g., a
        // count-majority of low-stake operators locking out a high-stake
        // honest minority), wasting a 120s tick.
        let mut tally: HashMap<MajorityProposal, U256> = HashMap::new();
        let mut total = 0usize;
        let mut success_count = 0usize;

        while let Some((op_id, result)) = futs.next().await {
            total += 1;
            match result {
                Ok(proposal) => {
                    success_count += 1;
                    let stake = op_stake.get(&op_id).copied().unwrap_or(U256::ZERO);
                    let key = MajorityProposal {
                        new_state_root: proposal.new_state_root,
                        da_cert_hash: proposal.da_cert_hash,
                    };
                    *tally.entry(key).or_insert(U256::ZERO) += stake;
                }
                Err(OperatorClientError::Transport { .. }) | Err(OperatorClientError::Timeout { .. }) => {
                    warn!(
                        chain_id = self.chain_id,
                        operator_id = %hex::encode(op_id),
                        "state-commit prepare: operator unreachable, skipping"
                    );
                }
                Err(e) => {
                    warn!(
                        chain_id = self.chain_id,
                        operator_id = %hex::encode(op_id),
                        error = %e,
                        "state-commit prepare: operator error, skipping"
                    );
                }
            }
        }

        // Stake-weighted threshold check:
        //   leader_stake * 10_000 >= total_stake * threshold_bps
        // The `none` (no responses tallied) branch is `all_unreachable`; the
        // `Some(_)` non-clearing branch is `no_majority`.
        //
        // `error_count` and `matched_stake` let dashboards distinguish the two
        // shapes operators can produce: high `error_count` means operators are
        // silent (transport/timeout — investigate availability), low
        // `error_count` with `distinct_proposals > 1` means operators disagree
        // (state-tree divergence — investigate replica drift).
        let threshold_bps = self.aggregator.quorum_threshold_bps();
        let matched_stake: U256 = tally.values().copied().fold(U256::ZERO, |acc, s| acc + s);
        let error_count = total.saturating_sub(success_count);

        let leader = tally.iter().max_by_key(|(_, stake)| *stake);
        match leader {
            Some((proposal, leader_stake))
                if *leader_stake * U256::from(10_000u64) >= total_stake * U256::from(threshold_bps as u64) =>
            {
                debug!(
                    chain_id = self.chain_id,
                    sequence_no = next_seq,
                    leader_stake = %leader_stake,
                    matched_stake = %matched_stake,
                    total_stake = %total_stake,
                    threshold_bps,
                    total,
                    error_count,
                    new_state_root = %proposal.new_state_root,
                    "state-commit prepare: stake-weighted majority reached"
                );
                Ok(*proposal)
            }
            None => {
                // Every operator response errored (transport, timeout, malformed).
                // Distinct from "no majority": there are no proposals to even
                // disagree about. Surfaces as a network/availability issue, not
                // an operator-state-divergence issue.
                warn!(
                    chain_id = self.chain_id,
                    sequence_no = next_seq,
                    operator_count = operators.len(),
                    responded = total,
                    error_count,
                    total_stake = %total_stake,
                    threshold_bps,
                    "state-commit prepare: all operators unreachable or errored"
                );
                newton_metric::inc_state_commit_disagreement_total(
                    self.chain_id,
                    newton_metric::state_commit_disagreement_flavor::ALL_UNREACHABLE,
                );
                Err(StateCommitError::OperatorDisagreement { sequence_no: next_seq })
            }
            Some(_) => {
                // Operators responded but no `(state_root, da_cert_hash)` tuple
                // wins enough stake-weighted votes to clear the threshold. This
                // is the genuine disagreement case — operators are reachable
                // but their local state trees diverge enough that no proposal
                // gathers stake-quorum support.
                warn!(
                    chain_id = self.chain_id,
                    sequence_no = next_seq,
                    operator_count = operators.len(),
                    responded = total,
                    error_count,
                    matched_stake = %matched_stake,
                    total_stake = %total_stake,
                    threshold_bps,
                    distinct_proposals = tally.len(),
                    "state-commit prepare: no stake-weighted majority among responses"
                );
                newton_metric::inc_state_commit_disagreement_total(
                    self.chain_id,
                    newton_metric::state_commit_disagreement_flavor::NO_MAJORITY,
                );
                Err(StateCommitError::OperatorDisagreement { sequence_no: next_seq })
            }
        }
    }

    /// Commit phase: fan out `sign_state_commit` to all operators in parallel.
    ///
    /// Every entry in `operators` is asked to sign `digest` for `commit` at
    /// `reference_timestamp`. Responses are handled in completion order, not in
    /// operator order.
    ///
    /// # Returns
    ///
    /// The list of `(operator_id, G1 partial sig)` pairs from operators that
    /// successfully signed. The list may be shorter than `operators` (or empty):
    /// this method performs no quorum check itself; the quorum check in the
    /// aggregator determines if enough operators signed.
    ///
    /// # Errors
    ///
    /// Returns [`StateCommitError::OperatorDisagreement`] as soon as any operator
    /// answers with [`OperatorClientError::DigestDisagreement`]. Requests still
    /// pending at that point are dropped together with the future set.
    /// Every other operator error (transport, timeout, or anything else) is
    /// logged and that operator is skipped.
    async fn commit_phase(
        &self,
        operators: &[OperatorEntry],
        digest: alloy::primitives::B256,
        commit: &newton_core::state_commit_registry::IStateRootCommittable::StateCommit,
        sequence_no: u64,
        reference_timestamp: u32,
    ) -> Result<Vec<(OperatorId, eigensdk::crypto_bls::BlsG1Point)>, StateCommitError> {
        let mut futs = FuturesUnordered::new();
        for entry in operators {
            // Each future owns its own client handle and commit copy so it does
            // not borrow from this stack frame.
            let client = Arc::clone(&self.operator_client);
            let op_id = entry.operator_id;
            let commit = commit.clone();
            futs.push(async move {
                (
                    op_id,
                    client
                        .sign_state_commit(&op_id, digest, &commit, reference_timestamp)
                        .await,
                )
            });
        }

        // At most one signature per operator, so reserve the upper bound once.
        let mut signatures = Vec::with_capacity(operators.len());
        while let Some((op_id, result)) = futs.next().await {
            match result {
                Ok(sig) => signatures.push((op_id, sig)),
                Err(OperatorClientError::DigestDisagreement { .. }) => {
                    warn!(
                        chain_id = self.chain_id,
                        sequence_no,
                        operator_id = %hex::encode(op_id),
                        "state-commit commit: operator refused to sign (digest disagreement)"
                    );
                    // Per spec: if any operator disagrees in commit phase, abort tick.
                    return Err(StateCommitError::OperatorDisagreement { sequence_no });
                }
                Err(e @ (OperatorClientError::Transport { .. } | OperatorClientError::Timeout { .. })) => {
                    // Keep the error in the log line: it is the only place that
                    // tells a timeout apart from a transport failure.
                    warn!(
                        chain_id = self.chain_id,
                        sequence_no,
                        operator_id = %hex::encode(op_id),
                        error = %e,
                        "state-commit commit: operator unreachable, skipping"
                    );
                }
                Err(e) => {
                    warn!(
                        chain_id = self.chain_id,
                        sequence_no,
                        operator_id = %hex::encode(op_id),
                        error = %e,
                        "state-commit commit: operator error, skipping"
                    );
                }
            }
        }

        Ok(signatures)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::state_commit::{
        aggregator::StateCommitAggregator,
        operator_client::{tests::FakeStateCommitOperatorClient, OperatorClientError, OperatorProposal},
        operator_set_reader::StubOperatorSetSnapshotReader,
        registry_view::RegistryView,
    };
    use alloy::primitives::{B256, U256};
    use ark_bn254::{G1Affine, G2Affine};
    use async_trait::async_trait;
    use eigensdk::crypto_bls::{BlsG1Point, BlsG2Point};
    use newton_core::pcr0_provider::StubPcr0Provider;
    use std::sync::Arc;
    use tokio_util::sync::CancellationToken;

    /// Builds a 32-byte test value in which every byte equals `byte`.
    fn b32(byte: u8) -> B256 {
        B256::new([byte; 32])
    }

    // --- Fakes ---

    /// Test double for `RegistryReader` that either hands back a canned view
    /// or fails, depending on `error`.
    struct FakeRegistryReader {
        /// View returned by `read_view` when `error` is `false`.
        view: RegistryView,
        /// When `true`, `read_view` returns an error instead of `view`.
        error: bool,
    }

    impl FakeRegistryReader {
        /// Shared reader that carries `view` with the failure flag cleared.
        fn ok(view: RegistryView) -> Arc<Self> {
            let reader = Self { error: false, view };
            Arc::new(reader)
        }

        /// Shared reader with the failure flag set; the carried view is an
        /// all-zero placeholder.
        fn failing() -> Arc<Self> {
            let placeholder = RegistryView {
                sequence_no: 0,
                state_root: B256::ZERO,
                last_commit_timestamp: 0,
            };
            Arc::new(Self {
                error: true,
                view: placeholder,
            })
        }
    }

    #[async_trait]
    impl RegistryReader for FakeRegistryReader {
        /// Yields a copy of the stored view, or `RegistryNotConfigured` when
        /// the fake was built in failing mode.
        async fn read_view(&self) -> Result<RegistryView, StateCommitError> {
            if self.error {
                Err(StateCommitError::RegistryNotConfigured)
            } else {
                Ok(self.view)
            }
        }
    }

    /// Fake writer that always returns `Ok(())` — lets tick() run to completion.
    ///
    /// Stateless: it holds no fields and records nothing about the calls it receives.
    struct FakeWriter;

    #[async_trait]
    impl StateCommitWriter for FakeWriter {
        /// Accepts any commit/certificate pair and reports success; both
        /// arguments are ignored (hence the underscore-prefixed names).
        async fn commit_state_root(
            &self,
            _commit: StateCommit,
            _bls_certificate: Bytes,
        ) -> Result<(), StateCommitError> {
            Ok(())
        }
    }

    /// Returns a fresh [`FakeWriter`] as a shared `StateCommitWriter` trait object.
    fn fake_writer() -> Arc<dyn StateCommitWriter> {
        Arc::new(FakeWriter) as Arc<dyn StateCommitWriter>
    }

    /// Builds a test operator: a default id with its first byte set to
    /// `id_byte`, a stake of 1000, and identity points for both BLS public keys.
    fn make_operator_entry(id_byte: u8) -> OperatorEntry {
        let operator_id = {
            let mut raw = OperatorId::default();
            raw[0] = id_byte;
            raw
        };
        let stake = U256::from(1_000u64);
        let g1_pubkey = BlsG1Point::new(G1Affine::identity());
        let g2_pubkey = BlsG2Point::new(G2Affine::identity());

        OperatorEntry {
            operator_id,
            stake,
            g1_pubkey,
            g2_pubkey,
        }
    }

    /// Registry view shared by the tests: sequence number 5, a state root of
    /// repeated `0xab` bytes, and a last-commit timestamp of 1000.
    fn baseline_view() -> RegistryView {
        let state_root = b32(0xab);
        RegistryView {
            sequence_no: 5,
            state_root,
            last_commit_timestamp: 1_000,
        }
    }

    /// Assembles a [`StateCommitOrchestrator`] for tests.
    ///
    /// The caller supplies the registry reader, the operator client and the
    /// operator set; every other collaborator is a stub or fake (stub PCR0
    /// provider, [`FakeWriter`], stub snapshot reader, fake table provider
    /// with an empty operator list). The interval is 120 seconds and the
    /// aggregator is created with a 6_700 threshold argument.
    fn build_orchestrator(
        reader: Arc<dyn RegistryReader>,
        op_client: Arc<dyn StateCommitOperatorClient>,
        operators: Vec<OperatorEntry>,
    ) -> Arc<StateCommitOrchestrator> {
        // Collaborators are created in the same order the constructor takes them.
        let pcr0_provider = Arc::new(StubPcr0Provider::new());
        let aggregator = Arc::new(StateCommitAggregator::new(1, 6_700));
        let writer = fake_writer();
        let snapshot_reader = Arc::new(StubOperatorSetSnapshotReader::new(operators, 42));
        let table_provider =
            crate::state_commit::operator_table::fake::FakeOperatorTableProvider::from_operators(42, Vec::new());
        let verifier = newton_core::bn254_certificate_verifier::ViewBN254CertificateVerifier::OperatorSet {
            avs: alloy::primitives::Address::ZERO,
            id: 0,
        };

        let orchestrator = StateCommitOrchestrator::new(
            1,
            Duration::from_secs(120),
            reader,
            pcr0_provider,
            op_client,
            aggregator,
            writer,
            snapshot_reader,
            table_provider,
            verifier,
        );
        Arc::new(orchestrator)
    }

    // --- Tests ---

    #[tokio::test]
    async fn tick_happy_path_reaches_aggregation() {
        // Two operators, 1000 stake each, both backing the same proposal:
        // the signed share is 10_000 bps against a 6_700 bps threshold, and
        // FakeWriter returns Ok, so the whole tick is expected to return Ok(()).
        let entries = vec![make_operator_entry(0x01), make_operator_entry(0x02)];

        let proposal = OperatorProposal {
            new_state_root: b32(0xcd),
            da_cert_hash: b32(0xef),
            pcr0_commitment: *newton_core::pcr0_sentinels::STATE_COMMIT_STUB_PCR0_HASH,
        };

        // Every operator reports the identical (state_root, da_cert_hash) tuple,
        // so a stake-weighted majority forms.
        let client = Arc::new(FakeStateCommitOperatorClient::new());
        for entry in &entries {
            client.set_proposal_for(&entry.operator_id, proposal);
        }

        let orch = build_orchestrator(FakeRegistryReader::ok(baseline_view()), client, entries);

        let result = orch.tick().await;
        assert!(result.is_ok(), "expected Ok, got {result:?}");
    }

    #[tokio::test]
    async fn tick_aborts_on_view_read_error() {
        // The registry reader fails, so the tick must surface that error as-is.
        let orch = build_orchestrator(
            FakeRegistryReader::failing(),
            Arc::new(FakeStateCommitOperatorClient::new()),
            Vec::new(),
        );

        let result = orch.tick().await;
        assert!(matches!(result, Err(StateCommitError::RegistryNotConfigured)));
        // A missing registry configuration must not be classified as poison.
        assert!(!StateCommitError::RegistryNotConfigured.is_poison());
    }

    #[tokio::test]
    async fn tick_aborts_on_operator_disagreement_in_prepare() {
        // Two equally staked operators propose different state roots, so each
        // tuple is backed by only half of the total stake. The test expects
        // that to fall short of the prepare-phase threshold and the tick to
        // abort with a non-poison OperatorDisagreement.
        let entries = vec![make_operator_entry(0x01), make_operator_entry(0x02)];

        let first_proposal = OperatorProposal {
            new_state_root: b32(0xcd),
            da_cert_hash: b32(0xef),
            pcr0_commitment: *newton_core::pcr0_sentinels::STATE_COMMIT_STUB_PCR0_HASH,
        };
        // Same DA cert hash and PCR0 commitment; only the state root differs.
        let second_proposal = OperatorProposal {
            new_state_root: b32(0xff),
            ..first_proposal
        };

        let client = Arc::new(FakeStateCommitOperatorClient::new());
        client.set_proposal_for(&entries[0].operator_id, first_proposal);
        client.set_proposal_for(&entries[1].operator_id, second_proposal);

        let orch = build_orchestrator(FakeRegistryReader::ok(baseline_view()), client, entries);

        let result = orch.tick().await;
        assert!(matches!(result, Err(StateCommitError::OperatorDisagreement { .. })));
        assert!(!result.unwrap_err().is_poison());
    }

    #[tokio::test]
    async fn tick_aborts_when_operator_set_is_empty() {
        // Empty operator set is a misconfiguration; gather_majority_proposal
        // refuses before issuing any RPCs and returns OperatorDisagreement.
        let client = Arc::new(FakeStateCommitOperatorClient::new());
        let orch = build_orchestrator(
            FakeRegistryReader::ok(baseline_view()),
            Arc::clone(&client) as _,
            vec![],
        );

        let result = orch.tick().await;
        assert!(matches!(result, Err(StateCommitError::OperatorDisagreement { .. })));
        assert!(!result.unwrap_err().is_poison());
        // No RPC fired.
        assert_eq!(client.proposal_call_count(), 0);
    }

    #[tokio::test]
    async fn tick_aborts_when_all_operators_unreachable() {
        // Every prepare-phase response errors → no leader tuple. Distinct from
        // disagreement: there is nothing to disagree about. Still surfaces as
        // OperatorDisagreement (transient) and is not poison.
        let client = Arc::new(FakeStateCommitOperatorClient::new());
        let mut op1 = OperatorId::default();
        op1[0] = 0x01;
        let mut op2 = OperatorId::default();
        op2[0] = 0x02;
        client.inject_error(
            &op1,
            OperatorClientError::Transport {
                operator_id: hex::encode(op1),
                source: "connection refused".into(),
            },
        );
        client.inject_error(
            &op2,
            OperatorClientError::Timeout {
                operator_id: hex::encode(op2),
                timeout_ms: 5_000,
            },
        );

        let orch = build_orchestrator(
            FakeRegistryReader::ok(baseline_view()),
            Arc::clone(&client) as _,
            vec![make_operator_entry(0x01), make_operator_entry(0x02)],
        );

        let result = orch.tick().await;
        assert!(matches!(result, Err(StateCommitError::OperatorDisagreement { .. })));
        assert!(!result.unwrap_err().is_poison());
        assert_eq!(client.proposal_call_count(), 2);
    }

    #[tokio::test]
    async fn run_loop_exits_on_shutdown() {
        // A one-hour interval is meant to keep periodic ticks out of the
        // picture; the test only checks that cancelling the token makes
        // `run` return promptly.
        let tick_interval = Duration::from_secs(3600);
        let client = Arc::new(FakeStateCommitOperatorClient::new());
        let orch = Arc::new(StateCommitOrchestrator::new(
            1,
            tick_interval,
            FakeRegistryReader::ok(baseline_view()),
            Arc::new(StubPcr0Provider::new()),
            client,
            Arc::new(StateCommitAggregator::new(1, 6_700)),
            fake_writer(),
            Arc::new(StubOperatorSetSnapshotReader::new(vec![], 1)),
            crate::state_commit::operator_table::fake::FakeOperatorTableProvider::from_operators(1, Vec::new()),
            newton_core::bn254_certificate_verifier::ViewBN254CertificateVerifier::OperatorSet {
                avs: alloy::primitives::Address::ZERO,
                id: 0,
            },
        ));

        let token = CancellationToken::new();
        // The spawned task gets its own handles; the originals stay with the test.
        let handle = tokio::spawn({
            let task_orch = Arc::clone(&orch);
            let task_token = token.clone();
            async move {
                task_orch.run(task_token).await;
            }
        });

        token.cancel();

        // Outer layer: the timeout; inner layer: the task's join result.
        let join_result = tokio::time::timeout(Duration::from_secs(2), handle)
            .await
            .expect("run() exited within 2s");
        join_result.expect("task did not panic");
    }
}