use std::collections::HashMap;
use std::fmt;
use std::hash::Hash as StdHash;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use celestia_types::state::ErrorCode;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt};
use lumina_utils::executor::spawn_cancellable;
use lumina_utils::time::{self, Interval};
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
mod node_manager;
mod tx_buffer;
use crate::{Error, Result, TxConfig};
use node_manager::NodeManager;
/// Identifier of a node, held as a shared string slice.
pub type NodeId = Arc<str>;
/// Outcome of a submit call; the error side is a [`SubmitFailure`] wrapping the server error `E`.
pub type TxSubmitResult<T, E> = Result<T, SubmitFailure<E>>;
/// Outcome of a status query, using the crate's `Result` alias with its default error.
pub type TxConfirmResult<T> = Result<T>;
/// Outcome of signing: a signed [`Transaction`] or a [`SigningFailure`] wrapping `SubmitErr`.
pub type TxSigningResult<TxId, ConfirmInfo, ConfirmResponse, SubmitErr> =
Result<Transaction<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>, SigningFailure<SubmitErr>>;
/// Marker trait for transaction identifiers: anything that is `Clone` and `Debug`.
pub trait TxIdT: Clone + fmt::Debug {}

/// Blanket implementation so every `Clone + Debug` type is usable as a transaction id.
impl<T: Clone + fmt::Debug> TxIdT for T {}
/// Content carried by a [`TxRequest`].
#[derive(Debug, Clone)]
pub enum TxPayload {
/// A list of blobs.
Blobs(Vec<celestia_types::Blob>),
/// A raw transaction body.
Tx(celestia_types::state::RawTxBody),
}
/// A transaction request: the payload plus the configuration to use for it.
#[derive(Debug, Clone)]
pub struct TxRequest {
/// Payload of the request.
pub tx: TxPayload,
/// Configuration accompanying the payload.
pub cfg: TxConfig,
}
/// Error delivered on a transaction's confirmation channel when it does not confirm.
#[derive(Debug, Clone)]
pub enum StopError<SubmitErr, ConfirmInfo, ConfirmResponse> {
/// Carries the transaction status associated with the failure.
ConfirmError(TxStatus<ConfirmInfo, ConfirmResponse>),
/// Carries a submit-side error value.
SubmitError(SubmitErr),
/// Carries a signing-side error value (same payload type as `SubmitError`).
SignError(SubmitErr),
/// The worker stopped; used as the fallback when no more specific error is available.
WorkerStopped,
}
/// A unit of work the worker should start, as produced by the node manager's planning step.
#[derive(Debug)]
#[allow(clippy::enum_variant_names)]
pub(crate) enum WorkerPlan<TxId: TxIdT> {
/// Sign the next pending request on `node_id` with `sequence`, after waiting `delay`.
SpawnSigning {
node_id: NodeId,
sequence: u64,
delay: Duration,
},
/// Submit the signed `bytes` with `sequence` to `node_id`, after waiting `delay`.
SpawnSubmit {
node_id: NodeId,
bytes: Arc<Vec<u8>>,
sequence: u64,
delay: Duration,
},
/// Query the status of `ids` on `node_id` in one batch; `epoch` is echoed back with the result.
SpawnConfirmBatch {
node_id: NodeId,
ids: Vec<TxId>,
epoch: u64,
},
/// Query the status of a single `id` on `node_id`; `epoch` is echoed back with the result.
SpawnRecover {
node_id: NodeId,
id: TxId,
epoch: u64,
},
}
impl<TxId: TxIdT> WorkerPlan<TxId> {
    /// Builds a short, single-line description of the plan for log output.
    ///
    /// Only the node and a sequence number or id count are included; payload
    /// bytes, delays and epochs are left out.
    pub(crate) fn summary(&self) -> String {
        match self {
            Self::SpawnRecover { node_id, .. } => {
                let node: &str = node_id;
                format!("SpawnRecover node={}", node)
            }
            Self::SpawnConfirmBatch { node_id, ids, .. } => {
                let node: &str = node_id;
                let count = ids.len();
                format!("SpawnConfirmBatch node={} count={}", node, count)
            }
            Self::SpawnSubmit {
                node_id, sequence, ..
            } => {
                let node: &str = node_id;
                format!("SpawnSubmit node={} seq={}", node, sequence)
            }
            Self::SpawnSigning {
                node_id, sequence, ..
            } => {
                let node: &str = node_id;
                format!("SpawnSigning node={} seq={}", node, sequence)
            }
        }
    }
}
/// A state change the worker applies to itself in response to a node event.
#[derive(Debug)]
pub(crate) enum WorkerMutation<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
/// Move the front pending request into the signed-transaction buffer as `tx`.
EnqueueSigned {
tx: crate::tx_client_v2::Transaction<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
},
/// Record that the transaction at `sequence` was submitted and received `id`.
MarkSubmitted {
sequence: u64,
id: TxId,
},
/// Report confirmation `info` to the owner of the transaction at `seq`.
Confirm {
seq: u64,
info: ConfirmInfo,
},
/// Stop the worker and fail everything still outstanding.
WorkerStop,
}
impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr>
    WorkerMutation<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>
{
    /// Builds a short, single-line description of the mutation for log output.
    pub(crate) fn summary(&self) -> String {
        match self {
            Self::WorkerStop => String::from("WorkerStop"),
            Self::Confirm { seq, .. } => format!("Confirm seq={}", seq),
            Self::MarkSubmitted { sequence, .. } => format!("MarkSubmitted seq={}", sequence),
            Self::EnqueueSigned { tx } => {
                let seq = tx.sequence;
                format!("EnqueueSigned seq={}", seq)
            }
        }
    }
}
/// Value delivered on a transaction's confirmation channel: confirmation info or a [`StopError`].
pub type ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse> =
std::result::Result<ConfirmInfo, StopError<SubmitErr, ConfirmInfo, ConfirmResponse>>;
impl<SubmitErr, ConfirmInfo, ConfirmResponse> fmt::Display
for StopError<SubmitErr, ConfirmInfo, ConfirmResponse>
where
SubmitErr: fmt::Debug,
ConfirmInfo: fmt::Debug,
ConfirmResponse: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
StopError::ConfirmError(status) => write!(f, "ConfirmError({:?})", status),
StopError::SubmitError(err) => write!(f, "SubmitError({:?})", err),
StopError::SignError(err) => write!(f, "SignError({:?})", err),
StopError::WorkerStopped => write!(f, "WorkerStopped"),
}
}
}
impl TxRequest {
pub fn tx(body: celestia_types::state::RawTxBody, cfg: TxConfig) -> Self {
Self {
tx: TxPayload::Tx(body),
cfg,
}
}
pub fn blobs(blobs: Vec<celestia_types::Blob>, cfg: TxConfig) -> Self {
Self {
tx: TxPayload::Blobs(blobs),
cfg,
}
}
}
/// A signed transaction tracked by the worker.
#[derive(Debug)]
pub struct Transaction<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
/// Sequence number the transaction was signed with.
pub sequence: u64,
/// Bytes handed to `TxServer::submit`.
pub bytes: Arc<Vec<u8>>,
/// One-shot channels used to report submission and confirmation to the requester.
pub callbacks: TxCallbacks<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
/// Transaction id; `None` is treated by the worker as "not yet submitted".
pub id: Option<TxId>,
}
impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr>
    Transaction<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>
{
    /// Delivers the submission outcome to the requester, at most once.
    ///
    /// The sender is taken out of `callbacks`, so later calls do nothing.
    fn notify_submitted(&mut self, result: Result<TxId>) {
        let Some(sender) = self.callbacks.submitted.take() else {
            return;
        };
        // A failed send is deliberately ignored.
        let _ = sender.send(result);
    }

    /// Delivers the confirmation outcome to the requester, at most once.
    ///
    /// The sender is taken out of `callbacks`, so later calls do nothing.
    fn notify_confirmed(&mut self, result: ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>) {
        let Some(sender) = self.callbacks.confirmed.take() else {
            return;
        };
        // A failed send is deliberately ignored.
        let _ = sender.send(result);
    }
}
/// Optional one-shot senders used to report a transaction's progress to its requester.
#[derive(Debug)]
pub struct TxCallbacks<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
/// Sender for the submission outcome; taken (set to `None`) once used.
pub submitted: Option<oneshot::Sender<Result<TxId>>>,
/// Sender for the confirmation outcome; taken (set to `None`) once used.
pub confirmed: Option<oneshot::Sender<ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>>>,
}
/// Hand-written so that no `Default` bound is placed on the type parameters.
impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> Default
    for TxCallbacks<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>
{
    /// Returns callbacks with neither sender attached.
    fn default() -> Self {
        let (submitted, confirmed) = (None, None);
        Self {
            submitted,
            confirmed,
        }
    }
}
/// A queued request together with the senders used to report each stage of its progress.
struct RequestWithChannels<TxId, ConfirmInfo, ConfirmResponse, SubmitErr, Request> {
/// The request itself, shared so it can be handed to a signing task without being moved.
request: Arc<Request>,
/// Reports the signing outcome.
sign_tx: oneshot::Sender<Result<()>>,
/// Reports the submission outcome.
submit_tx: oneshot::Sender<Result<TxId>>,
/// Reports the confirmation outcome.
confirm_tx: oneshot::Sender<ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>>,
}
impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr, Request>
RequestWithChannels<TxId, ConfirmInfo, ConfirmResponse, SubmitErr, Request>
{
fn new(
request: Request,
sign_tx: oneshot::Sender<Result<()>>,
submit_tx: oneshot::Sender<Result<TxId>>,
confirm_tx: oneshot::Sender<ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>>,
) -> Self {
Self {
request: Arc::new(request),
sign_tx,
submit_tx,
confirm_tx,
}
}
}
/// Outcome of a signing task, tagged with the node and sequence it was started for.
struct SigningResult<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
/// Node that performed the signing.
node_id: NodeId,
/// Sequence the signing was requested with.
sequence: u64,
/// Signed transaction or signing failure.
response: TxSigningResult<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
}
/// Receivers returned to the caller of `TxSubmitter::add_tx`, one per stage of a transaction.
#[derive(Debug)]
pub struct TxHandle<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
/// Receives the signing outcome.
pub signed: oneshot::Receiver<Result<()>>,
/// Receives the submission outcome (the transaction id on success).
pub submitted: oneshot::Receiver<Result<TxId>>,
/// Receives the confirmation outcome.
pub confirmed: oneshot::Receiver<ConfirmResult<ConfirmInfo, SubmitErr, ConfirmResponse>>,
}
/// Handle used to queue requests for a [`TransactionWorker`].
///
/// Cloning produces another handle feeding the same queue.
pub struct TxSubmitter<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr, Request> {
    /// Sending half of the worker's request queue.
    add_tx:
        mpsc::Sender<RequestWithChannels<TxId, ConfirmInfo, ConfirmResponse, SubmitErr, Request>>,
}

/// Implemented by hand rather than derived: `#[derive(Clone)]` would require every type
/// parameter (including `Request`) to be `Clone`, although only the channel sender is cloned.
impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr, Request> Clone
    for TxSubmitter<TxId, ConfirmInfo, ConfirmResponse, SubmitErr, Request>
{
    fn clone(&self) -> Self {
        Self {
            add_tx: self.add_tx.clone(),
        }
    }
}
impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr, Request>
TxSubmitter<TxId, ConfirmInfo, ConfirmResponse, SubmitErr, Request>
{
pub async fn add_tx(
&self,
request: Request,
) -> Result<TxHandle<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>> {
let (sign_tx, sign_rx) = oneshot::channel();
let (submit_tx, submit_rx) = oneshot::channel();
let (confirm_tx, confirm_rx) = oneshot::channel();
let request_with_channels =
RequestWithChannels::new(request, sign_tx, submit_tx, confirm_tx);
match self.add_tx.try_send(request_with_channels) {
Ok(()) => Ok(TxHandle {
signed: sign_rx,
submitted: submit_rx,
confirmed: confirm_rx,
}),
Err(mpsc::error::TrySendError::Full(_)) => Err(Error::UnexpectedResponseType(
"transaction queue full".to_string(),
)),
Err(mpsc::error::TrySendError::Closed(_)) => Err(Error::TxWorkerStopped),
}
}
}
/// Associates a transaction id with the node and sequence it belongs to.
/// NOTE(review): not referenced anywhere in the visible part of this file (hence `dead_code`).
#[derive(Debug)]
#[allow(dead_code)]
struct TxIndexEntry<TxId: TxIdT> {
node_id: NodeId,
sequence: u64,
id: TxId,
}
/// Status category reported for a transaction by a status query.
#[derive(Debug, Clone)]
pub enum TxStatusKind<ConfirmInfo> {
Pending,
/// Confirmed, with the accompanying confirmation info.
Confirmed {
info: ConfirmInfo,
},
/// Rejected, with the reason for the rejection.
Rejected {
reason: RejectionReason,
},
Evicted,
Unknown,
}
/// A transaction status: the interpreted kind plus the response it was derived from.
#[derive(Debug, Clone)]
pub struct TxStatus<ConfirmInfo, OriginalResponse> {
/// Interpreted status category.
pub kind: TxStatusKind<ConfirmInfo>,
/// Response value kept alongside the interpreted kind.
pub original_response: OriginalResponse,
}
impl<ConfirmInfo, OriginalResponse> TxStatus<ConfirmInfo, OriginalResponse> {
pub(crate) fn new(
kind: TxStatusKind<ConfirmInfo>,
original_response: OriginalResponse,
) -> Self {
Self {
kind,
original_response,
}
}
}
/// Classification of a failed submit call.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum SubmitError {
/// Sequence mismatch; `expected` is the sequence reported as expected.
SequenceMismatch {
expected: u64,
},
/// Fee too low; `expected_fee` is the fee reported as required.
InsufficientFee {
expected_fee: u64,
message: String,
},
NetworkError,
MempoolIsFull,
TxInMempoolCache,
/// Any other failure, with its error code and message.
Other {
error_code: ErrorCode,
message: String,
},
}
impl SubmitError {
fn label(&self) -> String {
match self {
SubmitError::SequenceMismatch { expected } => {
format!("SequenceMismatch expected={}", expected)
}
SubmitError::InsufficientFee {
expected_fee,
message,
} => format!(
"InsufficientFee expected_fee={} msg={}",
expected_fee, message
),
SubmitError::Other {
error_code,
message,
} => format!("Other code={:?} msg={}", error_code, message),
SubmitError::NetworkError => "NetworkError".to_string(),
SubmitError::MempoolIsFull => "MempoolIsFull".to_string(),
SubmitError::TxInMempoolCache => "TxInMempoolCache".to_string(),
}
}
}
/// A submit failure: the classified error together with the original error value.
#[derive(Debug, Clone)]
pub struct SubmitFailure<T> {
/// Classified form of the failure.
pub mapped_error: SubmitError,
/// The original error value of type `T`.
pub original_error: T,
}
impl<T: fmt::Debug> SubmitFailure<T> {
    /// Returns the mapped error's label followed by the original error in `Debug` form.
    fn label(&self) -> String {
        let mapped = self.mapped_error.label();
        let original = &self.original_error;
        format!("{} original={:?}", mapped, original)
    }
}
/// Classification of a failed signing attempt.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum SigningError {
    /// Sequence mismatch; `expected` is the sequence reported as expected.
    SequenceMismatch {
        expected: u64,
    },
    /// Any other failure, described by `message`.
    Other {
        message: String,
    },
    NetworkError,
}

impl SigningError {
    /// Returns a compact, single-line description of the error for log output.
    fn label(&self) -> String {
        match self {
            Self::NetworkError => String::from("NetworkError"),
            Self::Other { message } => format!("Other msg={}", message),
            Self::SequenceMismatch { expected } => {
                format!("SequenceMismatch expected={}", expected)
            }
        }
    }
}
/// A signing failure: the classified error together with the original error value.
#[derive(Debug, Clone)]
pub struct SigningFailure<T> {
/// Classified form of the failure.
pub mapped_error: SigningError,
/// The original error value of type `T`.
pub original_error: T,
}
impl<T: fmt::Debug> SigningFailure<T> {
    /// Returns the mapped error's label followed by the original error in `Debug` form.
    fn label(&self) -> String {
        let mapped = self.mapped_error.label();
        let original = &self.original_error;
        format!("{} original={:?}", mapped, original)
    }
}
/// A confirmation failure carrying the rejection reason.
/// NOTE(review): not referenced anywhere in the visible part of this file (hence `dead_code`).
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct ConfirmFailure {
reason: RejectionReason,
}
/// Why a transaction was rejected; every variant records the node it relates to.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum RejectionReason {
/// Sequence mismatch; `expected` is the sequence reported as expected.
SequenceMismatch {
expected: u64,
node_id: NodeId,
},
/// The transaction was not submitted.
/// NOTE(review): exact meaning of `expected` here is not visible in this file — confirm.
TxNotSubmitted {
expected: u64,
node_id: NodeId,
},
/// Any other rejection, with its error code and message.
OtherReason {
error_code: ErrorCode,
message: String,
node_id: NodeId,
},
}
/// Backend a [`TransactionWorker`] talks to for signing, submitting and querying transactions.
#[async_trait]
pub trait TxServer: Send + Sync {
/// Identifier of a submitted transaction.
type TxId: TxIdT + Eq + StdHash + Send + Sync + 'static;
/// Information returned when a transaction is confirmed.
type ConfirmInfo: Clone + Send + Sync + 'static;
/// Request type accepted by [`TxServer::simulate_and_sign`].
type TxRequest: Send + Sync + 'static;
/// Original error type carried inside submit and signing failures.
type SubmitError: Clone + Send + Sync + fmt::Debug + 'static;
/// Response type kept alongside each interpreted status.
type ConfirmResponse: Clone + Send + Sync + 'static;
/// Submits signed `tx_bytes` for `sequence`, returning the transaction id on success.
async fn submit(
&self,
tx_bytes: Arc<Vec<u8>>,
sequence: u64,
) -> TxSubmitResult<Self::TxId, Self::SubmitError>;
/// Queries the status of several transactions at once, returning `(id, status)` pairs.
async fn status_batch(
&self,
ids: Vec<Self::TxId>,
) -> TxConfirmResult<
Vec<(
Self::TxId,
TxStatus<Self::ConfirmInfo, Self::ConfirmResponse>,
)>,
>;
/// Queries the status of a single transaction.
async fn status(
&self,
id: Self::TxId,
) -> TxConfirmResult<TxStatus<Self::ConfirmInfo, Self::ConfirmResponse>>;
/// Returns the current sequence number.
#[allow(dead_code)]
async fn current_sequence(&self) -> Result<u64>;
/// Produces a signed [`Transaction`] for `req` using `sequence`, or a [`SigningFailure`].
async fn simulate_and_sign(
&self,
req: Arc<Self::TxRequest>,
sequence: u64,
) -> Result<
Transaction<Self::TxId, Self::ConfirmInfo, Self::ConfirmResponse, Self::SubmitError>,
SigningFailure<Self::SubmitError>,
>;
}
/// Event handed to the node manager by the worker loop.
#[derive(Debug)]
enum NodeEvent<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
/// A spawned task for `node_id` finished with `response`.
NodeResponse {
node_id: NodeId,
response: NodeResponse<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
},
/// Emitted by the worker loop once the shutdown token is cancelled.
NodeStop,
}
impl<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr: fmt::Debug>
    NodeEvent<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>
{
    /// Builds a short, single-line description of the event for log output.
    fn summary(&self) -> String {
        let response = match self {
            NodeEvent::NodeStop => return "NodeStop".to_string(),
            NodeEvent::NodeResponse { response, .. } => response,
        };
        match response {
            NodeResponse::Submission {
                sequence,
                result: Ok(_),
            } => format!("NodeResponse::Submission seq={} Ok", sequence),
            NodeResponse::Submission {
                sequence,
                result: Err(failure),
            } => format!(
                "NodeResponse::Submission seq={} Err {}",
                sequence,
                failure.label()
            ),
            NodeResponse::Confirmation {
                response: Ok(ConfirmationResponse::Batch { statuses }),
                ..
            } => format!("NodeResponse::Confirmation Batch {}", statuses.len()),
            NodeResponse::Confirmation {
                response: Ok(ConfirmationResponse::Single { .. }),
                ..
            } => "NodeResponse::Confirmation Single".to_string(),
            NodeResponse::Confirmation {
                response: Err(failure),
                ..
            } => format!("NodeResponse::Confirmation Err {}", failure),
            NodeResponse::Signing {
                sequence,
                result: Ok(_),
            } => format!("NodeResponse::Signing seq={} Ok", sequence),
            NodeResponse::Signing {
                sequence,
                result: Err(failure),
            } => format!(
                "NodeResponse::Signing seq={} Err {}",
                sequence,
                failure.label()
            ),
        }
    }
}
/// Result of a task the worker spawned for a node.
#[derive(Debug)]
enum NodeResponse<TxId: TxIdT, ConfirmInfo, ConfirmResponse, SubmitErr> {
/// Result of submitting the transaction with `sequence`.
Submission {
sequence: u64,
result: TxSubmitResult<TxId, SubmitErr>,
},
/// Result of a status query; `state_epoch` is the epoch given in the originating plan.
Confirmation {
state_epoch: u64,
response: TxConfirmResult<ConfirmationResponse<TxId, ConfirmInfo, ConfirmResponse>>,
},
/// Result of signing with `sequence`.
Signing {
sequence: u64,
result: TxSigningResult<TxId, ConfirmInfo, ConfirmResponse, SubmitErr>,
},
}
/// Payload of a successful status query.
#[derive(Debug)]
enum ConfirmationResponse<TxId, ConfirmInfo, ConfirmResponse> {
/// Statuses returned by `TxServer::status_batch`.
Batch {
statuses: Vec<(TxId, TxStatus<ConfirmInfo, ConfirmResponse>)>,
},
/// Status returned by `TxServer::status` for the single `id`.
Single {
id: TxId,
status: TxStatus<ConfirmInfo, ConfirmResponse>,
},
}
/// Outcome of a submit task, tagged with the node and sequence it was started for.
struct SubmissionResult<TxId, SubmitErr> {
node_id: NodeId,
sequence: u64,
result: TxSubmitResult<TxId, SubmitErr>,
}
/// Outcome of a status-query task, tagged with the node and the epoch from the originating plan.
struct ConfirmationResult<TxId, ConfirmInfo, ConfirmResponse> {
node_id: NodeId,
state_epoch: u64,
response: TxConfirmResult<ConfirmationResponse<TxId, ConfirmInfo, ConfirmResponse>>,
}
// The aliases below spell out the generic types of this module in terms of the
// associated types of a single `TxServer` implementation `S`.

/// A queued request with its reporting channels, for server `S`.
type PendingRequest<S> = RequestWithChannels<
<S as TxServer>::TxId,
<S as TxServer>::ConfirmInfo,
<S as TxServer>::ConfirmResponse,
<S as TxServer>::SubmitError,
<S as TxServer>::TxRequest,
>;
/// Node event type for server `S`.
type NodeEventFor<S> = NodeEvent<
<S as TxServer>::TxId,
<S as TxServer>::ConfirmInfo,
<S as TxServer>::ConfirmResponse,
<S as TxServer>::SubmitError,
>;
/// Signing task outcome for server `S`.
type SigningResultFor<S> = SigningResult<
<S as TxServer>::TxId,
<S as TxServer>::ConfirmInfo,
<S as TxServer>::ConfirmResponse,
<S as TxServer>::SubmitError,
>;
/// Stop error type for server `S`.
type StopErrorFor<S> = StopError<
<S as TxServer>::SubmitError,
<S as TxServer>::ConfirmInfo,
<S as TxServer>::ConfirmResponse,
>;
/// Transaction buffer type for server `S`, holding pending requests of type `PendingRequest<S>`.
type TxBuf<S> = tx_buffer::TxBuffer<
<S as TxServer>::TxId,
<S as TxServer>::ConfirmInfo,
<S as TxServer>::ConfirmResponse,
<S as TxServer>::SubmitError,
PendingRequest<S>,
>;
/// List of worker mutations for server `S`.
type WorkerMutations<S> = Vec<
WorkerMutation<
<S as TxServer>::TxId,
<S as TxServer>::ConfirmInfo,
<S as TxServer>::ConfirmResponse,
<S as TxServer>::SubmitError,
>,
>;
/// Boxed future resolving to a submit outcome, or `None` when none was delivered.
type SubmissionFuture<S> = BoxFuture<
'static,
Option<SubmissionResult<<S as TxServer>::TxId, <S as TxServer>::SubmitError>>,
>;
/// Boxed future resolving to a status-query outcome, or `None` when none was delivered.
type ConfirmationFuture<S> = BoxFuture<
'static,
Option<
ConfirmationResult<
<S as TxServer>::TxId,
<S as TxServer>::ConfirmInfo,
<S as TxServer>::ConfirmResponse,
>,
>,
>;
/// Boxed future resolving to a signing outcome, or `None` when none was delivered.
type SigningFuture<S> = BoxFuture<
'static,
Option<
SigningResult<
<S as TxServer>::TxId,
<S as TxServer>::ConfirmInfo,
<S as TxServer>::ConfirmResponse,
<S as TxServer>::SubmitError,
>,
>,
>;
/// Event loop state that signs, submits and confirms queued transactions against a set of nodes.
pub struct TransactionWorker<S: TxServer> {
/// Produces plans and turns node events into worker mutations.
nodes: NodeManager<S>,
/// Server handle for each node id.
servers: HashMap<NodeId, Arc<S>>,
/// Buffer of pending requests and signed transactions.
transactions: TxBuf<S>,
/// Receiving half of the request queue fed by `TxSubmitter`.
add_tx_rx: mpsc::Receiver<PendingRequest<S>>,
/// In-flight submit tasks.
submissions: FuturesUnordered<SubmissionFuture<S>>,
/// In-flight status-query tasks (batch and single).
confirmations: FuturesUnordered<ConfirmationFuture<S>>,
/// The in-flight signing task, if any; at most one runs at a time.
signing: Option<SigningFuture<S>>,
/// Ticker that triggers confirmation planning.
confirm_ticker: Interval,
/// Passed to the planner as the status batch limit.
max_status_batch: usize,
/// Set when the ticker fires; passed to the next planning call and then cleared.
should_confirm: bool,
}
/// The pair returned by `TransactionWorker::new`: the submitter handle and the worker itself.
pub type WorkerPair<S> = (
TxSubmitter<
<S as TxServer>::TxId,
<S as TxServer>::ConfirmInfo,
<S as TxServer>::ConfirmResponse,
<S as TxServer>::SubmitError,
<S as TxServer>::TxRequest,
>,
TransactionWorker<S>,
);
impl<S: TxServer + 'static> TransactionWorker<S> {
/// Creates a worker and the submitter handle that feeds it.
///
/// * `nodes` - server handle for every node the worker may use.
/// * `confirm_interval` - period of the confirmation ticker.
/// * `max_status_batch` - batch limit handed to the planner.
/// * `confirmed_sequence` - starting confirmed sequence given to the node manager and buffer.
/// * `add_tx_capacity` - capacity of the request queue.
///
/// NOTE(review): tokio documents that `mpsc::channel` panics for a capacity of 0.
pub fn new(
    nodes: HashMap<NodeId, Arc<S>>,
    confirm_interval: Duration,
    max_status_batch: usize,
    confirmed_sequence: Option<u64>,
    add_tx_capacity: usize,
) -> WorkerPair<S> {
    let (add_tx, add_tx_rx) = mpsc::channel(add_tx_capacity);
    let node_ids: Vec<_> = nodes.keys().cloned().collect();

    let worker = TransactionWorker {
        nodes: NodeManager::new(node_ids, confirmed_sequence),
        transactions: tx_buffer::TxBuffer::new(confirmed_sequence),
        servers: nodes,
        add_tx_rx,
        submissions: FuturesUnordered::new(),
        confirmations: FuturesUnordered::new(),
        signing: None,
        confirm_ticker: Interval::new(confirm_interval),
        max_status_batch,
        should_confirm: false,
    };

    (TxSubmitter { add_tx }, worker)
}
fn enqueue_pending(&mut self, request: PendingRequest<S>) -> Result<()> {
self.transactions
.add_pending(request)
.map_err(|_| crate::Error::UnexpectedResponseType("pending queue error".to_string()))?;
Ok(())
}
/// Runs the worker loop until a `WorkerStop` mutation is applied or an error occurs.
///
/// Every iteration asks the node manager for plans and executes them. It then either
/// applies the event captured by the previous iteration, or waits for the next wake-up
/// source: shutdown, a new request, the confirm ticker, or a finished submit / status /
/// signing task. Cancelling `shutdown` feeds a `NodeStop` event to the node manager;
/// the loop itself only exits on `WorkerStop`.
///
/// On exit the `shutdown` token is cancelled so tasks spawned by the worker stop too.
///
/// # Errors
/// Propagates failures from `enqueue_pending` and `apply_mutations`.
pub async fn process(&mut self, shutdown: CancellationToken) -> Result<()> {
    let mut current_event: Option<NodeEventFor<S>> = None;
    let mut shutdown_seen = false;
    // Becomes false once the request queue reports that it is closed. A closed queue
    // makes `recv()` resolve to `None` immediately on every poll; without disabling
    // the branch the loop would spin instead of waiting on the other sources.
    let mut intake_open = true;
    loop {
        let plans = self.nodes.plan(
            &self.transactions,
            self.max_status_batch,
            self.should_confirm,
        );
        // The confirm request has been handed to the planner; re-arm on the next tick.
        self.should_confirm = false;
        self.execute_plans(plans, shutdown.clone());
        if let Some(event) = current_event.take() {
            let mutations = self.nodes.apply_event(event, &self.transactions);
            let stop = self.apply_mutations(mutations)?;
            if stop {
                break;
            }
            // Re-plan right away: the applied mutations may have unlocked new work.
            continue;
        }
        select! {
            _ = poll_shutdown(&shutdown, &mut shutdown_seen) => {
                current_event = Some(NodeEvent::NodeStop);
            }
            tx = self.add_tx_rx.recv(), if intake_open => {
                match tx {
                    Some(tx) => self.enqueue_pending(tx)?,
                    // All submitters are gone; keep serving in-flight transactions.
                    None => intake_open = false,
                }
            }
            _ = self.confirm_ticker.tick() => {
                self.should_confirm = true;
            }
            result = self.submissions.next(), if !self.submissions.is_empty() => {
                if let Some(Some(result)) = result {
                    current_event = Some(NodeEvent::NodeResponse {
                        node_id: result.node_id,
                        response: NodeResponse::Submission {
                            sequence: result.sequence,
                            result: result.result,
                        },
                    });
                }
            }
            result = self.confirmations.next(), if !self.confirmations.is_empty() => {
                if let Some(Some(result)) = result {
                    current_event = Some(NodeEvent::NodeResponse {
                        node_id: result.node_id,
                        response: NodeResponse::Confirmation {
                            state_epoch: result.state_epoch,
                            response: result.response,
                        },
                    });
                }
            }
            result = poll_opt(&mut self.signing), if self.signing.is_some() => {
                // The signing slot is free again whether or not a result was delivered.
                self.signing = None;
                if let Some(Some(result)) = result {
                    current_event = Some(NodeEvent::NodeResponse {
                        node_id: result.node_id,
                        response: NodeResponse::Signing {
                            sequence: result.sequence,
                            result: result.response,
                        },
                    });
                }
            }
        }
    }
    info!("stopping nodes");
    shutdown.cancel();
    Ok(())
}
fn execute_plans(&mut self, plans: Vec<WorkerPlan<S::TxId>>, token: CancellationToken) {
for plan in plans {
debug!(plan = %plan.summary(), "worker plan");
match plan {
WorkerPlan::SpawnSigning {
node_id,
sequence,
delay,
} => {
if self.signing.is_some() {
continue;
}
let Some(node) = self.servers.get(&node_id).cloned() else {
continue;
};
let Some(request) = self.transactions.peek_pending() else {
continue;
};
let request_ref = request.request.clone();
let tx = self.push_signing();
spawn_cancellable(token.clone(), async move {
time::sleep(delay).await;
let response = node.simulate_and_sign(request_ref, sequence).await;
let _ = tx.send(SigningResult {
node_id,
sequence,
response,
});
});
}
WorkerPlan::SpawnSubmit {
node_id,
bytes,
sequence,
delay,
} => {
let Some(node) = self.servers.get(&node_id).cloned() else {
continue;
};
spawn_task(&mut self.submissions, token.clone(), async move |tx| {
time::sleep(delay).await;
let result = node.submit(bytes, sequence).await;
let _ = tx.send(SubmissionResult {
node_id,
sequence,
result,
});
});
}
WorkerPlan::SpawnConfirmBatch {
node_id,
ids,
epoch,
} => {
let Some(node) = self.servers.get(&node_id).cloned() else {
continue;
};
spawn_task(&mut self.confirmations, token.clone(), async move |tx| {
let response = node
.status_batch(ids)
.await
.map(|statuses| ConfirmationResponse::Batch { statuses });
let _ = tx.send(ConfirmationResult {
state_epoch: epoch,
node_id,
response,
});
});
}
WorkerPlan::SpawnRecover { node_id, id, epoch } => {
let Some(node) = self.servers.get(&node_id).cloned() else {
continue;
};
spawn_task(&mut self.confirmations, token.clone(), async move |tx| {
let response = node
.status(id.clone())
.await
.map(|status| ConfirmationResponse::Single { id, status });
let _ = tx.send(ConfirmationResult {
node_id,
response,
state_epoch: epoch,
});
});
}
}
}
}
fn apply_mutations(&mut self, mutations: WorkerMutations<S>) -> Result<bool> {
for mutation in mutations {
debug!(mutation = %mutation.summary(), "worker mutation");
match mutation {
WorkerMutation::EnqueueSigned { mut tx } => {
let Some(request) = self.transactions.pop_pending() else {
return Err(Error::UnexpectedResponseType(
"missing pending request".to_string(),
));
};
tx.callbacks.submitted = Some(request.submit_tx);
tx.callbacks.confirmed = Some(request.confirm_tx);
self.enqueue_signed(tx, request.sign_tx)?;
}
WorkerMutation::MarkSubmitted { sequence, id } => {
self.mark_submitted(sequence, id);
}
WorkerMutation::Confirm { seq, info } => {
self.confirm_tx(seq, info);
}
WorkerMutation::WorkerStop => {
let fatal = self.nodes.tail_stop_error();
self.finalize_remaining(fatal);
return Ok(true);
}
}
}
if let Some(limit) = self.nodes.min_confirmed_non_stopped() {
let before = self.transactions.confirmed_seq();
debug!(
before = ?before,
limit,
max_seq = ?self.transactions.max_seq(),
"clear_confirmed_up_to candidate"
);
self.clear_confirmed_up_to(limit);
let after = self.transactions.confirmed_seq();
debug!(before = ?before, after = ?after, limit, "clear_confirmed_up_to result");
}
Ok(false)
}
/// Occupies the signing slot and returns the sender the signing task must reply on.
///
/// The stored future resolves to `None` if the sender is dropped without sending.
fn push_signing(&mut self) -> oneshot::Sender<SigningResultFor<S>> {
    let (reply, outcome) = oneshot::channel();
    let in_flight = async move { outcome.await.ok() };
    self.signing = Some(in_flight.boxed());
    reply
}
fn enqueue_signed(
&mut self,
tx: Transaction<S::TxId, S::ConfirmInfo, S::ConfirmResponse, S::SubmitError>,
signed: oneshot::Sender<Result<()>>,
) -> Result<()> {
let exp_seq = self.transactions.next_sequence();
let tx_sequence = tx.sequence;
if self.transactions.add_transaction(tx).is_err() {
let msg = format!("tx sequence gap: expected {}, got {}", exp_seq, tx_sequence);
let _ = signed.send(Err(crate::Error::UnexpectedResponseType(msg.clone())));
return Err(crate::Error::UnexpectedResponseType(msg));
}
let _ = signed.send(Ok(()));
Ok(())
}
/// Reports a successful submission to the requester and records the id in the buffer.
///
/// The buffer update runs whether or not a transaction was found at `sequence`;
/// its result is ignored.
fn mark_submitted(&mut self, sequence: u64, id: S::TxId) {
    if let Some(entry) = self.transactions.get_mut(sequence) {
        let reported = id.clone();
        entry.notify_submitted(Ok(reported));
    }
    let _ = self.transactions.set_submitted_id(sequence, id);
}
/// Reports confirmation `info` to the requester of the transaction at `seq`, if it exists.
fn confirm_tx(&mut self, seq: u64, info: S::ConfirmInfo) {
    if let Some(entry) = self.transactions.get_mut(seq) {
        entry.notify_confirmed(Ok(info));
    }
}
fn finalize_remaining(&mut self, fatal: Option<StopErrorFor<S>>) {
for pending in self.transactions.drain_pending() {
let _ = pending.sign_tx.send(Err(Error::TxWorkerStopped));
let _ = pending.submit_tx.send(Err(Error::TxWorkerStopped));
let _ = pending.confirm_tx.send(Err(StopError::WorkerStopped));
}
loop {
let next = self
.transactions
.confirmed_seq()
.map(|seq| seq.saturating_add(1))
.unwrap_or(0);
let Ok(mut tx) = self.transactions.confirm(next) else {
break;
};
let status = fatal.clone().unwrap_or(StopError::WorkerStopped);
if tx.id.is_none() {
tx.notify_submitted(Err(Error::TxWorkerStopped));
}
tx.notify_confirmed(Err(status));
}
}
/// Confirms buffered transactions one sequence at a time, up to and including `limit`.
///
/// Stops early as soon as the buffer refuses to confirm the next sequence.
fn clear_confirmed_up_to(&mut self, limit: u64) {
    loop {
        // Next sequence after the last confirmed one, or 0 when nothing is confirmed yet.
        let next = match self.transactions.confirmed_seq() {
            Some(seq) => seq.saturating_add(1),
            None => 0,
        };
        if next > limit || self.transactions.confirm(next).is_err() {
            break;
        }
    }
}
}
fn spawn_task<T, F, Fut>(
unordered: &mut FuturesUnordered<BoxFuture<'static, Option<T>>>,
token: CancellationToken,
fut_fn: F,
) where
T: Send + 'static,
F: FnOnce(oneshot::Sender<T>) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
let (tx, rx) = oneshot::channel();
spawn_cancellable(token, fut_fn(tx));
unordered.push(async move { rx.await.ok() }.boxed());
}
/// Awaits the future inside `fut` and wraps its output in `Some`; never resolves when
/// `fut` is `None`.
async fn poll_opt<F: std::future::Future + Unpin>(fut: &mut Option<F>) -> Option<F::Output> {
    if let Some(inner) = fut.as_mut() {
        return Some(inner.await);
    }
    futures::future::pending().await
}
/// Resolves once when `shutdown` is cancelled and records that in `seen`.
///
/// When `seen` is already set the future never resolves, so the cancellation is
/// reported to the caller a single time.
async fn poll_shutdown(shutdown: &CancellationToken, seen: &mut bool) {
    if !*seen {
        shutdown.cancelled().await;
        *seen = true;
        return;
    }
    futures::future::pending::<()>().await;
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests;