Skip to main content

celestia_grpc/tx_client_v2/
mod.rs

1//! Transaction manager v2: queueing, submission, confirmation, and recovery logic.
2//!
3//! # Overview
4//! - `TxSubmitter` is a front-end that enqueues transactions.
5//! - `TransactionWorker` is a background event loop that submits and confirms.
6//! - `TxServer` abstracts the submission/confirmation backend (one or more nodes).
7//!
8//! The worker maintains a contiguous in-order queue keyed by sequence and resolves
9//! submit/confirm callbacks as transitions are observed.
10//!
11//! # Notes
12//! - Sequence continuity is enforced at enqueue time; any gap is treated as fatal.
13//! - Confirmations are batched per node and reduced to a small set of events.
14//! - Recovery runs a narrow confirmation loop for a single node when sequence
15//!   mismatch is detected.
16//!
17//! # Example
18//! ```no_run
19//! # use std::collections::HashMap;
20//! # use std::sync::Arc;
21//! # use std::time::Duration;
22//! # use celestia_grpc::{Result, TxConfig};
23//! # use celestia_grpc::tx_client_v2::{
24//! #     TransactionWorker, TxRequest, TxServer,
25//! # };
26//! # struct DummyServer;
27//! # #[async_trait::async_trait]
28//! # impl TxServer for DummyServer {
29//! #     type TxId = u64;
30//! #     type ConfirmInfo = u64;
31//! #     type TxRequest = TxRequest;
32//! #     type SubmitError = ();
33//! #     type ConfirmResponse = ();
34//! #     async fn submit(
35//! #         &self,
36//! #         _b: Arc<Vec<u8>>,
37//! #         _s: u64,
38//! #     ) -> celestia_grpc::tx_client_v2::TxSubmitResult<Self::TxId, Self::SubmitError> {
39//! #         unimplemented!()
40//! #     }
41//! #     async fn status_batch(
42//! #         &self,
43//! #         _ids: Vec<u64>,
44//! #     ) -> celestia_grpc::tx_client_v2::TxConfirmResult<
45//! #         Vec<(
46//! #             Self::TxId,
47//! #             celestia_grpc::tx_client_v2::TxStatus<Self::ConfirmInfo, Self::ConfirmResponse>,
48//! #         )>,
49//! #     > {
50//! #         unimplemented!()
51//! #     }
52//! #     async fn status(
53//! #         &self,
54//! #         _id: Self::TxId,
55//! #     ) -> celestia_grpc::tx_client_v2::TxConfirmResult<
56//! #         celestia_grpc::tx_client_v2::TxStatus<Self::ConfirmInfo, Self::ConfirmResponse>,
57//! #     > {
58//! #         unimplemented!()
59//! #     }
60//! #     async fn current_sequence(&self) -> Result<u64> { unimplemented!() }
61//! #     async fn simulate_and_sign(
62//! #         &self,
63//! #         _req: Arc<Self::TxRequest>,
64//! #         _sequence: u64,
65//! #     ) -> std::result::Result<
66//! #         celestia_grpc::tx_client_v2::Transaction<
67//! #             Self::TxId,
68//! #             Self::ConfirmInfo,
69//! #             Self::ConfirmResponse,
70//! #             Self::SubmitError,
71//! #         >,
72//! #         celestia_grpc::tx_client_v2::SigningFailure<Self::SubmitError>,
73//! #     > {
74//! #         unimplemented!()
75//! #     }
76//! # }
77//! # async fn docs() -> Result<()> {
78//! let nodes = HashMap::from([(Arc::from("node-1"), Arc::new(DummyServer))]);
79//! let (manager, mut worker) = TransactionWorker::new(
80//!     nodes,
81//!     Duration::from_secs(1),
82//!     16,
83//!     Some(0),
84//!     128,
85//! );
86//! # let _ = manager;
87//! # let _ = worker;
88//! # Ok(())
89//! # }
90//! ```
91use std::collections::HashMap;
92use std::fmt;
93use std::hash::Hash as StdHash;
94use std::sync::Arc;
95use std::time::Duration;
96
97use async_trait::async_trait;
98use celestia_types::state::ErrorCode;
99use futures::future::{BoxFuture, FutureExt};
100use futures::stream::{FuturesUnordered, StreamExt};
101use lumina_utils::executor::spawn_cancellable;
102use lumina_utils::time::{self, Interval};
103use tokio::select;
104use tokio::sync::{mpsc, oneshot};
105use tokio_util::sync::CancellationToken;
106use tracing::{debug, info};
107
108mod node_manager;
109mod tx_buffer;
110
111use crate::{Error, Result, TxConfig};
112use node_manager::NodeManager;
/// Identifier for a submission/confirmation node.
///
/// Backed by a shared string slice, so cloning an id only bumps a reference count.
pub type NodeId = Arc<str>;
/// Result for submission calls: either a server TxId or a submission failure.
///
/// `T` is the id type returned on success; `E` is the backend error type that
/// gets wrapped in [`SubmitFailure`].
pub type TxSubmitResult<T, E> = Result<T, SubmitFailure<E>>;
/// Result for confirmation calls.
///
/// Uses the default error type of the crate's `Result` alias.
pub type TxConfirmResult<T> = Result<T>;
/// Result for signing calls.
///
/// `Ok` carries the signed [`Transaction`]; `Err` carries a [`SigningFailure`]
/// wrapping the backend error type `SubmitErr`.
pub type TxSigningResult<TxId, ConfirmInfo, ConfirmResponse, SubmitErr> =
    Result<Transaction<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>, SigningFailure<SubmitErr>>;
122
/// Marker trait for transaction identifiers.
///
/// It declares no methods; it only gives a name to the `Clone + Debug` requirement.
pub trait TxIdT: Clone + fmt::Debug {}

/// Blanket implementation: every `Clone + Debug` type can serve as a transaction id.
impl<T: Clone + fmt::Debug> TxIdT for T {}
126
/// Payload for a transaction request.
///
/// Carried by [`TxRequest`] next to the transaction configuration.
#[derive(Debug, Clone)]
pub enum TxPayload {
    /// Pay-for-blobs transaction.
    Blobs(Vec<celestia_types::Blob>),
    /// Raw Cosmos transaction body with arbitrary messages.
    Tx(celestia_types::state::RawTxBody),
}
135
/// A transaction request combining payload and configuration.
///
/// Build one with [`TxRequest::tx`] (raw body) or [`TxRequest::blobs`]
/// (pay-for-blobs), or fill in the public fields directly.
#[derive(Debug, Clone)]
pub struct TxRequest {
    /// The transaction payload (blobs or raw tx body).
    pub tx: TxPayload,
    /// Configuration for gas, fees, and other settings.
    pub cfg: TxConfig,
}
144
/// Error indicating the worker stopped before completing an operation.
///
/// This is the `Err` side of [`ConfirmResult`]. Note that signing failures
/// reuse the `SubmitErr` type parameter rather than having their own.
#[derive(Debug, Clone)]
pub enum StopError<SubmitErr, ConfirmInfo, ConfirmResponse> {
    /// Confirmation failed with a status indicating rejection or other issue.
    ConfirmError(TxStatus<ConfirmInfo, ConfirmResponse>),
    /// Submission failed with an error from the node.
    SubmitError(SubmitErr),
    /// Signing failed with an error.
    SignError(SubmitErr),
    /// The transaction worker stopped unexpectedly.
    WorkerStopped,
}
157
158#[derive(Debug)]
159#[allow(clippy::enum_variant_names)]
160pub(crate) enum WorkerPlan<TxId: TxIdT> {
161    SpawnSigning {
162        node_id: NodeId,
163        sequence: u64,
164        delay: Duration,
165    },
166    SpawnSubmit {
167        node_id: NodeId,
168        bytes: Arc<Vec<u8>>,
169        sequence: u64,
170        delay: Duration,
171    },
172    SpawnConfirmBatch {
173        node_id: NodeId,
174        ids: Vec<TxId>,
175        epoch: u64,
176    },
177    SpawnRecover {
178        node_id: NodeId,
179        id: TxId,
180        epoch: u64,
181    },
182}
183
184impl<TxId: TxIdT> WorkerPlan<TxId> {
185    pub(crate) fn summary(&self) -> String {
186        match self {
187            WorkerPlan::SpawnSigning {
188                node_id, sequence, ..
189            } => {
190                format!("SpawnSigning node={} seq={}", node_id.as_ref(), sequence)
191            }
192            WorkerPlan::SpawnSubmit {
193                node_id, sequence, ..
194            } => format!("SpawnSubmit node={} seq={}", node_id.as_ref(), sequence),
195            WorkerPlan::SpawnConfirmBatch { node_id, ids, .. } => {
196                format!(
197                    "SpawnConfirmBatch node={} count={}",
198                    node_id.as_ref(),
199                    ids.len()
200                )
201            }
202            WorkerPlan::SpawnRecover { node_id, .. } => {
203                format!("SpawnRecover node={}", node_id.as_ref())
204            }
205        }
206    }
207}
208
209#[derive(Debug)]
210pub(crate) enum WorkerMutation<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
211    EnqueueSigned {
212        tx: crate::tx_client_v2::Transaction<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
213    },
214    MarkSubmitted {
215        sequence: u64,
216        id: TxId,
217    },
218    Confirm {
219        seq: u64,
220        info: ConfirmInfo,
221    },
222    WorkerStop,
223}
224
225impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr>
226    WorkerMutation<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>
227{
228    pub(crate) fn summary(&self) -> String {
229        match self {
230            WorkerMutation::EnqueueSigned { tx } => {
231                format!("EnqueueSigned seq={}", tx.sequence)
232            }
233            WorkerMutation::MarkSubmitted { sequence, .. } => {
234                format!("MarkSubmitted seq={}", sequence)
235            }
236            WorkerMutation::Confirm { seq, .. } => format!("Confirm seq={}", seq),
237            WorkerMutation::WorkerStop => "WorkerStop".to_string(),
238        }
239    }
240}
241
/// Result type for confirmation, either success info or a stop error.
///
/// Note the parameter order: `ConfirmInfo, SubmitErr, ConfirmResponse` here,
/// while [`StopError`] takes `SubmitErr, ConfirmInfo, ConfirmResponse`.
pub type ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse> =
    std::result::Result<ConfirmInfo, StopError<SubmitErr, ConfirmInfo, ConfirmResponse>>;
245
246impl<SubmitErr, ConfirmInfo, ConfirmResponse> fmt::Display
247    for StopError<SubmitErr, ConfirmInfo, ConfirmResponse>
248where
249    SubmitErr: fmt::Debug,
250    ConfirmInfo: fmt::Debug,
251    ConfirmResponse: fmt::Debug,
252{
253    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
254        match self {
255            StopError::ConfirmError(status) => write!(f, "ConfirmError({:?})", status),
256            StopError::SubmitError(err) => write!(f, "SubmitError({:?})", err),
257            StopError::SignError(err) => write!(f, "SignError({:?})", err),
258            StopError::WorkerStopped => write!(f, "WorkerStopped"),
259        }
260    }
261}
262
263impl TxRequest {
264    /// Create a request for a raw transaction body.
265    pub fn tx(body: celestia_types::state::RawTxBody, cfg: TxConfig) -> Self {
266        Self {
267            tx: TxPayload::Tx(body),
268            cfg,
269        }
270    }
271
272    /// Create a request for a pay-for-blobs transaction.
273    pub fn blobs(blobs: Vec<celestia_types::Blob>, cfg: TxConfig) -> Self {
274        Self {
275            tx: TxPayload::Blobs(blobs),
276            cfg,
277        }
278    }
279}
280
281/// A signed transaction ready for submission.
282#[derive(Debug)]
283pub struct Transaction<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
284    /// Transaction sequence for the signer.
285    pub sequence: u64,
286    /// Signed transaction bytes ready for broadcast.
287    pub bytes: Arc<Vec<u8>>,
288    /// One-shot callbacks for submit/confirm acknowledgements.
289    pub callbacks: TxCallbacks<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
290    /// Id of the transaction (set after submission).
291    pub id: Option<TxId>,
292}
293
294impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr>
295    Transaction<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>
296{
297    fn notify_submitted(&mut self, result: Result<TxId>) {
298        if let Some(submitted) = self.callbacks.submitted.take() {
299            let _ = submitted.send(result);
300        }
301    }
302
303    fn notify_confirmed(&mut self, result: ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>) {
304        if let Some(confirmed) = self.callbacks.confirmed.take() {
305            let _ = confirmed.send(result);
306        }
307    }
308}
309
310/// Callbacks for transaction lifecycle events.
311#[derive(Debug)]
312pub struct TxCallbacks<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
313    /// Resolves when submission succeeds or fails.
314    pub submitted: Option<oneshot::Sender<Result<TxId>>>,
315    /// Resolves when the transaction is confirmed or rejected.
316    pub confirmed: Option<oneshot::Sender<ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>>>,
317}
318
319impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> Default
320    for TxCallbacks<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>
321{
322    fn default() -> Self {
323        Self {
324            submitted: None,
325            confirmed: None,
326        }
327    }
328}
329
330struct RequestWithChannels<TxId, ConfirmInfo, ConfirmResponse, SubmitErr, Request> {
331    request: Arc<Request>,
332    sign_tx: oneshot::Sender<Result<()>>,
333    submit_tx: oneshot::Sender<Result<TxId>>,
334    confirm_tx: oneshot::Sender<ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>>,
335}
336
337impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr, Request>
338    RequestWithChannels<TxId, ConfirmInfo, ConfirmResponse, SubmitErr, Request>
339{
340    fn new(
341        request: Request,
342        sign_tx: oneshot::Sender<Result<()>>,
343        submit_tx: oneshot::Sender<Result<TxId>>,
344        confirm_tx: oneshot::Sender<ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>>,
345    ) -> Self {
346        Self {
347            request: Arc::new(request),
348            sign_tx,
349            submit_tx,
350            confirm_tx,
351        }
352    }
353}
354
/// Outcome of a signing task, tagged with the node and sequence it was run for.
struct SigningResult<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
    /// Node the signing was performed on.
    node_id: NodeId,
    /// Sequence the transaction was signed with.
    sequence: u64,
    /// The signed transaction, or the signing failure.
    response: TxSigningResult<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
}
360
/// Handle for tracking a transaction through signing, submission, and confirmation.
///
/// Returned by [`TxSubmitter::add_tx`]. Each field is an independent one-shot
/// receiver and can be awaited separately.
#[derive(Debug)]
pub struct TxHandle<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
    /// Resolves when signing completes successfully or fails.
    pub signed: oneshot::Receiver<Result<()>>,
    /// Resolves when submission completes with the transaction id or an error.
    pub submitted: oneshot::Receiver<Result<TxId>>,
    /// Resolves when confirmation completes with info or a stop error.
    pub confirmed: oneshot::Receiver<ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>>,
}
371
372/// Front-end handle for enqueuing transactions into the worker.
373#[derive(Clone)]
374pub struct TxSubmitter<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr, Request> {
375    add_tx:
376        mpsc::Sender<RequestWithChannels<TxId, ConfirmInfo, ConfirmResponse, SubmitErr, Request>>,
377}
378
379impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr, Request>
380    TxSubmitter<TxId, ConfirmInfo, ConfirmResponse, SubmitErr, Request>
381{
382    /// Enqueue a transaction request and return a handle for tracking its progress.
383    pub async fn add_tx(
384        &self,
385        request: Request,
386    ) -> Result<TxHandle<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>> {
387        let (sign_tx, sign_rx) = oneshot::channel();
388        let (submit_tx, submit_rx) = oneshot::channel();
389        let (confirm_tx, confirm_rx) = oneshot::channel();
390        let request_with_channels =
391            RequestWithChannels::new(request, sign_tx, submit_tx, confirm_tx);
392        match self.add_tx.try_send(request_with_channels) {
393            Ok(()) => Ok(TxHandle {
394                signed: sign_rx,
395                submitted: submit_rx,
396                confirmed: confirm_rx,
397            }),
398            Err(mpsc::error::TrySendError::Full(_)) => Err(Error::UnexpectedResponseType(
399                "transaction queue full".to_string(),
400            )),
401            Err(mpsc::error::TrySendError::Closed(_)) => Err(Error::TxWorkerStopped),
402        }
403    }
404}
405
/// Index entry tying a transaction id to a node and a sequence number.
#[derive(Debug)]
// NOTE(review): the fields are not read anywhere in the visible part of this
// file; confirm whether the type is still needed before removing the allow.
#[allow(dead_code)]
struct TxIndexEntry<TxId: TxIdT> {
    node_id: NodeId,
    sequence: u64,
    id: TxId,
}
413
/// The kind of status for a transaction.
///
/// Used as the `kind` field of [`TxStatus`], which additionally keeps the
/// node's original response.
#[derive(Debug, Clone)]
pub enum TxStatusKind<ConfirmInfo> {
    /// Submitted, but not yet committed.
    Pending,
    /// Included in a block successfully with confirmation info.
    Confirmed {
        /// Confirmation details.
        info: ConfirmInfo,
    },
    /// Rejected by the node with a specific reason.
    Rejected {
        /// The reason for rejection.
        reason: RejectionReason,
    },
    /// Removed from mempool; may need resubmission.
    Evicted,
    /// Status could not be determined.
    Unknown,
}
434
/// Transaction status with the original response from the node.
///
/// Pairs the mapped status (`kind`) with the unmodified backend reply.
#[derive(Debug, Clone)]
pub struct TxStatus<ConfirmInfo, OriginalResponse> {
    /// The mapped status kind.
    pub kind: TxStatusKind<ConfirmInfo>,
    /// The original response from the node for additional details.
    pub original_response: OriginalResponse,
}

impl<ConfirmInfo, OriginalResponse> TxStatus<ConfirmInfo, OriginalResponse> {
    /// Bundle a mapped status kind with the node's original response.
    pub(crate) fn new(
        kind: TxStatusKind<ConfirmInfo>,
        original_response: OriginalResponse,
    ) -> Self {
        Self {
            kind,
            original_response,
        }
    }
}
455
456/// Errors that can occur during transaction submission.
457#[derive(Debug, Clone)]
458#[allow(dead_code)]
459pub enum SubmitError {
460    /// Server expects a different sequence.
461    SequenceMismatch {
462        /// The sequence number the server expected.
463        expected: u64,
464    },
465    /// Transaction failed due to insufficient fee.
466    InsufficientFee {
467        /// The minimum fee the server requires.
468        expected_fee: u64,
469        /// Error message from the server.
470        message: String,
471    },
472    /// Transport or RPC error while submitting.
473    NetworkError,
474    /// Node mempool is full.
475    MempoolIsFull,
476    /// Transaction is already in the mempool cache.
477    TxInMempoolCache,
478    /// Submission failed with a specific error code and message.
479    Other {
480        /// Error code returned by the server.
481        error_code: ErrorCode,
482        /// Error message from the server.
483        message: String,
484    },
485}
486
487impl SubmitError {
488    fn label(&self) -> String {
489        match self {
490            SubmitError::SequenceMismatch { expected } => {
491                format!("SequenceMismatch expected={}", expected)
492            }
493            SubmitError::InsufficientFee {
494                expected_fee,
495                message,
496            } => format!(
497                "InsufficientFee expected_fee={} msg={}",
498                expected_fee, message
499            ),
500            SubmitError::Other {
501                error_code,
502                message,
503            } => format!("Other code={:?} msg={}", error_code, message),
504            SubmitError::NetworkError => "NetworkError".to_string(),
505            SubmitError::MempoolIsFull => "MempoolIsFull".to_string(),
506            SubmitError::TxInMempoolCache => "TxInMempoolCache".to_string(),
507        }
508    }
509}
510
511/// A submission failure containing both mapped and original errors.
512#[derive(Debug, Clone)]
513pub struct SubmitFailure<T> {
514    /// The categorized error for internal handling.
515    pub mapped_error: SubmitError,
516    /// The original error from the underlying transport or node.
517    pub original_error: T,
518}
519
520impl<T: fmt::Debug> SubmitFailure<T> {
521    fn label(&self) -> String {
522        format!(
523            "{} original={:?}",
524            self.mapped_error.label(),
525            self.original_error
526        )
527    }
528}
529
/// Errors that can occur during transaction signing.
///
/// This is the categorized form stored in [`SigningFailure::mapped_error`].
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum SigningError {
    /// Server expects a different sequence (detected during simulation).
    SequenceMismatch {
        /// The sequence number the server expected.
        expected: u64,
    },
    /// Signing failed with a specific error message.
    Other {
        /// Error message describing the failure.
        message: String,
    },
    /// Transport or RPC error while signing.
    NetworkError,
}

impl SigningError {
    /// Compact single-line rendering of the error: variant name plus its field, if any.
    fn label(&self) -> String {
        match self {
            Self::NetworkError => String::from("NetworkError"),
            Self::Other { message: msg } => format!("Other msg={}", msg),
            Self::SequenceMismatch { expected: wanted } => {
                format!("SequenceMismatch expected={}", wanted)
            }
        }
    }
}
559
560/// A signing failure containing both mapped and original errors.
561#[derive(Debug, Clone)]
562pub struct SigningFailure<T> {
563    /// The categorized error for internal handling.
564    pub mapped_error: SigningError,
565    /// The original error from the underlying transport or node.
566    pub original_error: T,
567}
568
569impl<T: fmt::Debug> SigningFailure<T> {
570    fn label(&self) -> String {
571        format!(
572            "{} original={:?}",
573            self.mapped_error.label(),
574            self.original_error
575        )
576    }
577}
578
/// A confirmation failure wrapping the rejection reason.
#[derive(Debug, Clone)]
// NOTE(review): `reason` is private and has no accessor in the visible part of
// this file, hence the allow; confirm whether the type is still in use.
#[allow(dead_code)]
pub struct ConfirmFailure {
    /// Why the transaction was rejected.
    reason: RejectionReason,
}
585
/// Reason a transaction was rejected during confirmation.
///
/// Every variant records the node that reported the rejection. Appears as the
/// `reason` of [`TxStatusKind::Rejected`].
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum RejectionReason {
    /// The transaction had a sequence mismatch.
    SequenceMismatch {
        /// The sequence number the node expected.
        expected: u64,
        /// The node that reported the mismatch.
        node_id: NodeId,
    },
    /// The transaction was not found (not submitted or expired).
    TxNotSubmitted {
        /// The expected sequence at the time of check.
        expected: u64,
        /// The node that reported the issue.
        node_id: NodeId,
    },
    /// Rejected for another reason.
    OtherReason {
        /// Error code from the node.
        error_code: ErrorCode,
        /// Error message from the node.
        message: String,
        /// The node that rejected the transaction.
        node_id: NodeId,
    },
}
614
615/// Backend trait for submitting and confirming transactions on a node.
616///
617/// Implementations handle the actual network communication with blockchain nodes.
618#[async_trait]
619pub trait TxServer: Send + Sync {
620    /// The transaction identifier type returned by the node.
621    type TxId: TxIdT + Eq + StdHash + Send + Sync + 'static;
622    /// Information returned when a transaction is confirmed.
623    type ConfirmInfo: Clone + Send + Sync + 'static;
624    /// The request type for transactions.
625    type TxRequest: Send + Sync + 'static;
626    /// The error type for submission failures.
627    type SubmitError: Clone + Send + Sync + fmt::Debug + 'static;
628    /// The raw response type from status queries.
629    type ConfirmResponse: Clone + Send + Sync + 'static;
630
631    /// Submit signed bytes with the given sequence, returning a server TxId.
632    async fn submit(
633        &self,
634        tx_bytes: Arc<Vec<u8>>,
635        sequence: u64,
636    ) -> TxSubmitResult<Self::TxId, Self::SubmitError>;
637    /// Batch status lookup for submitted TxIds.
638    async fn status_batch(
639        &self,
640        ids: Vec<Self::TxId>,
641    ) -> TxConfirmResult<
642        Vec<(
643            Self::TxId,
644            TxStatus<Self::ConfirmInfo, Self::ConfirmResponse>,
645        )>,
646    >;
647    /// Status lookup for submitted TxId.
648    async fn status(
649        &self,
650        id: Self::TxId,
651    ) -> TxConfirmResult<TxStatus<Self::ConfirmInfo, Self::ConfirmResponse>>;
652    /// Fetch current sequence for the account (used by some implementations).
653    #[allow(dead_code)]
654    async fn current_sequence(&self) -> Result<u64>;
655    /// Simulate a transaction and sign it with the given sequence, returning a signed transaction.
656    async fn simulate_and_sign(
657        &self,
658        req: Arc<Self::TxRequest>,
659        sequence: u64,
660    ) -> Result<
661        Transaction<Self::TxId, Self::ConfirmInfo, Self::ConfirmResponse, Self::SubmitError>,
662        SigningFailure<Self::SubmitError>,
663    >;
664}
665
666#[derive(Debug)]
667enum NodeEvent<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
668    NodeResponse {
669        node_id: NodeId,
670        response: NodeResponse<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
671    },
672    NodeStop,
673}
674
675impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr: fmt::Debug>
676    NodeEvent<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>
677{
678    fn summary(&self) -> String {
679        match self {
680            NodeEvent::NodeResponse { response, .. } => match response {
681                NodeResponse::Submission { sequence, result } => match result {
682                    Ok(_) => format!("NodeResponse::Submission seq={} Ok", sequence),
683                    Err(err) => format!(
684                        "NodeResponse::Submission seq={} Err {}",
685                        sequence,
686                        err.label()
687                    ),
688                },
689                NodeResponse::Confirmation { response, .. } => match response {
690                    Ok(ConfirmationResponse::Batch { statuses }) => {
691                        format!("NodeResponse::Confirmation Batch {}", statuses.len())
692                    }
693                    Ok(ConfirmationResponse::Single { .. }) => {
694                        "NodeResponse::Confirmation Single".to_string()
695                    }
696                    Err(err) => format!("NodeResponse::Confirmation Err {}", err),
697                },
698                NodeResponse::Signing { sequence, result } => match result {
699                    Ok(_) => format!("NodeResponse::Signing seq={} Ok", sequence),
700                    Err(err) => {
701                        format!("NodeResponse::Signing seq={} Err {}", sequence, err.label())
702                    }
703                },
704            },
705            NodeEvent::NodeStop => "NodeStop".to_string(),
706        }
707    }
708}
709
/// Result of a finished node task, carried inside `NodeEvent::NodeResponse`.
#[derive(Debug)]
enum NodeResponse<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
    /// Outcome of submitting the transaction at `sequence`.
    Submission {
        sequence: u64,
        result: TxSubmitResult<TxId, SubmitErr>,
    },
    /// Outcome of a status lookup (batch or single).
    Confirmation {
        // NOTE(review): presumably the epoch the lookup was planned under, used
        // to spot stale responses — confirm against `NodeManager`.
        state_epoch: u64,
        response: TxConfirmResult<ConfirmationResponse<TxId, ConfirmInfo, ConfirmResponse>>,
    },
    /// Outcome of signing the transaction for `sequence`.
    Signing {
        sequence: u64,
        result: TxSigningResult<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
    },
}
725
/// Successful payload of a status lookup.
#[derive(Debug)]
enum ConfirmationResponse<TxId, ConfirmInfo, ConfirmResponse> {
    /// Statuses for several transactions, one `(id, status)` pair each.
    Batch {
        statuses: Vec<(TxId, TxStatus<ConfirmInfo, ConfirmResponse>)>,
    },
    /// Status of one transaction.
    Single {
        id: TxId,
        status: TxStatus<ConfirmInfo, ConfirmResponse>,
    },
}
736
/// Output of a submission future; the worker loop turns it into a
/// `NodeResponse::Submission` event for `node_id`.
struct SubmissionResult<TxId, SubmitErr> {
    /// Node the submission was sent to.
    node_id: NodeId,
    /// Sequence of the submitted transaction.
    sequence: u64,
    /// The assigned transaction id, or the submission failure.
    result: TxSubmitResult<TxId, SubmitErr>,
}
742
/// Output of a confirmation future, tagged with the node it was run against.
struct ConfirmationResult<TxId, ConfirmInfo, ConfirmResponse> {
    /// Node the status lookup was sent to.
    node_id: NodeId,
    // NOTE(review): presumably the epoch captured when the lookup was planned —
    // confirm against `NodeManager`.
    state_epoch: u64,
    /// The lookup payload, or the error that prevented it.
    response: TxConfirmResult<ConfirmationResponse<TxId, ConfirmInfo, ConfirmResponse>>,
}
748
// Shorthands that instantiate the generic worker types with the associated
// types of a concrete `TxServer` implementation `S`.

/// A queued request plus its progress senders, for server `S`.
type PendingRequest<S> = RequestWithChannels<
    <S as TxServer>::TxId,
    <S as TxServer>::ConfirmInfo,
    <S as TxServer>::ConfirmResponse,
    <S as TxServer>::SubmitError,
    <S as TxServer>::TxRequest,
>;
/// Node event type for server `S`.
type NodeEventFor<S> = NodeEvent<
    <S as TxServer>::TxId,
    <S as TxServer>::ConfirmInfo,
    <S as TxServer>::ConfirmResponse,
    <S as TxServer>::SubmitError,
>;
/// Signing task output for server `S`.
type SigningResultFor<S> = SigningResult<
    <S as TxServer>::TxId,
    <S as TxServer>::ConfirmInfo,
    <S as TxServer>::ConfirmResponse,
    <S as TxServer>::SubmitError,
>;
/// Stop error type for server `S`.
type StopErrorFor<S> = StopError<
    <S as TxServer>::SubmitError,
    <S as TxServer>::ConfirmInfo,
    <S as TxServer>::ConfirmResponse,
>;
/// Transaction buffer type for server `S`; its last parameter is the pending request type.
type TxBuf<S> = tx_buffer::TxBuffer<
    <S as TxServer>::TxId,
    <S as TxServer>::ConfirmInfo,
    <S as TxServer>::ConfirmResponse,
    <S as TxServer>::SubmitError,
    PendingRequest<S>,
>;
/// List of worker mutations for server `S`.
type WorkerMutations<S> = Vec<
    WorkerMutation<
        <S as TxServer>::TxId,
        <S as TxServer>::ConfirmInfo,
        <S as TxServer>::ConfirmResponse,
        <S as TxServer>::SubmitError,
    >,
>;
/// Boxed submission future; resolves to `None` or a submission result.
type SubmissionFuture<S> = BoxFuture<
    'static,
    Option<SubmissionResult<<S as TxServer>::TxId, <S as TxServer>::SubmitError>>,
>;
/// Boxed confirmation future; resolves to `None` or a confirmation result.
type ConfirmationFuture<S> = BoxFuture<
    'static,
    Option<
        ConfirmationResult<
            <S as TxServer>::TxId,
            <S as TxServer>::ConfirmInfo,
            <S as TxServer>::ConfirmResponse,
        >,
    >,
>;
/// Boxed signing future; resolves to `None` or a signing result.
type SigningFuture<S> = BoxFuture<
    'static,
    Option<
        SigningResult<
            <S as TxServer>::TxId,
            <S as TxServer>::ConfirmInfo,
            <S as TxServer>::ConfirmResponse,
            <S as TxServer>::SubmitError,
        >,
    >,
>;
813
/// Background worker that processes transaction signing, submission, and confirmation.
///
/// Maintains per-node state machines and coordinates submissions across multiple nodes.
/// Handles sequence mismatches by entering recovery mode to re-sync with the network.
pub struct TransactionWorker<S: TxServer> {
    /// Node state; produces plans and turns events into mutations in `process`.
    nodes: NodeManager<S>,
    /// Backend handle for each node id, as passed to `new`.
    servers: HashMap<NodeId, Arc<S>>,
    /// Transaction buffer; incoming requests are added to it as pending.
    transactions: TxBuf<S>,
    /// Receiving end of the request queue fed by `TxSubmitter::add_tx`.
    add_tx_rx: mpsc::Receiver<PendingRequest<S>>,
    /// In-flight submission futures.
    submissions: FuturesUnordered<SubmissionFuture<S>>,
    /// In-flight confirmation futures.
    confirmations: FuturesUnordered<ConfirmationFuture<S>>,
    /// The signing future currently in flight, if any.
    signing: Option<SigningFuture<S>>,
    /// Ticker created from `confirm_interval`; each tick sets `should_confirm`.
    confirm_ticker: Interval,
    /// Batch-size limit handed to the planner on every loop iteration.
    max_status_batch: usize,
    /// Set by the ticker, passed to the planner, then cleared after planning.
    should_confirm: bool,
}
830
/// Return type for [`TransactionWorker::new`].
///
/// The first element is the [`TxSubmitter`] front-end, the second the worker
/// that receives what the submitter enqueues.
pub type WorkerPair<S> = (
    TxSubmitter<
        <S as TxServer>::TxId,
        <S as TxServer>::ConfirmInfo,
        <S as TxServer>::ConfirmResponse,
        <S as TxServer>::SubmitError,
        <S as TxServer>::TxRequest,
    >,
    TransactionWorker<S>,
);
842
843impl<S: TxServer + 'static> TransactionWorker<S> {
844    /// Create a submitter/worker pair with initial confirmed sequence and queue capacity.
845    ///
846    /// # Notes
847    /// - `confirmed_sequence` is the last confirmed sequence for the signer.
848    /// - `start_sequence` is derived as `confirmed_sequence + 1` or `0` when unknown.
849    /// - `confirm_interval` drives periodic confirmation polling.
850    pub fn new(
851        nodes: HashMap<NodeId, Arc<S>>,
852        confirm_interval: Duration,
853        max_status_batch: usize,
854        confirmed_sequence: Option<u64>,
855        add_tx_capacity: usize,
856    ) -> WorkerPair<S> {
857        let (add_tx_tx, add_tx_rx) = mpsc::channel(add_tx_capacity);
858        let manager = TxSubmitter {
859            add_tx: add_tx_tx.clone(),
860        };
861        let node_ids = nodes.keys().cloned().collect::<Vec<_>>();
862        let node_manager = NodeManager::new(node_ids, confirmed_sequence);
863        let transactions = tx_buffer::TxBuffer::new(confirmed_sequence);
864        (
865            manager,
866            TransactionWorker {
867                add_tx_rx,
868                nodes: node_manager,
869                servers: nodes,
870                transactions,
871                submissions: FuturesUnordered::new(),
872                confirmations: FuturesUnordered::new(),
873                signing: None,
874                confirm_ticker: Interval::new(confirm_interval),
875                max_status_batch,
876                should_confirm: false,
877            },
878        )
879    }
880
881    fn enqueue_pending(&mut self, request: PendingRequest<S>) -> Result<()> {
882        self.transactions
883            .add_pending(request)
884            .map_err(|_| crate::Error::UnexpectedResponseType("pending queue error".to_string()))?;
885        Ok(())
886    }
887
888    /// Run the worker loop until shutdown or a terminal error.
889    ///
890    /// # Notes
891    /// - The worker is single-owner; it should be run in a dedicated task.
892    /// - On terminal failures it returns `Ok(())` after notifying callbacks.
893    pub async fn process(&mut self, shutdown: CancellationToken) -> Result<()> {
894        let mut current_event: Option<NodeEventFor<S>> = None;
895        let mut shutdown_seen = false;
896
897        loop {
898            let plans = self.nodes.plan(
899                &self.transactions,
900                self.max_status_batch,
901                self.should_confirm,
902            );
903            if self.should_confirm {
904                self.should_confirm = false;
905            }
906            self.execute_plans(plans, shutdown.clone());
907
908            if let Some(event) = current_event.take() {
909                let mutations = self.nodes.apply_event(event, &self.transactions);
910                let stop = self.apply_mutations(mutations)?;
911                if stop {
912                    break;
913                }
914                continue;
915            }
916
917            select! {
918                _ = poll_shutdown(&shutdown, &mut shutdown_seen) => {
919                    current_event = Some(NodeEvent::NodeStop);
920                }
921                tx = self.add_tx_rx.recv() => {
922                    if let Some(tx) = tx {
923                        self.enqueue_pending(tx)?;
924                    }
925                }
926                _ = self.confirm_ticker.tick() => {
927                    self.should_confirm = true;
928                }
929                result = self.submissions.next(), if !self.submissions.is_empty() => {
930                    if let Some(Some(result)) = result {
931                        current_event = Some(NodeEvent::NodeResponse {
932                            node_id: result.node_id,
933                            response: NodeResponse::Submission {
934                                sequence: result.sequence,
935                                result: result.result,
936                            },
937                        });
938                    }
939                }
940                result = self.confirmations.next(), if !self.confirmations.is_empty() => {
941                    if let Some(Some(result)) = result {
942                        current_event = Some(NodeEvent::NodeResponse {
943                            node_id: result.node_id,
944                            response: NodeResponse::Confirmation {
945                                state_epoch: result.state_epoch,
946                                response: result.response,
947                            },
948                        });
949                    }
950                }
951                result = poll_opt(&mut self.signing), if self.signing.is_some() => {
952                    self.signing = None;
953                    if let Some(Some(result)) = result {
954                        current_event = Some(NodeEvent::NodeResponse {
955                            node_id: result.node_id,
956                            response: NodeResponse::Signing {
957                                sequence: result.sequence,
958                                result: result.response,
959                            },
960                        });
961                    }
962                }
963            }
964        }
965        info!("stopping nodes");
966        shutdown.cancel();
967        Ok(())
968    }
969
970    fn execute_plans(&mut self, plans: Vec<WorkerPlan<S::TxId>>, token: CancellationToken) {
971        for plan in plans {
972            debug!(plan = %plan.summary(), "worker plan");
973            match plan {
974                WorkerPlan::SpawnSigning {
975                    node_id,
976                    sequence,
977                    delay,
978                } => {
979                    if self.signing.is_some() {
980                        continue;
981                    }
982                    let Some(node) = self.servers.get(&node_id).cloned() else {
983                        continue;
984                    };
985                    let Some(request) = self.transactions.peek_pending() else {
986                        continue;
987                    };
988                    let request_ref = request.request.clone();
989                    let tx = self.push_signing();
990                    spawn_cancellable(token.clone(), async move {
991                        time::sleep(delay).await;
992                        let response = node.simulate_and_sign(request_ref, sequence).await;
993                        let _ = tx.send(SigningResult {
994                            node_id,
995                            sequence,
996                            response,
997                        });
998                    });
999                }
1000                WorkerPlan::SpawnSubmit {
1001                    node_id,
1002                    bytes,
1003                    sequence,
1004                    delay,
1005                } => {
1006                    let Some(node) = self.servers.get(&node_id).cloned() else {
1007                        continue;
1008                    };
1009                    spawn_task(&mut self.submissions, token.clone(), async move |tx| {
1010                        time::sleep(delay).await;
1011                        let result = node.submit(bytes, sequence).await;
1012                        let _ = tx.send(SubmissionResult {
1013                            node_id,
1014                            sequence,
1015                            result,
1016                        });
1017                    });
1018                }
1019                WorkerPlan::SpawnConfirmBatch {
1020                    node_id,
1021                    ids,
1022                    epoch,
1023                } => {
1024                    let Some(node) = self.servers.get(&node_id).cloned() else {
1025                        continue;
1026                    };
1027                    spawn_task(&mut self.confirmations, token.clone(), async move |tx| {
1028                        let response = node
1029                            .status_batch(ids)
1030                            .await
1031                            .map(|statuses| ConfirmationResponse::Batch { statuses });
1032                        let _ = tx.send(ConfirmationResult {
1033                            state_epoch: epoch,
1034                            node_id,
1035                            response,
1036                        });
1037                    });
1038                }
1039                WorkerPlan::SpawnRecover { node_id, id, epoch } => {
1040                    let Some(node) = self.servers.get(&node_id).cloned() else {
1041                        continue;
1042                    };
1043                    spawn_task(&mut self.confirmations, token.clone(), async move |tx| {
1044                        let response = node
1045                            .status(id.clone())
1046                            .await
1047                            .map(|status| ConfirmationResponse::Single { id, status });
1048                        let _ = tx.send(ConfirmationResult {
1049                            node_id,
1050                            response,
1051                            state_epoch: epoch,
1052                        });
1053                    });
1054                }
1055            }
1056        }
1057    }
1058
1059    fn apply_mutations(&mut self, mutations: WorkerMutations<S>) -> Result<bool> {
1060        for mutation in mutations {
1061            debug!(mutation = %mutation.summary(), "worker mutation");
1062            match mutation {
1063                WorkerMutation::EnqueueSigned { mut tx } => {
1064                    let Some(request) = self.transactions.pop_pending() else {
1065                        return Err(Error::UnexpectedResponseType(
1066                            "missing pending request".to_string(),
1067                        ));
1068                    };
1069                    tx.callbacks.submitted = Some(request.submit_tx);
1070                    tx.callbacks.confirmed = Some(request.confirm_tx);
1071                    self.enqueue_signed(tx, request.sign_tx)?;
1072                }
1073                WorkerMutation::MarkSubmitted { sequence, id } => {
1074                    self.mark_submitted(sequence, id);
1075                }
1076                WorkerMutation::Confirm { seq, info } => {
1077                    self.confirm_tx(seq, info);
1078                }
1079                WorkerMutation::WorkerStop => {
1080                    let fatal = self.nodes.tail_stop_error();
1081                    self.finalize_remaining(fatal);
1082                    return Ok(true);
1083                }
1084            }
1085        }
1086        if let Some(limit) = self.nodes.min_confirmed_non_stopped() {
1087            let before = self.transactions.confirmed_seq();
1088            debug!(
1089                before = ?before,
1090                limit,
1091                max_seq = ?self.transactions.max_seq(),
1092                "clear_confirmed_up_to candidate"
1093            );
1094            self.clear_confirmed_up_to(limit);
1095            let after = self.transactions.confirmed_seq();
1096            debug!(before = ?before, after = ?after, limit, "clear_confirmed_up_to result");
1097        }
1098        Ok(false)
1099    }
1100
1101    fn push_signing(&mut self) -> oneshot::Sender<SigningResultFor<S>> {
1102        let (tx, rx) = oneshot::channel();
1103        self.signing = Some(async move { rx.await.ok() }.boxed());
1104        tx
1105    }
1106
1107    fn enqueue_signed(
1108        &mut self,
1109        tx: Transaction<S::TxId, S::ConfirmInfo, S::ConfirmResponse, S::SubmitError>,
1110        signed: oneshot::Sender<Result<()>>,
1111    ) -> Result<()> {
1112        let exp_seq = self.transactions.next_sequence();
1113        let tx_sequence = tx.sequence;
1114        if self.transactions.add_transaction(tx).is_err() {
1115            let msg = format!("tx sequence gap: expected {}, got {}", exp_seq, tx_sequence);
1116            let _ = signed.send(Err(crate::Error::UnexpectedResponseType(msg.clone())));
1117            return Err(crate::Error::UnexpectedResponseType(msg));
1118        }
1119        let _ = signed.send(Ok(()));
1120        Ok(())
1121    }
1122
1123    fn mark_submitted(&mut self, sequence: u64, id: S::TxId) {
1124        if let Some(tx) = self.transactions.get_mut(sequence) {
1125            tx.notify_submitted(Ok(id.clone()));
1126        }
1127        let _ = self.transactions.set_submitted_id(sequence, id);
1128    }
1129
1130    fn confirm_tx(&mut self, seq: u64, info: S::ConfirmInfo) {
1131        let Some(tx) = self.transactions.get_mut(seq) else {
1132            return;
1133        };
1134        tx.notify_confirmed(Ok(info));
1135    }
1136
1137    fn finalize_remaining(&mut self, fatal: Option<StopErrorFor<S>>) {
1138        for pending in self.transactions.drain_pending() {
1139            let _ = pending.sign_tx.send(Err(Error::TxWorkerStopped));
1140            let _ = pending.submit_tx.send(Err(Error::TxWorkerStopped));
1141            let _ = pending.confirm_tx.send(Err(StopError::WorkerStopped));
1142        }
1143        loop {
1144            let next = self
1145                .transactions
1146                .confirmed_seq()
1147                .map(|seq| seq.saturating_add(1))
1148                .unwrap_or(0);
1149            let Ok(mut tx) = self.transactions.confirm(next) else {
1150                break;
1151            };
1152            let status = fatal.clone().unwrap_or(StopError::WorkerStopped);
1153            if tx.id.is_none() {
1154                tx.notify_submitted(Err(Error::TxWorkerStopped));
1155            }
1156            tx.notify_confirmed(Err(status));
1157        }
1158    }
1159
1160    fn clear_confirmed_up_to(&mut self, limit: u64) {
1161        loop {
1162            let next = self
1163                .transactions
1164                .confirmed_seq()
1165                .map(|seq| seq.saturating_add(1))
1166                .unwrap_or(0);
1167            if next > limit {
1168                break;
1169            }
1170            if self.transactions.confirm(next).is_err() {
1171                break;
1172            }
1173        }
1174    }
1175}
1176
1177fn spawn_task<T, F, Fut>(
1178    unordered: &mut FuturesUnordered<BoxFuture<'static, Option<T>>>,
1179    token: CancellationToken,
1180    fut_fn: F,
1181) where
1182    T: Send + 'static,
1183    F: FnOnce(oneshot::Sender<T>) -> Fut,
1184    Fut: Future<Output = ()> + Send + 'static,
1185{
1186    let (tx, rx) = oneshot::channel();
1187    spawn_cancellable(token, fut_fn(tx));
1188    unordered.push(async move { rx.await.ok() }.boxed());
1189}
1190
/// Await the future stored in `fut`, if there is one.
///
/// Returns `Some(output)` once the stored future completes. When `fut` is
/// `None`, the returned future never resolves, so it can sit in a `select!`
/// branch without ever being chosen. The slot itself is not cleared; the
/// caller decides when to reset it.
async fn poll_opt<F: std::future::Future + Unpin>(fut: &mut Option<F>) -> Option<F::Output> {
    match fut.as_mut() {
        Some(fut) => Some(fut.await),
        // Standard-library equivalent of a never-ready future.
        None => std::future::pending().await,
    }
}
1197
1198async fn poll_shutdown(shutdown: &CancellationToken, seen: &mut bool) {
1199    if *seen {
1200        futures::future::pending::<()>().await;
1201    } else {
1202        shutdown.cancelled().await;
1203        *seen = true;
1204    }
1205}
1206
1207#[cfg(all(test, not(target_arch = "wasm32")))]
1208mod tests;