1use std::collections::HashMap;
92use std::fmt;
93use std::hash::Hash as StdHash;
94use std::sync::Arc;
95use std::time::Duration;
96
97use async_trait::async_trait;
98use celestia_types::state::ErrorCode;
99use futures::future::{BoxFuture, FutureExt};
100use futures::stream::{FuturesUnordered, StreamExt};
101use lumina_utils::executor::spawn_cancellable;
102use lumina_utils::time::{self, Interval};
103use tokio::select;
104use tokio::sync::{mpsc, oneshot};
105use tokio_util::sync::CancellationToken;
106use tracing::{debug, info};
107
108mod node_manager;
109mod tx_buffer;
110
111use crate::{Error, Result, TxConfig};
112use node_manager::NodeManager;
/// Identifier of a node the worker talks to, stored as a shared string slice.
pub type NodeId = Arc<str>;
/// Result of a submit call: the success value `T`, or a [`SubmitFailure`]
/// wrapping the server-specific error `E`.
pub type TxSubmitResult<T, E> = Result<T, SubmitFailure<E>>;
/// Result of a confirmation (status) query, using the crate's `Result` alias
/// with its default error type.
pub type TxConfirmResult<T> = Result<T>;
/// Result of signing a request: a signed [`Transaction`], or a
/// [`SigningFailure`] wrapping `SubmitErr`.
pub type TxSigningResult<TxId, ConfirmInfo, ConfirmResponse, SubmitErr> =
    Result<Transaction<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>, SigningFailure<SubmitErr>>;
122
/// Marker trait for transaction identifiers: any type that is `Clone` and `Debug`.
pub trait TxIdT: Clone + std::fmt::Debug {}
// Blanket implementation: every `Clone + Debug` type is automatically a `TxIdT`.
impl<T> TxIdT for T where T: Clone + std::fmt::Debug {}
126
/// Content of a transaction request.
#[derive(Debug, Clone)]
pub enum TxPayload {
    /// A list of blobs.
    Blobs(Vec<celestia_types::Blob>),
    /// A raw transaction body.
    Tx(celestia_types::state::RawTxBody),
}
135
/// A transaction request: the payload together with its [`TxConfig`].
#[derive(Debug, Clone)]
pub struct TxRequest {
    /// What should be sent.
    pub tx: TxPayload,
    /// Configuration accompanying the payload.
    pub cfg: TxConfig,
}
144
/// Error delivered on a transaction's confirmation channel when it will not
/// be confirmed.
#[derive(Debug, Clone)]
pub enum StopError<SubmitErr, ConfirmInfo, ConfirmResponse> {
    /// Confirmation failed; carries the reported [`TxStatus`].
    ConfirmError(TxStatus<ConfirmInfo, ConfirmResponse>),
    /// Submission failed; carries the server-specific error.
    SubmitError(SubmitErr),
    /// Signing failed; carries the server-specific error.
    SignError(SubmitErr),
    /// The worker stopped before the transaction reached a final state.
    WorkerStopped,
}
157
/// A unit of work produced by the node manager for the worker to spawn.
#[derive(Debug)]
#[allow(clippy::enum_variant_names)]
pub(crate) enum WorkerPlan<TxId: TxIdT> {
    /// Sign the next pending request on `node_id` with `sequence`, after
    /// waiting `delay`.
    SpawnSigning {
        node_id: NodeId,
        sequence: u64,
        delay: Duration,
    },
    /// Submit already-signed `bytes` with `sequence` to `node_id`, after
    /// waiting `delay`.
    SpawnSubmit {
        node_id: NodeId,
        bytes: Arc<Vec<u8>>,
        sequence: u64,
        delay: Duration,
    },
    /// Query `node_id` for the status of all `ids` in one batch call; `epoch`
    /// is echoed back with the response.
    SpawnConfirmBatch {
        node_id: NodeId,
        ids: Vec<TxId>,
        epoch: u64,
    },
    /// Query `node_id` for the status of a single transaction `id`; `epoch`
    /// is echoed back with the response.
    SpawnRecover {
        node_id: NodeId,
        id: TxId,
        epoch: u64,
    },
}
183
184impl<TxId: TxIdT> WorkerPlan<TxId> {
185 pub(crate) fn summary(&self) -> String {
186 match self {
187 WorkerPlan::SpawnSigning {
188 node_id, sequence, ..
189 } => {
190 format!("SpawnSigning node={} seq={}", node_id.as_ref(), sequence)
191 }
192 WorkerPlan::SpawnSubmit {
193 node_id, sequence, ..
194 } => format!("SpawnSubmit node={} seq={}", node_id.as_ref(), sequence),
195 WorkerPlan::SpawnConfirmBatch { node_id, ids, .. } => {
196 format!(
197 "SpawnConfirmBatch node={} count={}",
198 node_id.as_ref(),
199 ids.len()
200 )
201 }
202 WorkerPlan::SpawnRecover { node_id, .. } => {
203 format!("SpawnRecover node={}", node_id.as_ref())
204 }
205 }
206 }
207}
208
/// A state change the worker must apply after the node manager has processed
/// an event.
#[derive(Debug)]
pub(crate) enum WorkerMutation<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
    /// A request was signed; move it from the pending queue into the
    /// transaction buffer.
    EnqueueSigned {
        tx: crate::tx_client_v2::Transaction<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
    },
    /// The transaction at `sequence` was submitted and received `id`.
    MarkSubmitted {
        sequence: u64,
        id: TxId,
    },
    /// The transaction at `seq` was confirmed with `info`.
    Confirm {
        seq: u64,
        info: ConfirmInfo,
    },
    /// The worker must finalize all remaining transactions and stop.
    WorkerStop,
}
224
225impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr>
226 WorkerMutation<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>
227{
228 pub(crate) fn summary(&self) -> String {
229 match self {
230 WorkerMutation::EnqueueSigned { tx } => {
231 format!("EnqueueSigned seq={}", tx.sequence)
232 }
233 WorkerMutation::MarkSubmitted { sequence, .. } => {
234 format!("MarkSubmitted seq={}", sequence)
235 }
236 WorkerMutation::Confirm { seq, .. } => format!("Confirm seq={}", seq),
237 WorkerMutation::WorkerStop => "WorkerStop".to_string(),
238 }
239 }
240}
241
/// Final outcome of a transaction: confirmation info on success, or the
/// [`StopError`] explaining why it will not be confirmed.
pub type ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse> =
    std::result::Result<ConfirmInfo, StopError<SubmitErr, ConfirmInfo, ConfirmResponse>>;
245
246impl<SubmitErr, ConfirmInfo, ConfirmResponse> fmt::Display
247 for StopError<SubmitErr, ConfirmInfo, ConfirmResponse>
248where
249 SubmitErr: fmt::Debug,
250 ConfirmInfo: fmt::Debug,
251 ConfirmResponse: fmt::Debug,
252{
253 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
254 match self {
255 StopError::ConfirmError(status) => write!(f, "ConfirmError({:?})", status),
256 StopError::SubmitError(err) => write!(f, "SubmitError({:?})", err),
257 StopError::SignError(err) => write!(f, "SignError({:?})", err),
258 StopError::WorkerStopped => write!(f, "WorkerStopped"),
259 }
260 }
261}
262
263impl TxRequest {
264 pub fn tx(body: celestia_types::state::RawTxBody, cfg: TxConfig) -> Self {
266 Self {
267 tx: TxPayload::Tx(body),
268 cfg,
269 }
270 }
271
272 pub fn blobs(blobs: Vec<celestia_types::Blob>, cfg: TxConfig) -> Self {
274 Self {
275 tx: TxPayload::Blobs(blobs),
276 cfg,
277 }
278 }
279}
280
/// A signed transaction tracked by the worker.
#[derive(Debug)]
pub struct Transaction<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
    /// Sequence number the transaction was signed with.
    pub sequence: u64,
    /// Signed transaction bytes, shared so they can be handed to submit tasks.
    pub bytes: Arc<Vec<u8>>,
    /// One-shot channels used to report submission and confirmation.
    pub callbacks: TxCallbacks<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
    /// Transaction id; `None` is treated as "not submitted yet" when the
    /// worker finalizes remaining transactions.
    pub id: Option<TxId>,
}
293
294impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr>
295 Transaction<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>
296{
297 fn notify_submitted(&mut self, result: Result<TxId>) {
298 if let Some(submitted) = self.callbacks.submitted.take() {
299 let _ = submitted.send(result);
300 }
301 }
302
303 fn notify_confirmed(&mut self, result: ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>) {
304 if let Some(confirmed) = self.callbacks.confirmed.take() {
305 let _ = confirmed.send(result);
306 }
307 }
308}
309
/// One-shot senders used to report a transaction's progress to its submitter.
#[derive(Debug)]
pub struct TxCallbacks<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
    /// Receives the transaction id (or an error) once submission is resolved.
    pub submitted: Option<oneshot::Sender<Result<TxId>>>,
    /// Receives the final confirmation result.
    pub confirmed: Option<oneshot::Sender<ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>>>,
}
318
319impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> Default
320 for TxCallbacks<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>
321{
322 fn default() -> Self {
323 Self {
324 submitted: None,
325 confirmed: None,
326 }
327 }
328}
329
/// A queued request together with the senders used to report each stage of
/// its processing.
struct RequestWithChannels<TxId, ConfirmInfo, ConfirmResponse, SubmitErr, Request> {
    /// The request itself, shared so it can be handed to a signing task.
    request: Arc<Request>,
    /// Reports whether signing and enqueueing succeeded.
    sign_tx: oneshot::Sender<Result<()>>,
    /// Reports the transaction id once submitted.
    submit_tx: oneshot::Sender<Result<TxId>>,
    /// Reports the final confirmation result.
    confirm_tx: oneshot::Sender<ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>>,
}
336
337impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr, Request>
338 RequestWithChannels<TxId, ConfirmInfo, ConfirmResponse, SubmitErr, Request>
339{
340 fn new(
341 request: Request,
342 sign_tx: oneshot::Sender<Result<()>>,
343 submit_tx: oneshot::Sender<Result<TxId>>,
344 confirm_tx: oneshot::Sender<ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>>,
345 ) -> Self {
346 Self {
347 request: Arc::new(request),
348 sign_tx,
349 submit_tx,
350 confirm_tx,
351 }
352 }
353}
354
/// Outcome of a signing task, as sent back to the worker loop.
struct SigningResult<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
    /// Node that performed the signing.
    node_id: NodeId,
    /// Sequence the signing was attempted with.
    sequence: u64,
    /// The signed transaction or the signing failure.
    response: TxSigningResult<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
}
360
/// Handle returned by [`TxSubmitter::add_tx`]; each field is the receiving
/// end of one report channel for the queued request.
#[derive(Debug)]
pub struct TxHandle<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
    /// Resolves once the request was signed and enqueued, or failed to be.
    pub signed: oneshot::Receiver<Result<()>>,
    /// Resolves with the transaction id once submitted, or an error.
    pub submitted: oneshot::Receiver<Result<TxId>>,
    /// Resolves with the final confirmation result.
    pub confirmed: oneshot::Receiver<ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>>,
}
371
/// Front-end used to queue requests for a [`TransactionWorker`].
#[derive(Clone)]
pub struct TxSubmitter<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr, Request> {
    /// Sending end of the worker's bounded request queue.
    add_tx:
        mpsc::Sender<RequestWithChannels<TxId, ConfirmInfo, ConfirmResponse, SubmitErr, Request>>,
}
378
379impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr, Request>
380 TxSubmitter<TxId, ConfirmInfo, ConfirmResponse, SubmitErr, Request>
381{
382 pub async fn add_tx(
384 &self,
385 request: Request,
386 ) -> Result<TxHandle<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>> {
387 let (sign_tx, sign_rx) = oneshot::channel();
388 let (submit_tx, submit_rx) = oneshot::channel();
389 let (confirm_tx, confirm_rx) = oneshot::channel();
390 let request_with_channels =
391 RequestWithChannels::new(request, sign_tx, submit_tx, confirm_tx);
392 match self.add_tx.try_send(request_with_channels) {
393 Ok(()) => Ok(TxHandle {
394 signed: sign_rx,
395 submitted: submit_rx,
396 confirmed: confirm_rx,
397 }),
398 Err(mpsc::error::TrySendError::Full(_)) => Err(Error::UnexpectedResponseType(
399 "transaction queue full".to_string(),
400 )),
401 Err(mpsc::error::TrySendError::Closed(_)) => Err(Error::TxWorkerStopped),
402 }
403 }
404}
405
/// Association of a transaction id with the node and sequence it belongs to.
// NOTE(review): not referenced anywhere in this part of the file, hence the
// `dead_code` allowance — confirm whether it is still needed.
#[derive(Debug)]
#[allow(dead_code)]
struct TxIndexEntry<TxId: TxIdT> {
    node_id: NodeId,
    sequence: u64,
    id: TxId,
}
413
/// Status of a transaction as reported by a status query.
#[derive(Debug, Clone)]
pub enum TxStatusKind<ConfirmInfo> {
    /// Reported as pending.
    Pending,
    /// Reported as confirmed.
    Confirmed {
        /// Confirmation details.
        info: ConfirmInfo,
    },
    /// Reported as rejected.
    Rejected {
        /// Why the transaction was rejected.
        reason: RejectionReason,
    },
    /// Reported as evicted.
    Evicted,
    /// The status is not known.
    Unknown,
}
434
/// A transaction status paired with the response it was derived from.
#[derive(Debug, Clone)]
pub struct TxStatus<ConfirmInfo, OriginalResponse> {
    /// The interpreted status.
    pub kind: TxStatusKind<ConfirmInfo>,
    /// The response as originally received.
    pub original_response: OriginalResponse,
}
443
444impl<ConfirmInfo, OriginalResponse> TxStatus<ConfirmInfo, OriginalResponse> {
445 pub(crate) fn new(
446 kind: TxStatusKind<ConfirmInfo>,
447 original_response: OriginalResponse,
448 ) -> Self {
449 Self {
450 kind,
451 original_response,
452 }
453 }
454}
455
/// Submission failure mapped to a server-independent category.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum SubmitError {
    /// The sequence used did not match the expected one.
    SequenceMismatch {
        /// The expected sequence.
        expected: u64,
    },
    /// The fee was too low.
    InsufficientFee {
        /// The expected fee.
        expected_fee: u64,
        /// Accompanying message.
        message: String,
    },
    /// A network-level failure.
    NetworkError,
    /// The mempool is full.
    MempoolIsFull,
    /// The transaction is in the mempool cache.
    TxInMempoolCache,
    /// Any other failure.
    Other {
        /// The reported error code.
        error_code: ErrorCode,
        /// Accompanying message.
        message: String,
    },
}
486
487impl SubmitError {
488 fn label(&self) -> String {
489 match self {
490 SubmitError::SequenceMismatch { expected } => {
491 format!("SequenceMismatch expected={}", expected)
492 }
493 SubmitError::InsufficientFee {
494 expected_fee,
495 message,
496 } => format!(
497 "InsufficientFee expected_fee={} msg={}",
498 expected_fee, message
499 ),
500 SubmitError::Other {
501 error_code,
502 message,
503 } => format!("Other code={:?} msg={}", error_code, message),
504 SubmitError::NetworkError => "NetworkError".to_string(),
505 SubmitError::MempoolIsFull => "MempoolIsFull".to_string(),
506 SubmitError::TxInMempoolCache => "TxInMempoolCache".to_string(),
507 }
508 }
509}
510
/// A submission failure: the mapped category plus the original error.
#[derive(Debug, Clone)]
pub struct SubmitFailure<T> {
    /// Server-independent category of the failure.
    pub mapped_error: SubmitError,
    /// The error as originally produced.
    pub original_error: T,
}
519
520impl<T: fmt::Debug> SubmitFailure<T> {
521 fn label(&self) -> String {
522 format!(
523 "{} original={:?}",
524 self.mapped_error.label(),
525 self.original_error
526 )
527 }
528}
529
/// Signing failure mapped to a server-independent category.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum SigningError {
    /// The sequence used did not match the expected one.
    SequenceMismatch {
        /// The expected sequence.
        expected: u64,
    },
    /// Any other failure.
    Other {
        /// Accompanying message.
        message: String,
    },
    /// A network-level failure.
    NetworkError,
}

impl SigningError {
    /// Returns a compact textual form of the error for log output: the
    /// variant name followed by its field, if any.
    fn label(&self) -> String {
        match self {
            Self::NetworkError => String::from("NetworkError"),
            Self::Other { message } => format!("Other msg={message}"),
            Self::SequenceMismatch { expected } => {
                format!("SequenceMismatch expected={expected}")
            }
        }
    }
}
559
/// A signing failure: the mapped category plus the original error.
#[derive(Debug, Clone)]
pub struct SigningFailure<T> {
    /// Server-independent category of the failure.
    pub mapped_error: SigningError,
    /// The error as originally produced.
    pub original_error: T,
}
568
569impl<T: fmt::Debug> SigningFailure<T> {
570 fn label(&self) -> String {
571 format!(
572 "{} original={:?}",
573 self.mapped_error.label(),
574 self.original_error
575 )
576 }
577}
578
/// A confirmation failure carrying the reason for rejection.
// NOTE(review): not referenced in this part of the file, hence the
// `dead_code` allowance — confirm whether it is still needed.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct ConfirmFailure {
    reason: RejectionReason,
}
585
/// Why a transaction was rejected; every variant records the node involved.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum RejectionReason {
    /// Rejected because of a sequence mismatch.
    SequenceMismatch {
        /// The expected sequence.
        expected: u64,
        /// Node the rejection relates to.
        node_id: NodeId,
    },
    /// The transaction was not submitted.
    TxNotSubmitted {
        /// The expected sequence.
        expected: u64,
        /// Node the rejection relates to.
        node_id: NodeId,
    },
    /// Any other reason.
    OtherReason {
        /// The reported error code.
        error_code: ErrorCode,
        /// Accompanying message.
        message: String,
        /// Node the rejection relates to.
        node_id: NodeId,
    },
}
614
/// Abstraction over a node the worker signs, submits and confirms
/// transactions through.
///
/// The associated types let each implementation choose its own id, request,
/// error and response representations.
#[async_trait]
pub trait TxServer: Send + Sync {
    /// Transaction identifier type.
    type TxId: TxIdT + Eq + StdHash + Send + Sync + 'static;
    /// Information reported for a confirmed transaction.
    type ConfirmInfo: Clone + Send + Sync + 'static;
    /// Request type accepted by [`TxServer::simulate_and_sign`].
    type TxRequest: Send + Sync + 'static;
    /// Server-specific error carried in submit and signing failures.
    type SubmitError: Clone + Send + Sync + fmt::Debug + 'static;
    /// Raw response type kept alongside each interpreted status.
    type ConfirmResponse: Clone + Send + Sync + 'static;

    /// Submits signed `tx_bytes` for the given `sequence`, returning the
    /// transaction id on success.
    async fn submit(
        &self,
        tx_bytes: Arc<Vec<u8>>,
        sequence: u64,
    ) -> TxSubmitResult<Self::TxId, Self::SubmitError>;
    /// Queries the status of several transactions, returning `(id, status)`
    /// pairs.
    async fn status_batch(
        &self,
        ids: Vec<Self::TxId>,
    ) -> TxConfirmResult<
        Vec<(
            Self::TxId,
            TxStatus<Self::ConfirmInfo, Self::ConfirmResponse>,
        )>,
    >;
    /// Queries the status of a single transaction.
    async fn status(
        &self,
        id: Self::TxId,
    ) -> TxConfirmResult<TxStatus<Self::ConfirmInfo, Self::ConfirmResponse>>;
    /// Returns the current sequence as seen by this server.
    #[allow(dead_code)]
    async fn current_sequence(&self) -> Result<u64>;
    /// Builds and signs a transaction for `req` using `sequence`.
    async fn simulate_and_sign(
        &self,
        req: Arc<Self::TxRequest>,
        sequence: u64,
    ) -> Result<
        Transaction<Self::TxId, Self::ConfirmInfo, Self::ConfirmResponse, Self::SubmitError>,
        SigningFailure<Self::SubmitError>,
    >;
}
665
/// An event fed into the node manager by the worker loop.
#[derive(Debug)]
enum NodeEvent<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
    /// A spawned task for `node_id` finished with `response`.
    NodeResponse {
        node_id: NodeId,
        response: NodeResponse<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
    },
    /// Shutdown was requested.
    NodeStop,
}
674
675impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr: fmt::Debug>
676 NodeEvent<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>
677{
678 fn summary(&self) -> String {
679 match self {
680 NodeEvent::NodeResponse { response, .. } => match response {
681 NodeResponse::Submission { sequence, result } => match result {
682 Ok(_) => format!("NodeResponse::Submission seq={} Ok", sequence),
683 Err(err) => format!(
684 "NodeResponse::Submission seq={} Err {}",
685 sequence,
686 err.label()
687 ),
688 },
689 NodeResponse::Confirmation { response, .. } => match response {
690 Ok(ConfirmationResponse::Batch { statuses }) => {
691 format!("NodeResponse::Confirmation Batch {}", statuses.len())
692 }
693 Ok(ConfirmationResponse::Single { .. }) => {
694 "NodeResponse::Confirmation Single".to_string()
695 }
696 Err(err) => format!("NodeResponse::Confirmation Err {}", err),
697 },
698 NodeResponse::Signing { sequence, result } => match result {
699 Ok(_) => format!("NodeResponse::Signing seq={} Ok", sequence),
700 Err(err) => {
701 format!("NodeResponse::Signing seq={} Err {}", sequence, err.label())
702 }
703 },
704 },
705 NodeEvent::NodeStop => "NodeStop".to_string(),
706 }
707 }
708}
709
/// Result of one spawned node task, by task kind.
#[derive(Debug)]
enum NodeResponse<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
    /// Outcome of submitting the transaction at `sequence`.
    Submission {
        sequence: u64,
        result: TxSubmitResult<TxId, SubmitErr>,
    },
    /// Outcome of a status query; `state_epoch` is the epoch the query was
    /// planned with.
    Confirmation {
        state_epoch: u64,
        response: TxConfirmResult<ConfirmationResponse<TxId, ConfirmInfo, ConfirmResponse>>,
    },
    /// Outcome of signing with `sequence`.
    Signing {
        sequence: u64,
        result: TxSigningResult<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
    },
}
725
/// Successful payload of a status query.
#[derive(Debug)]
enum ConfirmationResponse<TxId, ConfirmInfo, ConfirmResponse> {
    /// Result of a batch query: one `(id, status)` pair per reported
    /// transaction.
    Batch {
        statuses: Vec<(TxId, TxStatus<ConfirmInfo, ConfirmResponse>)>,
    },
    /// Result of a single-transaction query.
    Single {
        id: TxId,
        status: TxStatus<ConfirmInfo, ConfirmResponse>,
    },
}
736
/// Outcome of a submit task, as sent back to the worker loop.
struct SubmissionResult<TxId, SubmitErr> {
    /// Node the transaction was submitted to.
    node_id: NodeId,
    /// Sequence of the submitted transaction.
    sequence: u64,
    /// The transaction id or the submit failure.
    result: TxSubmitResult<TxId, SubmitErr>,
}
742
/// Outcome of a status-query task, as sent back to the worker loop.
struct ConfirmationResult<TxId, ConfirmInfo, ConfirmResponse> {
    /// Node that was queried.
    node_id: NodeId,
    /// Epoch the query was planned with.
    state_epoch: u64,
    /// The statuses or the query error.
    response: TxConfirmResult<ConfirmationResponse<TxId, ConfirmInfo, ConfirmResponse>>,
}
748
// Shorthands that instantiate the generic types above with the associated
// types of a concrete `TxServer` implementation `S`.

/// A queued request with its report channels, for server `S`.
type PendingRequest<S> = RequestWithChannels<
    <S as TxServer>::TxId,
    <S as TxServer>::ConfirmInfo,
    <S as TxServer>::ConfirmResponse,
    <S as TxServer>::SubmitError,
    <S as TxServer>::TxRequest,
>;
/// Node event type for server `S`.
type NodeEventFor<S> = NodeEvent<
    <S as TxServer>::TxId,
    <S as TxServer>::ConfirmInfo,
    <S as TxServer>::ConfirmResponse,
    <S as TxServer>::SubmitError,
>;
/// Signing task outcome for server `S`.
type SigningResultFor<S> = SigningResult<
    <S as TxServer>::TxId,
    <S as TxServer>::ConfirmInfo,
    <S as TxServer>::ConfirmResponse,
    <S as TxServer>::SubmitError,
>;
/// Stop error type for server `S`.
type StopErrorFor<S> = StopError<
    <S as TxServer>::SubmitError,
    <S as TxServer>::ConfirmInfo,
    <S as TxServer>::ConfirmResponse,
>;
/// Transaction buffer holding pending requests and signed transactions for
/// server `S`.
type TxBuf<S> = tx_buffer::TxBuffer<
    <S as TxServer>::TxId,
    <S as TxServer>::ConfirmInfo,
    <S as TxServer>::ConfirmResponse,
    <S as TxServer>::SubmitError,
    PendingRequest<S>,
>;
/// List of mutations for server `S`.
type WorkerMutations<S> = Vec<
    WorkerMutation<
        <S as TxServer>::TxId,
        <S as TxServer>::ConfirmInfo,
        <S as TxServer>::ConfirmResponse,
        <S as TxServer>::SubmitError,
    >,
>;
/// Future resolving to a submit task's outcome, or `None` if the task ended
/// without sending one.
type SubmissionFuture<S> = BoxFuture<
    'static,
    Option<SubmissionResult<<S as TxServer>::TxId, <S as TxServer>::SubmitError>>,
>;
/// Future resolving to a status-query task's outcome, or `None` if the task
/// ended without sending one.
type ConfirmationFuture<S> = BoxFuture<
    'static,
    Option<
        ConfirmationResult<
            <S as TxServer>::TxId,
            <S as TxServer>::ConfirmInfo,
            <S as TxServer>::ConfirmResponse,
        >,
    >,
>;
/// Future resolving to a signing task's outcome, or `None` if the task ended
/// without sending one.
type SigningFuture<S> = BoxFuture<
    'static,
    Option<
        SigningResult<
            <S as TxServer>::TxId,
            <S as TxServer>::ConfirmInfo,
            <S as TxServer>::ConfirmResponse,
            <S as TxServer>::SubmitError,
        >,
    >,
>;
813
/// Event loop state for signing, submitting and confirming transactions
/// through a set of [`TxServer`] nodes.
pub struct TransactionWorker<S: TxServer> {
    /// Produces plans and turns node events into mutations.
    nodes: NodeManager<S>,
    /// Server handle for each node id, looked up when executing plans.
    servers: HashMap<NodeId, Arc<S>>,
    /// Buffer of pending requests and signed transactions.
    transactions: TxBuf<S>,
    /// Receiving end of the queue fed by [`TxSubmitter::add_tx`].
    add_tx_rx: mpsc::Receiver<PendingRequest<S>>,
    /// Outcomes of in-flight submit tasks.
    submissions: FuturesUnordered<SubmissionFuture<S>>,
    /// Outcomes of in-flight status-query tasks (batch and single).
    confirmations: FuturesUnordered<ConfirmationFuture<S>>,
    /// Outcome of the in-flight signing task; at most one exists at a time.
    signing: Option<SigningFuture<S>>,
    /// Ticker whose ticks set `should_confirm`.
    confirm_ticker: Interval,
    /// Batch-size limit passed to the node manager when planning.
    max_status_batch: usize,
    /// Set on every confirm tick and cleared after the next planning round.
    should_confirm: bool,
}
830
/// The pair returned by [`TransactionWorker::new`]: the submitter used to
/// queue requests and the worker that processes them.
pub type WorkerPair<S> = (
    TxSubmitter<
        <S as TxServer>::TxId,
        <S as TxServer>::ConfirmInfo,
        <S as TxServer>::ConfirmResponse,
        <S as TxServer>::SubmitError,
        <S as TxServer>::TxRequest,
    >,
    TransactionWorker<S>,
);
842
843impl<S: TxServer + 'static> TransactionWorker<S> {
844 pub fn new(
851 nodes: HashMap<NodeId, Arc<S>>,
852 confirm_interval: Duration,
853 max_status_batch: usize,
854 confirmed_sequence: Option<u64>,
855 add_tx_capacity: usize,
856 ) -> WorkerPair<S> {
857 let (add_tx_tx, add_tx_rx) = mpsc::channel(add_tx_capacity);
858 let manager = TxSubmitter {
859 add_tx: add_tx_tx.clone(),
860 };
861 let node_ids = nodes.keys().cloned().collect::<Vec<_>>();
862 let node_manager = NodeManager::new(node_ids, confirmed_sequence);
863 let transactions = tx_buffer::TxBuffer::new(confirmed_sequence);
864 (
865 manager,
866 TransactionWorker {
867 add_tx_rx,
868 nodes: node_manager,
869 servers: nodes,
870 transactions,
871 submissions: FuturesUnordered::new(),
872 confirmations: FuturesUnordered::new(),
873 signing: None,
874 confirm_ticker: Interval::new(confirm_interval),
875 max_status_batch,
876 should_confirm: false,
877 },
878 )
879 }
880
881 fn enqueue_pending(&mut self, request: PendingRequest<S>) -> Result<()> {
882 self.transactions
883 .add_pending(request)
884 .map_err(|_| crate::Error::UnexpectedResponseType("pending queue error".to_string()))?;
885 Ok(())
886 }
887
888 pub async fn process(&mut self, shutdown: CancellationToken) -> Result<()> {
894 let mut current_event: Option<NodeEventFor<S>> = None;
895 let mut shutdown_seen = false;
896
897 loop {
898 let plans = self.nodes.plan(
899 &self.transactions,
900 self.max_status_batch,
901 self.should_confirm,
902 );
903 if self.should_confirm {
904 self.should_confirm = false;
905 }
906 self.execute_plans(plans, shutdown.clone());
907
908 if let Some(event) = current_event.take() {
909 let mutations = self.nodes.apply_event(event, &self.transactions);
910 let stop = self.apply_mutations(mutations)?;
911 if stop {
912 break;
913 }
914 continue;
915 }
916
917 select! {
918 _ = poll_shutdown(&shutdown, &mut shutdown_seen) => {
919 current_event = Some(NodeEvent::NodeStop);
920 }
921 tx = self.add_tx_rx.recv() => {
922 if let Some(tx) = tx {
923 self.enqueue_pending(tx)?;
924 }
925 }
926 _ = self.confirm_ticker.tick() => {
927 self.should_confirm = true;
928 }
929 result = self.submissions.next(), if !self.submissions.is_empty() => {
930 if let Some(Some(result)) = result {
931 current_event = Some(NodeEvent::NodeResponse {
932 node_id: result.node_id,
933 response: NodeResponse::Submission {
934 sequence: result.sequence,
935 result: result.result,
936 },
937 });
938 }
939 }
940 result = self.confirmations.next(), if !self.confirmations.is_empty() => {
941 if let Some(Some(result)) = result {
942 current_event = Some(NodeEvent::NodeResponse {
943 node_id: result.node_id,
944 response: NodeResponse::Confirmation {
945 state_epoch: result.state_epoch,
946 response: result.response,
947 },
948 });
949 }
950 }
951 result = poll_opt(&mut self.signing), if self.signing.is_some() => {
952 self.signing = None;
953 if let Some(Some(result)) = result {
954 current_event = Some(NodeEvent::NodeResponse {
955 node_id: result.node_id,
956 response: NodeResponse::Signing {
957 sequence: result.sequence,
958 result: result.response,
959 },
960 });
961 }
962 }
963 }
964 }
965 info!("stopping nodes");
966 shutdown.cancel();
967 Ok(())
968 }
969
    /// Spawns one cancellable task per plan.
    ///
    /// Every task is tied to `token`, and reports its outcome through a
    /// one-shot channel whose receiving side is stored in `self.signing`,
    /// `self.submissions` or `self.confirmations`. Plans that name a node
    /// missing from `self.servers` are skipped.
    fn execute_plans(&mut self, plans: Vec<WorkerPlan<S::TxId>>, token: CancellationToken) {
        for plan in plans {
            debug!(plan = %plan.summary(), "worker plan");
            match plan {
                WorkerPlan::SpawnSigning {
                    node_id,
                    sequence,
                    delay,
                } => {
                    // Only one signing task may be in flight at a time.
                    if self.signing.is_some() {
                        continue;
                    }
                    let Some(node) = self.servers.get(&node_id).cloned() else {
                        continue;
                    };
                    // The request stays in the pending queue; it is popped
                    // only when the signed transaction is enqueued.
                    let Some(request) = self.transactions.peek_pending() else {
                        continue;
                    };
                    let request_ref = request.request.clone();
                    let tx = self.push_signing();
                    spawn_cancellable(token.clone(), async move {
                        time::sleep(delay).await;
                        let response = node.simulate_and_sign(request_ref, sequence).await;
                        let _ = tx.send(SigningResult {
                            node_id,
                            sequence,
                            response,
                        });
                    });
                }
                WorkerPlan::SpawnSubmit {
                    node_id,
                    bytes,
                    sequence,
                    delay,
                } => {
                    let Some(node) = self.servers.get(&node_id).cloned() else {
                        continue;
                    };
                    spawn_task(&mut self.submissions, token.clone(), async move |tx| {
                        time::sleep(delay).await;
                        let result = node.submit(bytes, sequence).await;
                        let _ = tx.send(SubmissionResult {
                            node_id,
                            sequence,
                            result,
                        });
                    });
                }
                WorkerPlan::SpawnConfirmBatch {
                    node_id,
                    ids,
                    epoch,
                } => {
                    let Some(node) = self.servers.get(&node_id).cloned() else {
                        continue;
                    };
                    spawn_task(&mut self.confirmations, token.clone(), async move |tx| {
                        let response = node
                            .status_batch(ids)
                            .await
                            .map(|statuses| ConfirmationResponse::Batch { statuses });
                        let _ = tx.send(ConfirmationResult {
                            state_epoch: epoch,
                            node_id,
                            response,
                        });
                    });
                }
                WorkerPlan::SpawnRecover { node_id, id, epoch } => {
                    let Some(node) = self.servers.get(&node_id).cloned() else {
                        continue;
                    };
                    spawn_task(&mut self.confirmations, token.clone(), async move |tx| {
                        // The id is cloned for the query so the original can
                        // be returned alongside the status.
                        let response = node
                            .status(id.clone())
                            .await
                            .map(|status| ConfirmationResponse::Single { id, status });
                        let _ = tx.send(ConfirmationResult {
                            node_id,
                            response,
                            state_epoch: epoch,
                        });
                    });
                }
            }
        }
    }
1058
    /// Applies the mutations in order and reports whether the worker must
    /// stop.
    ///
    /// Returns `Ok(true)` as soon as a `WorkerStop` mutation is seen, after
    /// finalizing all remaining transactions; mutations after it are not
    /// applied. Otherwise the buffer is trimmed up to the node manager's
    /// `min_confirmed_non_stopped()` limit, if it reports one, and
    /// `Ok(false)` is returned.
    ///
    /// # Errors
    ///
    /// Fails when a signed transaction arrives with no pending request to
    /// pair it with, or when enqueueing the signed transaction fails.
    fn apply_mutations(&mut self, mutations: WorkerMutations<S>) -> Result<bool> {
        for mutation in mutations {
            debug!(mutation = %mutation.summary(), "worker mutation");
            match mutation {
                WorkerMutation::EnqueueSigned { mut tx } => {
                    let Some(request) = self.transactions.pop_pending() else {
                        return Err(Error::UnexpectedResponseType(
                            "missing pending request".to_string(),
                        ));
                    };
                    // Hand the submitter's report channels over to the signed
                    // transaction.
                    tx.callbacks.submitted = Some(request.submit_tx);
                    tx.callbacks.confirmed = Some(request.confirm_tx);
                    self.enqueue_signed(tx, request.sign_tx)?;
                }
                WorkerMutation::MarkSubmitted { sequence, id } => {
                    self.mark_submitted(sequence, id);
                }
                WorkerMutation::Confirm { seq, info } => {
                    self.confirm_tx(seq, info);
                }
                WorkerMutation::WorkerStop => {
                    let fatal = self.nodes.tail_stop_error();
                    self.finalize_remaining(fatal);
                    return Ok(true);
                }
            }
        }
        if let Some(limit) = self.nodes.min_confirmed_non_stopped() {
            let before = self.transactions.confirmed_seq();
            debug!(
                before = ?before,
                limit,
                max_seq = ?self.transactions.max_seq(),
                "clear_confirmed_up_to candidate"
            );
            self.clear_confirmed_up_to(limit);
            let after = self.transactions.confirmed_seq();
            debug!(before = ?before, after = ?after, limit, "clear_confirmed_up_to result");
        }
        Ok(false)
    }
1100
1101 fn push_signing(&mut self) -> oneshot::Sender<SigningResultFor<S>> {
1102 let (tx, rx) = oneshot::channel();
1103 self.signing = Some(async move { rx.await.ok() }.boxed());
1104 tx
1105 }
1106
1107 fn enqueue_signed(
1108 &mut self,
1109 tx: Transaction<S::TxId, S::ConfirmInfo, S::ConfirmResponse, S::SubmitError>,
1110 signed: oneshot::Sender<Result<()>>,
1111 ) -> Result<()> {
1112 let exp_seq = self.transactions.next_sequence();
1113 let tx_sequence = tx.sequence;
1114 if self.transactions.add_transaction(tx).is_err() {
1115 let msg = format!("tx sequence gap: expected {}, got {}", exp_seq, tx_sequence);
1116 let _ = signed.send(Err(crate::Error::UnexpectedResponseType(msg.clone())));
1117 return Err(crate::Error::UnexpectedResponseType(msg));
1118 }
1119 let _ = signed.send(Ok(()));
1120 Ok(())
1121 }
1122
1123 fn mark_submitted(&mut self, sequence: u64, id: S::TxId) {
1124 if let Some(tx) = self.transactions.get_mut(sequence) {
1125 tx.notify_submitted(Ok(id.clone()));
1126 }
1127 let _ = self.transactions.set_submitted_id(sequence, id);
1128 }
1129
1130 fn confirm_tx(&mut self, seq: u64, info: S::ConfirmInfo) {
1131 let Some(tx) = self.transactions.get_mut(seq) else {
1132 return;
1133 };
1134 tx.notify_confirmed(Ok(info));
1135 }
1136
1137 fn finalize_remaining(&mut self, fatal: Option<StopErrorFor<S>>) {
1138 for pending in self.transactions.drain_pending() {
1139 let _ = pending.sign_tx.send(Err(Error::TxWorkerStopped));
1140 let _ = pending.submit_tx.send(Err(Error::TxWorkerStopped));
1141 let _ = pending.confirm_tx.send(Err(StopError::WorkerStopped));
1142 }
1143 loop {
1144 let next = self
1145 .transactions
1146 .confirmed_seq()
1147 .map(|seq| seq.saturating_add(1))
1148 .unwrap_or(0);
1149 let Ok(mut tx) = self.transactions.confirm(next) else {
1150 break;
1151 };
1152 let status = fatal.clone().unwrap_or(StopError::WorkerStopped);
1153 if tx.id.is_none() {
1154 tx.notify_submitted(Err(Error::TxWorkerStopped));
1155 }
1156 tx.notify_confirmed(Err(status));
1157 }
1158 }
1159
1160 fn clear_confirmed_up_to(&mut self, limit: u64) {
1161 loop {
1162 let next = self
1163 .transactions
1164 .confirmed_seq()
1165 .map(|seq| seq.saturating_add(1))
1166 .unwrap_or(0);
1167 if next > limit {
1168 break;
1169 }
1170 if self.transactions.confirm(next).is_err() {
1171 break;
1172 }
1173 }
1174 }
1175}
1176
1177fn spawn_task<T, F, Fut>(
1178 unordered: &mut FuturesUnordered<BoxFuture<'static, Option<T>>>,
1179 token: CancellationToken,
1180 fut_fn: F,
1181) where
1182 T: Send + 'static,
1183 F: FnOnce(oneshot::Sender<T>) -> Fut,
1184 Fut: Future<Output = ()> + Send + 'static,
1185{
1186 let (tx, rx) = oneshot::channel();
1187 spawn_cancellable(token, fut_fn(tx));
1188 unordered.push(async move { rx.await.ok() }.boxed());
1189}
1190
1191async fn poll_opt<F: std::future::Future + Unpin>(fut: &mut Option<F>) -> Option<F::Output> {
1192 match fut.as_mut() {
1193 Some(fut) => Some(fut.await),
1194 None => futures::future::pending().await,
1195 }
1196}
1197
1198async fn poll_shutdown(shutdown: &CancellationToken, seen: &mut bool) {
1199 if *seen {
1200 futures::future::pending::<()>().await;
1201 } else {
1202 shutdown.cancelled().await;
1203 *seen = true;
1204 }
1205}
1206
1207#[cfg(all(test, not(target_arch = "wasm32")))]
1208mod tests;