use std::sync::Arc;
use std::time::Duration;
use futures_util::StreamExt;
use solana_commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_keypair::Keypair;
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
use solana_rpc_client_types::config::{RpcSendTransactionConfig, RpcSignatureSubscribeConfig};
use solana_rpc_client_types::response::{ProcessedSignatureResult, Response, RpcSignatureResult};
use solana_signature::Signature;
use solana_signer::Signer;
use super::context::TxContext;
use crate::tui::config::current_user_config;
fn send_cfg() -> RpcSendTransactionConfig {
RpcSendTransactionConfig {
skip_preflight: current_user_config().skip_preflight,
preflight_commitment: Some(CommitmentLevel::Processed),
encoding: None,
max_retries: Some(2),
min_context_slot: None,
}
}
pub(super) const SEND_TIMEOUT: Duration = Duration::from_secs(8);
pub(super) const CONFIRM_TIMEOUT: Duration = Duration::from_secs(8);
const SIGNATURE_SUBSCRIBE_TIMEOUT: Duration = Duration::from_secs(5);
const SIGNATURE_HTTP_POLL_INTERVAL: Duration = Duration::from_millis(350);
const REBROADCAST_INTERVAL: Duration = Duration::from_millis(300);
const REBROADCAST_SEND_TIMEOUT: Duration = Duration::from_secs(1);
pub(super) async fn compile_and_sign(
ctx: &TxContext,
keypair: &Keypair,
ixs: &[solana_instruction::Instruction],
) -> Result<
(
solana_transaction::versioned::VersionedTransaction,
Signature,
),
String,
> {
use solana_message::{v0, VersionedMessage};
use solana_transaction::versioned::VersionedTransaction;
let blockhash = ctx.pop_blockhash().await?;
let message = v0::Message::try_compile(&keypair.pubkey(), ixs, &[], blockhash)
.map_err(|e| format!("{}", e))?;
let tx = VersionedTransaction::try_new(VersionedMessage::V0(message), &[keypair])
.map_err(|e| format!("{}", e))?;
let sig = tx.signatures[0];
Ok((tx, sig))
}
pub(super) enum ConfirmError {
Rejected(String),
NotConfirmed(String),
}
pub(super) async fn get_or_connect_sig_pubsub(
ctx: &TxContext,
) -> Result<Arc<PubsubClient>, String> {
let mut guard = ctx.sig_pubsub.lock().await;
if let Some(ref client) = *guard {
return Ok(Arc::clone(client));
}
let client = tokio::time::timeout(SIGNATURE_SUBSCRIBE_TIMEOUT, PubsubClient::new(&ctx.ws_url))
.await
.map_err(|_| "signature WSS connect timed out after 5s".to_string())?
.map_err(|e| format!("signature WSS connect: {}", e))?;
let client = Arc::new(client);
*guard = Some(Arc::clone(&client));
Ok(client)
}
pub(super) async fn send_and_confirm_on_stream(
ctx: &TxContext,
tx: &solana_transaction::versioned::VersionedTransaction,
sig: &Signature,
stream: &mut (impl futures_util::Stream<Item = Response<RpcSignatureResult>> + Unpin),
) -> Result<(), ConfirmError> {
if let Some(secondary) = ctx.secondary_send_rpc.as_ref() {
let secondary = Arc::clone(secondary);
let tx_clone = tx.clone();
let cfg = send_cfg();
tokio::spawn(async move {
let _ = tokio::time::timeout(
SEND_TIMEOUT,
secondary.send_transaction_with_config(&tx_clone, cfg),
)
.await;
});
}
let send_result = tokio::time::timeout(SEND_TIMEOUT, {
ctx.rpc_client.send_transaction_with_config(tx, send_cfg())
})
.await;
match send_result {
Err(_) => {
return Err(ConfirmError::Rejected(
"Transaction timed out — RPC did not respond within 8s".into(),
));
}
Ok(Err(e)) => {
return Err(ConfirmError::Rejected(format!("{:#?}", e)));
}
Ok(Ok(_)) => {}
}
let confirm = tokio::time::timeout(CONFIRM_TIMEOUT, async {
let wss = async {
while let Some(resp) = stream.next().await {
match resp.value {
RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err }) => {
return if err.is_none() {
Ok(())
} else {
Err(format!("transaction failed: {:?}", err))
};
}
RpcSignatureResult::ReceivedSignature(_) => continue,
}
}
Err("signature subscription closed before confirmation".into())
};
let http_poll = async {
let mut interval = tokio::time::interval(SIGNATURE_HTTP_POLL_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval.tick().await; loop {
interval.tick().await;
match ctx.rpc_client.get_signature_status(sig).await {
Ok(Some(Ok(()))) => return Ok(()),
Ok(Some(Err(e))) => return Err(format!("transaction failed: {:?}", e)),
_ => {} }
}
};
tokio::select! {
r = wss => r,
r = http_poll => r,
}
});
let rebroadcast = async {
let mut interval = tokio::time::interval(REBROADCAST_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval.tick().await; loop {
interval.tick().await;
let primary = tokio::time::timeout(
REBROADCAST_SEND_TIMEOUT,
ctx.rpc_client.send_transaction_with_config(tx, send_cfg()),
);
let secondary = async {
if let Some(secondary) = ctx.secondary_send_rpc.as_ref() {
let _ = tokio::time::timeout(
REBROADCAST_SEND_TIMEOUT,
secondary.send_transaction_with_config(tx, send_cfg()),
)
.await;
}
};
let _ = tokio::join!(primary, secondary);
}
#[allow(unreachable_code)]
Result::<(), String>::Ok(())
};
let result = tokio::select! {
r = confirm => r,
_ = rebroadcast => Ok(Err("rebroadcast loop ended unexpectedly".into())),
};
match result {
Ok(r) => r.map_err(ConfirmError::NotConfirmed),
Err(_) => Err(ConfirmError::NotConfirmed("confirmation timeout".into())),
}
}
pub(super) async fn subscribe_send_confirm(
ctx: &TxContext,
tx: &solana_transaction::versioned::VersionedTransaction,
sig: &Signature,
) -> Result<(), ConfirmError> {
let sub_config = RpcSignatureSubscribeConfig {
commitment: Some(CommitmentConfig::processed()),
enable_received_notification: Some(false),
};
let client = get_or_connect_sig_pubsub(ctx)
.await
.map_err(ConfirmError::Rejected)?;
if let Ok(Ok((mut stream, unsubscribe))) = tokio::time::timeout(
SIGNATURE_SUBSCRIBE_TIMEOUT,
client.signature_subscribe(sig, Some(sub_config.clone())),
)
.await
{
let result = send_and_confirm_on_stream(ctx, tx, sig, &mut stream).await;
unsubscribe().await;
return result;
}
drop(client);
{
ctx.sig_pubsub.lock().await.take();
}
let client = get_or_connect_sig_pubsub(ctx)
.await
.map_err(ConfirmError::Rejected)?;
let (mut stream, unsubscribe) = tokio::time::timeout(
SIGNATURE_SUBSCRIBE_TIMEOUT,
client.signature_subscribe(sig, Some(sub_config)),
)
.await
.map_err(|_| ConfirmError::Rejected("signature_subscribe timed out after 5s".to_string()))?
.map_err(|e| ConfirmError::Rejected(format!("signature_subscribe: {}", e)))?;
let result = send_and_confirm_on_stream(ctx, tx, sig, &mut stream).await;
unsubscribe().await;
result
}