// newton-aggregator 0.4.13
//
// newton prover aggregator utils
// Documentation
//! Operator RPC client for state-commit two-phase consensus.
//!
//! Two-phase commit:
//! - Prepare: orchestrator calls [`StateCommitOperatorClient::get_state_commit_proposal`]
//!   per operator. Each operator reads its local state tree at the registry's current
//!   sequence and returns `(new_state_root, da_cert_hash, pcr0_commitment)`. The
//!   orchestrator picks the majority proposal (or aborts if no consensus).
//! - Commit: orchestrator builds a canonical [`StateCommit`], computes
//!   `keccak256(abi.encode(StateCommit))`, then sends both the commit and the digest to
//!   each operator. Operators re-derive the digest, verify that their state tree at this
//!   sequence hashes to the same root, and BLS-sign the typed digest derived from
//!   `(reference_timestamp, digest)`, returning a [`BlsG1Point`] partial signature.
//!
//! The JSON-RPC server side is implemented in `crates/operator/src/state_commit_rpc.rs`
//! (NEWT-1116). This module defines the trait surface for orchestrator-side testing.
//!
//! See `docs/PRIVATE_DATA_STORAGE.md` §6 (Commit Protocol) for the full
//! protocol specification.

use alloy::primitives::B256;
use async_trait::async_trait;
use eigensdk::{crypto_bls::BlsG1Point, types::operator::OperatorId};
use newton_core::state_commit_registry::IStateRootCommittable::StateCommit;
use thiserror::Error;

/// A single operator's Prepare-phase response: the inputs the operator proposes
/// for the next `StateCommit`. The orchestrator takes the majority value across
/// the operator set (or aborts if no majority forms).
///
/// All three fields are [`B256`] values and the type derives `Copy`, `PartialEq`
/// and `Eq`, so proposals can be passed around and compared by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OperatorProposal {
    /// Proposed JMT state root for the next commit.
    pub new_state_root: B256,
    /// DA certificate hash accompanying the state root.
    pub da_cert_hash: B256,
    /// PCR0 commitment the operator attests to.
    pub pcr0_commitment: B256,
}

/// Errors returned by [`StateCommitOperatorClient`] methods.
///
/// Every variant carries the hex-encoded operator id so a log line can identify
/// the failing operator without extra context. Per the variant docs below,
/// [`Transport`](Self::Transport) and [`Timeout`](Self::Timeout) are the transient
/// cases (skip the operator for this tick), while
/// [`DigestDisagreement`](Self::DigestDisagreement) and
/// [`Malformed`](Self::Malformed) are not meant to be blindly retried.
#[derive(Debug, Error)]
pub enum OperatorClientError {
    /// Operator explicitly disagrees with the majority digest at the given sequence number.
    /// The orchestrator aborts the tick — no blind retry.
    #[error("operator {operator_id} disagrees on state commit digest at sequence {sequence_no}")]
    DigestDisagreement {
        /// Hex-encoded operator id for logging.
        operator_id: String,
        /// Sequence number the operator was asked to sign.
        sequence_no: u64,
    },

    /// Transport-level failure (connection refused, TLS error, etc.). Callers
    /// may treat this as transient and skip the operator for the current tick.
    ///
    /// The underlying error is both interpolated into the display message and
    /// marked `#[source]`, so it is also reachable through
    /// [`std::error::Error::source`].
    #[error("operator {operator_id} RPC transport failure: {source}")]
    Transport {
        /// Hex-encoded operator id for logging.
        operator_id: String,
        /// Underlying transport error (boxed so any `Send + Sync` error type fits).
        #[source]
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    /// Operator returned a structurally invalid response (unexpected type,
    /// missing field, ABI decode error). Non-transient.
    #[error("operator {operator_id} returned malformed response: {reason}")]
    Malformed {
        /// Hex-encoded operator id for logging.
        operator_id: String,
        /// Human-readable description of the schema violation.
        reason: String,
    },

    /// Request to operator timed out. Callers treat this as transient and skip
    /// the operator for the current tick.
    #[error("operator {operator_id} timed out after {timeout_ms}ms")]
    Timeout {
        /// Hex-encoded operator id for logging.
        operator_id: String,
        /// Elapsed timeout in milliseconds.
        timeout_ms: u64,
    },
}

/// Per-operator RPC surface for the state-commit two-phase protocol.
///
/// Both methods are called per-operator from the orchestrator's tick loop.
/// The production implementation (task #130) bridges these to `newt_getStateCommitProposal`
/// and `newt_signStateCommit` JSON-RPC methods exposed by each operator.
///
/// `Send + Sync + 'static` lets the orchestrator hold the client behind
/// [`std::sync::Arc`] across `tokio::spawn` boundaries.
#[async_trait]
pub trait StateCommitOperatorClient: Send + Sync + 'static {
    /// Prepare phase: ask the operator for its proposed next state-commit inputs.
    ///
    /// The operator reads its local state tree at `sequence_no` and returns the
    /// `(new_state_root, da_cert_hash, pcr0_commitment)` triple.
    ///
    /// # Arguments
    ///
    /// - `operator_id`: the operator to query.
    /// - `sequence_no`: the sequence at which the operator reads its state tree.
    ///
    /// # Returns
    ///
    /// The operator's [`OperatorProposal`] for the next commit.
    ///
    /// # Errors
    ///
    /// - [`OperatorClientError::Transport`] / [`OperatorClientError::Timeout`] for
    ///   reachability failures — orchestrator skips the operator for this tick.
    /// - [`OperatorClientError::Malformed`] for response schema violations — non-transient.
    async fn get_state_commit_proposal(
        &self,
        operator_id: &OperatorId,
        sequence_no: u64,
    ) -> Result<OperatorProposal, OperatorClientError>;

    /// Commit phase: ask the operator to BLS-sign the canonical commit digest.
    ///
    /// The operator independently re-derives `keccak256(abi.encode(commit))` from
    /// `commit` and verifies it equals `digest` AND that `commit.newStateRoot`
    /// matches its local JMT root at this sequence — refusing to sign if either
    /// check fails. Both `commit` and `digest` are sent so the operator does not
    /// have to trust a caller-supplied hash. The operator derives the
    /// EIP-712 `signableDigest` from `(reference_timestamp, digest)` per
    /// `BN254CertificateVerifier.calculateCertificateDigest` before BLS-signing
    /// — both fields are bound into the signing preimage so a captured
    /// signature cannot be replayed against a different `referenceTimestamp`.
    /// Returns the G1 partial signature over the typed digest.
    ///
    /// # Arguments
    ///
    /// - `operator_id`: the operator asked to sign.
    /// - `digest`: the orchestrator-computed `keccak256(abi.encode(commit))`.
    /// - `commit`: the canonical commit the digest was derived from.
    /// - `reference_timestamp`: bound together with `digest` into the typed
    ///   digest that is actually signed.
    ///
    /// # Why `&StateCommit` and not `&StateCommitWire`?
    ///
    /// `StateCommit` is the alloy `sol!`-generated mirror of the on-chain
    /// `IStateRootCommittable.StateCommit` struct — its `abi.encode` is the
    /// canonical preimage the BLS digest binds to. The orchestrator builds one
    /// `StateCommit` per tick and computes the digest from it directly; the
    /// per-operator transport conversion (`From<&StateCommit> for
    /// StateCommitWire`) happens inside the HTTP impl. Putting the alloy
    /// struct in the trait surface keeps the digest derivation aligned with
    /// the on-chain encoding contract and confines wire-format concerns
    /// (snake_case JSON) to the transport layer.
    ///
    /// # Errors
    ///
    /// - [`OperatorClientError::DigestDisagreement`] when the operator refuses to
    ///   sign (its view of the digest differs from `digest`, or its local state
    ///   root differs from `commit.newStateRoot`). The orchestrator aborts the
    ///   tick and does NOT retry against the same proposal — it rebuilds from the
    ///   current registry view on the next tick.
    /// - [`OperatorClientError::Transport`] / [`OperatorClientError::Timeout`] for
    ///   reachability failures.
    async fn sign_state_commit(
        &self,
        operator_id: &OperatorId,
        digest: B256,
        commit: &StateCommit,
        reference_timestamp: u32,
    ) -> Result<BlsG1Point, OperatorClientError>;
}

#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use ark_bn254::G1Affine;
    use ark_ec::AffineRepr;
    use std::{
        collections::HashMap,
        sync::{Arc, Mutex},
    };

    /// Builds a placeholder `StateCommit` for tests that call
    /// [`StateCommitOperatorClient::sign_state_commit`].
    ///
    /// Every hash field is zeroed and only `version` is non-zero. The fake client
    /// never inspects `digest` or `commit`, so the concrete values do not matter;
    /// real `(digest, commit)` consistency is enforced by the orchestrator (digest
    /// derived from commit) and the operator handler (recomputes the digest).
    pub(crate) fn dummy_state_commit() -> StateCommit {
        let zero = B256::ZERO;
        StateCommit {
            pcr0Commitment: zero,
            daCertHash: zero,
            timestamp: 0,
            newStateRoot: zero,
            prevStateRoot: zero,
            sequenceNo: 0,
            version: 1,
        }
    }

    /// Proposal that `FakeStateCommitOperatorClient` hands back for any operator
    /// without a registered override. Each field is a distinct repeated byte
    /// (`0xaa`, `0xbb`, `0xcc`) so the three values are easy to tell apart.
    pub(crate) fn default_proposal() -> OperatorProposal {
        let new_state_root = B256::repeat_byte(0xaa);
        let da_cert_hash = B256::repeat_byte(0xbb);
        let pcr0_commitment = B256::repeat_byte(0xcc);
        OperatorProposal {
            new_state_root,
            da_cert_hash,
            pcr0_commitment,
        }
    }

    /// Hand-rolled fake [`StateCommitOperatorClient`].
    ///
    /// - Returns `default_proposal()` by default for all operators.
    /// - `set_proposal_for(operator_id, proposal)` overrides the return value for
    ///   that specific operator.
    /// - `inject_error(operator_id, error)` makes the next call for that operator
    ///   (to either trait method) fail once with the given error, then clears the
    ///   injection.
    /// - `sign_state_commit` returns the BN254 G1 generator unless an error is
    ///   injected — sufficient for orchestrator-level tests that only care about
    ///   flow control, not cryptographic correctness.
    pub(crate) struct FakeStateCommitOperatorClient {
        /// Shared mutable state; locked for the duration of each call.
        state: Arc<Mutex<FakeClientState>>,
    }

    /// Mutable state behind [`FakeStateCommitOperatorClient`]'s mutex.
    struct FakeClientState {
        /// Fallback proposal returned when `proposals` has no entry for the operator.
        default_proposal: OperatorProposal,
        /// Per-operator proposal overrides keyed by hex operator id.
        proposals: HashMap<String, OperatorProposal>,
        /// One-shot errors keyed by hex operator id. Consumed on first match.
        errors: HashMap<String, OperatorClientError>,
        /// Number of `get_state_commit_proposal` calls, including ones that returned
        /// an injected error (the counter is bumped before the error check).
        proposal_call_count: usize,
        /// Number of `sign_state_commit` calls, counted the same way.
        sign_call_count: usize,
    }

    impl FakeStateCommitOperatorClient {
        pub(crate) fn new() -> Self {
            Self {
                state: Arc::new(Mutex::new(FakeClientState {
                    default_proposal: default_proposal(),
                    proposals: HashMap::new(),
                    errors: HashMap::new(),
                    proposal_call_count: 0,
                    sign_call_count: 0,
                })),
            }
        }

        /// Override the proposal returned for a specific operator.
        pub(crate) fn set_proposal_for(&self, operator_id: &OperatorId, proposal: OperatorProposal) {
            let key = hex::encode(operator_id);
            self.state.lock().unwrap().proposals.insert(key, proposal);
        }

        /// Inject a one-shot error for the next call from the given operator.
        pub(crate) fn inject_error(&self, operator_id: &OperatorId, error: OperatorClientError) {
            let key = hex::encode(operator_id);
            self.state.lock().unwrap().errors.insert(key, error);
        }

        pub(crate) fn proposal_call_count(&self) -> usize {
            self.state.lock().unwrap().proposal_call_count
        }

        pub(crate) fn sign_call_count(&self) -> usize {
            self.state.lock().unwrap().sign_call_count
        }
    }

    #[async_trait]
    impl StateCommitOperatorClient for FakeStateCommitOperatorClient {
        /// Counts the call, then returns (in priority order) the injected one-shot
        /// error, the per-operator override, or the default proposal.
        /// `_sequence_no` is ignored.
        async fn get_state_commit_proposal(
            &self,
            operator_id: &OperatorId,
            _sequence_no: u64,
        ) -> Result<OperatorProposal, OperatorClientError> {
            let op_key = hex::encode(operator_id);
            let mut guard = self.state.lock().unwrap();
            // Count first so calls that fail with an injected error are still tallied.
            guard.proposal_call_count += 1;

            // `remove` both detects and consumes the injection, making it one-shot.
            let injected = guard.errors.remove(&op_key);
            match injected {
                Some(err) => Err(err),
                None => Ok(match guard.proposals.get(&op_key) {
                    Some(overridden) => *overridden,
                    None => guard.default_proposal,
                }),
            }
        }

        /// Counts the call, then returns the injected one-shot error if one is armed
        /// for this operator, otherwise a fixed well-formed signature point. The
        /// digest, commit and reference timestamp are all ignored.
        async fn sign_state_commit(
            &self,
            operator_id: &OperatorId,
            _digest: B256,
            _commit: &StateCommit,
            _reference_timestamp: u32,
        ) -> Result<BlsG1Point, OperatorClientError> {
            let op_key = hex::encode(operator_id);
            let mut guard = self.state.lock().unwrap();
            guard.sign_call_count += 1;

            let injected = guard.errors.remove(&op_key);
            match injected {
                Some(err) => Err(err),
                // Return the G1 generator — a well-formed non-identity point that passes
                // the aggregator's structural gate (on-curve, in-subgroup, non-zero).
                // The pairing check is #[cfg(not(test))]-gated so this suffices for
                // orchestrator-level unit tests; aggregator.rs tests exercise the full path.
                None => Ok(BlsG1Point::new(G1Affine::generator())),
            }
        }
    }

    // ---- unit tests ----------------------------------------------------------

    /// With no override registered, the fake answers with `default_proposal()`
    /// and records exactly one proposal call.
    #[tokio::test]
    async fn fake_returns_default_proposal_for_unknown_operator() {
        let fake = FakeStateCommitOperatorClient::new();
        let unknown_operator = OperatorId::default();

        let outcome = fake.get_state_commit_proposal(&unknown_operator, 1).await;

        assert_eq!(outcome.expect("default proposal succeeds"), default_proposal());
        assert_eq!(fake.proposal_call_count(), 1);
    }

    /// A proposal registered through `set_proposal_for` is what the fake returns
    /// for that operator.
    #[tokio::test]
    async fn fake_returns_overridden_proposal_for_configured_operator() {
        let fake = FakeStateCommitOperatorClient::new();
        let configured_operator = OperatorId::default();
        let new_state_root = B256::repeat_byte(0x11);
        let da_cert_hash = B256::repeat_byte(0x22);
        let pcr0_commitment = B256::repeat_byte(0x33);
        let custom = OperatorProposal {
            new_state_root,
            da_cert_hash,
            pcr0_commitment,
        };
        fake.set_proposal_for(&configured_operator, custom);

        let outcome = fake.get_state_commit_proposal(&configured_operator, 2).await;

        assert_eq!(outcome.expect("overridden proposal succeeds"), custom);
    }

    /// An injected error fails exactly one `get_state_commit_proposal` call; the
    /// following call falls back to the default proposal.
    #[tokio::test]
    async fn fake_one_shot_error_on_proposal_then_recovers() {
        let fake = FakeStateCommitOperatorClient::new();
        let operator = OperatorId::default();
        let timeout = OperatorClientError::Timeout {
            operator_id: "test".into(),
            timeout_ms: 5_000,
        };
        fake.inject_error(&operator, timeout);

        // The armed injection is consumed by the first call.
        let first = fake.get_state_commit_proposal(&operator, 1).await;
        let failure = first.expect_err("error injected");
        assert!(matches!(failure, OperatorClientError::Timeout { .. }));

        // With the injection gone, the same request now succeeds.
        let second = fake.get_state_commit_proposal(&operator, 1).await;
        let recovered = second.expect("recovers after one-shot error");
        assert_eq!(recovered, default_proposal());
    }

    #[tokio::test]
    async fn fake_sign_state_commit_returns_zero_g2() {
        let client = FakeStateCommitOperatorClient::new();
        let op_id = OperatorId::default();
        let commit = dummy_state_commit();
        let sig = client
            .sign_state_commit(&op_id, B256::ZERO, &commit, 0u32)
            .await
            .expect("sign succeeds");
        // Zero G2 sentinel — just checks the happy path compiles and returns Ok.
        let _ = sig;
        assert_eq!(client.sign_call_count(), 1);
    }

    /// An injected error fails exactly one `sign_state_commit` call; the next
    /// call for the same operator succeeds again.
    #[tokio::test]
    async fn fake_one_shot_error_on_sign_then_recovers() {
        let fake = FakeStateCommitOperatorClient::new();
        let operator = OperatorId::default();
        let commit = dummy_state_commit();
        let disagreement = OperatorClientError::DigestDisagreement {
            operator_id: "test".into(),
            sequence_no: 7,
        };
        fake.inject_error(&operator, disagreement);

        // First attempt consumes the injection and surfaces it to the caller.
        let first = fake.sign_state_commit(&operator, B256::ZERO, &commit, 0u32).await;
        let failure = first.expect_err("injected error");
        assert!(matches!(failure, OperatorClientError::DigestDisagreement { .. }));

        // Second attempt: nothing left to inject, so signing goes through.
        let second = fake.sign_state_commit(&operator, B256::ZERO, &commit, 0u32).await;
        second.expect("recovers after one-shot error");
    }
}