use std::time::Duration;
use fedimint_core::TransactionId;
use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
use fedimint_core::util::backoff_util::custom_backoff;
use fedimint_core::util::retry;
use fedimint_logging::LOG_CLIENT_NET_API;
use tokio::sync::watch;
use tracing::debug;
use crate::sm::{Context, DynContext, State, StateTransition};
use crate::{DynGlobalClientContext, DynState, TxAcceptedEvent, TxRejectedEvent};
pub const TRANSACTION_SUBMISSION_MODULE_INSTANCE: ModuleInstanceId = 0xffff;
#[derive(Debug, Clone)]
pub struct TxSubmissionContext;
impl Context for TxSubmissionContext {
const KIND: Option<ModuleKind> = None;
}
impl IntoDynInstance for TxSubmissionContext {
type DynType = DynContext;
fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
DynContext::from_typed(instance_id, self)
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub struct TxSubmissionStatesSM {
pub operation_id: OperationId,
pub state: TxSubmissionStates,
}
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum TxSubmissionStates {
Created(Transaction),
Accepted(TransactionId),
Rejected(TransactionId, String),
NonRetryableError(String),
}
impl State for TxSubmissionStatesSM {
type ModuleContext = TxSubmissionContext;
fn transitions(
&self,
_context: &Self::ModuleContext,
global_context: &DynGlobalClientContext,
) -> Vec<StateTransition<Self>> {
let operation_id = self.operation_id;
let (tx_submitted_sender, tx_submitted_receiver) = watch::channel(false);
match self.state.clone() {
TxSubmissionStates::Created(transaction) => {
let txid = transaction.tx_hash();
vec![
StateTransition::new(
TxSubmissionStates::trigger_created_rejected(
transaction.clone(),
global_context.clone(),
tx_submitted_sender,
),
{
let global_context = global_context.clone();
move |sm_dbtx, error, _| {
let global_context = global_context.clone();
Box::pin(async move {
global_context
.log_event(
sm_dbtx,
TxRejectedEvent {
txid,
operation_id,
error: error.clone(),
},
)
.await;
TxSubmissionStatesSM {
state: TxSubmissionStates::Rejected(txid, error),
operation_id,
}
})
}
},
),
StateTransition::new(
TxSubmissionStates::trigger_created_accepted(
txid,
global_context.clone(),
tx_submitted_receiver,
),
{
let global_context = global_context.clone();
move |sm_dbtx, (), _| {
let global_context = global_context.clone();
Box::pin(async move {
global_context
.log_event(sm_dbtx, TxAcceptedEvent { txid, operation_id })
.await;
TxSubmissionStatesSM {
state: TxSubmissionStates::Accepted(txid),
operation_id,
}
})
}
},
),
]
}
TxSubmissionStates::Accepted(..)
| TxSubmissionStates::Rejected(..)
| TxSubmissionStates::NonRetryableError(..) => {
vec![]
}
}
}
fn operation_id(&self) -> OperationId {
self.operation_id
}
}
impl TxSubmissionStates {
async fn trigger_created_rejected(
transaction: Transaction,
context: DynGlobalClientContext,
tx_submitted: watch::Sender<bool>,
) -> String {
let txid = transaction.tx_hash();
debug!(target: LOG_CLIENT_NET_API, %txid, "Submitting transaction");
retry(
"tx-submit-sm",
custom_backoff(Duration::from_secs(2), Duration::from_mins(10), None),
|| async {
if let TransactionSubmissionOutcome(Err(transaction_error)) = context
.api()
.submit_transaction(transaction.clone())
.await
.try_into_inner(context.decoders())?
{
Ok(transaction_error.to_string())
} else {
debug!(
target: LOG_CLIENT_NET_API,
%txid,
"Transaction submission accepted by peer, awaiting consensus",
);
tx_submitted.send_replace(true);
Err(anyhow::anyhow!("Transaction is still valid"))
}
},
)
.await
.expect("Number of retries is has no limit")
}
async fn trigger_created_accepted(
txid: TransactionId,
context: DynGlobalClientContext,
mut tx_submitted: watch::Receiver<bool>,
) {
let _ = tx_submitted.wait_for(|submitted| *submitted).await;
context.api().await_transaction(txid).await;
debug!(target: LOG_CLIENT_NET_API, %txid, "Transaction accepted in consensus");
}
}
impl IntoDynInstance for TxSubmissionStatesSM {
type DynType = DynState;
fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
DynState::from_typed(instance_id, self)
}
}
pub fn tx_submission_sm_decoder() -> Decoder {
let mut decoder_builder = Decoder::builder_system();
decoder_builder.with_decodable_type::<TxSubmissionStatesSM>();
decoder_builder.build()
}