use super::*;
pub(in crate::tui::runtime) fn spawn_wallet_wss(
pubkey_bytes: [u8; 32],
ws_url: String,
usdc_tx: UnboundedSender<f64>,
sol_tx: UnboundedSender<f64>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
let token_program_id =
solana_pubkey::Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
.expect("valid token program id");
let ata_program_id =
solana_pubkey::Pubkey::from_str("ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL")
.expect("valid ata program id");
let usdc_mint =
solana_pubkey::Pubkey::from_str("EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v")
.expect("valid usdc mint id");
let wallet_pk = solana_pubkey::Pubkey::from(pubkey_bytes);
let (ata, _) = solana_pubkey::Pubkey::find_program_address(
&[
pubkey_bytes.as_ref(),
token_program_id.as_ref(),
usdc_mint.as_ref(),
],
&ata_program_id,
);
let account_cfg = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::processed()),
..Default::default()
};
{
let rpc_client = RpcClient::new_with_commitment(
rpc_http_url_from_env(),
CommitmentConfig::processed(),
);
if let Ok(res) = rpc_client.get_token_account_balance(&ata).await {
let _ = usdc_tx.send(res.ui_amount.unwrap_or(0.0));
}
if let Ok(lamports) = rpc_client.get_balance(&wallet_pk).await {
let _ = sol_tx.send(lamports as f64 / 1_000_000_000.0);
}
}
let mut backoff = WSS_RETRY_INIT;
loop {
let pubsub = match PubsubClient::new(&ws_url).await {
Ok(c) => c,
Err(_) => {
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(WSS_RETRY_CAP);
continue;
}
};
let (mut usdc_stream, usdc_unsub) = match pubsub
.account_subscribe(&ata, Some(account_cfg.clone()))
.await
{
Ok(s) => s,
Err(_) => {
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(WSS_RETRY_CAP);
continue;
}
};
let (mut sol_stream, sol_unsub) = match pubsub
.account_subscribe(&wallet_pk, Some(account_cfg.clone()))
.await
{
Ok(s) => s,
Err(_) => {
usdc_unsub().await;
tokio::time::sleep(backoff).await;
backoff = (backoff * 2).min(WSS_RETRY_CAP);
continue;
}
};
backoff = WSS_RETRY_INIT;
loop {
tokio::select! {
msg = usdc_stream.next() => {
let Some(resp) = msg else { break };
if let Some(data) = resp.value.data.decode() {
if data.len() >= 72 {
let raw = u64::from_le_bytes(
data[64..72].try_into().unwrap_or_default(),
);
let _ = usdc_tx.send(raw as f64 / 1_000_000.0);
}
}
}
msg = sol_stream.next() => {
let Some(resp) = msg else { break };
let _ = sol_tx.send(resp.value.lamports as f64 / 1_000_000_000.0);
}
}
}
usdc_unsub().await;
sol_unsub().await;
tokio::time::sleep(WSS_RETRY_INIT).await;
}
})
}
/// Timeout of 1500 ms; judging by the name it bounds a single balance fetch.
/// NOTE(review): its users are not visible in this file — confirm the intent
/// with the call sites in the parent module.
pub(super) const BALANCE_FETCH_TIMEOUT: Duration = Duration::from_millis(1500);
/// Timeout of 5 s; judging by the name it bounds a "top positions" request.
/// NOTE(review): its users are not visible in this file — confirm the intent
/// with the call sites in the parent module.
pub(super) const TOP_POSITIONS_TIMEOUT: Duration = Duration::from_secs(5);