use std::collections::HashMap;
use crate::errors::{PerpCityError, Result};
use crate::hft::gas::{FeeCache, GasFees, Urgency};
use crate::hft::nonce::NonceManager;
/// A caller-supplied description of a transaction, before a nonce or gas fees
/// have been assigned to it.
///
/// Handed to [`TxPipeline::prepare`], which wraps it in a [`PreparedTx`].
#[derive(Debug, Clone)]
pub struct TxRequest {
/// 20-byte recipient of the transaction.
/// NOTE(review): presumably the destination account/contract address — confirm with the signer.
pub to: [u8; 20],
/// Raw call payload carried by the transaction.
pub calldata: Vec<u8>,
/// Amount transferred with the call.
/// NOTE(review): unit is not visible here (presumably the chain's smallest denomination) — confirm.
pub value: u128,
/// Gas limit for the transaction; copied into [`PreparedTx::gas_limit`] by `prepare`.
pub gas_limit: u64,
/// Urgency level passed to the fee cache when `prepare` looks up gas fees.
pub urgency: Urgency,
}
/// A request that has been assigned a nonce and gas fees by
/// [`TxPipeline::prepare`] and is ready to be signed and submitted.
///
/// Pass it back to [`TxPipeline::record_submission`] once a transaction hash
/// is known.
#[derive(Debug, Clone)]
pub struct PreparedTx {
/// Nonce acquired from the pipeline's nonce manager.
pub nonce: u64,
/// Copy of `request.gas_limit`, taken at preparation time.
pub gas_limit: u64,
/// Fees returned by the fee cache for the request's urgency.
pub gas_fees: GasFees,
/// The original request, carried along unchanged.
pub request: TxRequest,
}
/// Bookkeeping record for a submitted transaction that has been neither
/// confirmed nor failed yet.
///
/// Created by [`TxPipeline::record_submission`] and stored keyed by `tx_hash`.
#[derive(Debug, Clone)]
pub struct InFlightTx {
/// Nonce the transaction was prepared with.
pub nonce: u64,
/// Hash supplied by the caller at submission time; also the map key.
pub tx_hash: [u8; 32],
/// The request this transaction was built from.
pub request: TxRequest,
/// Caller-supplied timestamp (milliseconds) of the submission; compared
/// against the stuck timeout in `stuck_txs`.
pub submitted_at_ms: u64,
/// Fees the transaction was prepared with; the baseline for fee bumps.
pub gas_fees: GasFees,
}
/// Parameters for re-sending an in-flight transaction with higher fees,
/// as produced by [`TxPipeline::prepare_bump`].
#[derive(Debug, Clone, Copy)]
pub struct BumpParams {
/// Nonce of the in-flight transaction being replaced (reused, not newly acquired).
pub nonce: u64,
/// Gas limit taken from the original request.
pub gas_limit: u64,
/// Original priority fee scaled by the caller's multiplier.
pub new_max_priority_fee: u64,
/// Original max fee scaled by the caller's multiplier.
pub new_max_fee: u64,
/// Hash of the transaction these parameters were derived from.
pub original_tx_hash: [u8; 32],
}
/// Tunables for [`TxPipeline`].
#[derive(Debug, Clone, Copy)]
pub struct PipelineConfig {
/// `prepare` refuses new requests once this many submissions are tracked
/// (the check is `tracked >= max_in_flight`).
pub max_in_flight: usize,
/// Age in milliseconds at or beyond which a tracked transaction is reported
/// by `stuck_txs`.
pub stuck_timeout_ms: u64,
}
impl Default for PipelineConfig {
fn default() -> Self {
Self {
max_in_flight: 16,
stuck_timeout_ms: 30_000,
}
}
}
/// Tracks transactions from preparation through confirmation or failure.
///
/// Combines a nonce manager with a map of submitted-but-unresolved
/// transactions, and enforces the limits in [`PipelineConfig`].
#[derive(Debug)]
pub struct TxPipeline {
// Source of nonces; also told about submissions, confirmations and failures.
nonce_mgr: NonceManager,
// Limits applied by `prepare` and `stuck_txs`.
config: PipelineConfig,
// Submitted transactions awaiting `confirm` or `fail`, keyed by tx hash.
in_flight: HashMap<[u8; 32], InFlightTx>,
}
impl TxPipeline {
pub fn new(starting_nonce: u64, config: PipelineConfig) -> Self {
Self {
nonce_mgr: NonceManager::new(starting_nonce),
config,
in_flight: HashMap::new(),
}
}
#[inline]
pub fn prepare(
&self,
request: TxRequest,
fee_cache: &FeeCache,
now_ms: u64,
) -> Result<PreparedTx> {
if self.in_flight.len() >= self.config.max_in_flight {
tracing::warn!(
count = self.in_flight.len(),
max = self.config.max_in_flight,
"too many in-flight transactions"
);
return Err(PerpCityError::TooManyInFlight {
count: self.in_flight.len(),
max: self.config.max_in_flight,
});
}
let gas_fees = fee_cache.fees_for(request.urgency, now_ms).ok_or_else(|| {
tracing::warn!("gas cache stale or empty");
PerpCityError::GasPriceUnavailable {
reason: "gas cache stale or empty".into(),
}
})?;
let nonce = self.nonce_mgr.acquire();
tracing::trace!(nonce, ?request.urgency, in_flight = self.in_flight.len(), "tx prepared");
Ok(PreparedTx {
nonce,
gas_limit: request.gas_limit,
gas_fees,
request,
})
}
pub fn record_submission(&mut self, tx_hash: [u8; 32], prepared: PreparedTx, now_ms: u64) {
tracing::debug!(nonce = prepared.nonce, "tx submission recorded");
self.nonce_mgr.track(prepared.nonce, tx_hash, now_ms);
self.in_flight.insert(
tx_hash,
InFlightTx {
nonce: prepared.nonce,
tx_hash,
request: prepared.request,
submitted_at_ms: now_ms,
gas_fees: prepared.gas_fees,
},
);
}
pub fn confirm(&mut self, tx_hash: &[u8; 32]) {
if let Some(tx) = self.in_flight.remove(tx_hash) {
tracing::debug!(nonce = tx.nonce, "tx confirmed in pipeline");
self.nonce_mgr.confirm(tx.nonce);
}
}
pub fn fail(&mut self, tx_hash: &[u8; 32]) {
if let Some(tx) = self.in_flight.remove(tx_hash) {
tracing::debug!(nonce = tx.nonce, "tx failed in pipeline");
self.nonce_mgr.release(tx.nonce);
}
}
pub fn stuck_txs(&self, now_ms: u64) -> Vec<[u8; 32]> {
self.in_flight
.values()
.filter(|tx| now_ms.saturating_sub(tx.submitted_at_ms) >= self.config.stuck_timeout_ms)
.map(|tx| tx.tx_hash)
.collect()
}
pub fn prepare_bump(&self, tx_hash: &[u8; 32], multiplier: u64) -> Option<BumpParams> {
let tx = self.in_flight.get(tx_hash)?;
Some(BumpParams {
nonce: tx.nonce,
gas_limit: tx.request.gas_limit,
new_max_priority_fee: tx
.gas_fees
.max_priority_fee_per_gas
.saturating_mul(multiplier),
new_max_fee: tx.gas_fees.max_fee_per_gas.saturating_mul(multiplier),
original_tx_hash: *tx_hash,
})
}
pub fn in_flight_count(&self) -> usize {
self.in_flight.len()
}
pub fn nonce_manager(&self) -> &NonceManager {
&self.nonce_mgr
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hft::gas::GasLimits;

    const BASE_FEE: u64 = 50_000_000;
    const TIP: u64 = 1_000_000_000;

    /// A fee cache that has never been updated, built with the constructor
    /// arguments shared by every test in this module.
    fn empty_fee_cache() -> FeeCache {
        FeeCache::new(5000, TIP)
    }

    /// A fee cache updated once with `BASE_FEE` at `now_ms`.
    fn test_fee_cache(now_ms: u64) -> FeeCache {
        let mut cache = empty_fee_cache();
        cache.update(BASE_FEE, now_ms);
        cache
    }

    /// A small normal-urgency request used as the default test payload.
    fn test_request() -> TxRequest {
        TxRequest {
            to: [0xAA; 20],
            calldata: vec![0x01, 0x02, 0x03],
            value: 0,
            gas_limit: GasLimits::OPEN_TAKER,
            urgency: Urgency::Normal,
        }
    }

    /// A pipeline with the default configuration.
    fn default_pipeline(starting_nonce: u64) -> TxPipeline {
        TxPipeline::new(starting_nonce, PipelineConfig::default())
    }

    /// A pipeline with explicit limits.
    fn pipeline_with(starting_nonce: u64, max_in_flight: usize, stuck_timeout_ms: u64) -> TxPipeline {
        TxPipeline::new(
            starting_nonce,
            PipelineConfig {
                max_in_flight,
                stuck_timeout_ms,
            },
        )
    }

    #[test]
    fn prepare_assigns_nonce_and_gas() {
        let pipeline = default_pipeline(10);
        let cache = test_fee_cache(0);

        let first = pipeline.prepare(test_request(), &cache, 0).unwrap();
        assert_eq!(first.nonce, 10);
        assert_eq!(first.gas_limit, GasLimits::OPEN_TAKER);
        assert_eq!(first.gas_fees.base_fee, BASE_FEE);

        let second = pipeline.prepare(test_request(), &cache, 0).unwrap();
        assert_eq!(second.nonce, 11);
    }

    #[test]
    fn prepare_fails_on_stale_gas() {
        let pipeline = default_pipeline(0);
        let cache = test_fee_cache(0);

        let outcome = pipeline.prepare(test_request(), &cache, 6000);
        assert!(matches!(
            outcome,
            Err(PerpCityError::GasPriceUnavailable { .. })
        ));
    }

    #[test]
    fn prepare_fails_on_empty_gas() {
        let pipeline = default_pipeline(0);
        let cache = empty_fee_cache();

        let outcome = pipeline.prepare(test_request(), &cache, 0);
        assert!(matches!(
            outcome,
            Err(PerpCityError::GasPriceUnavailable { .. })
        ));
    }

    #[test]
    fn in_flight_limit_enforced() {
        let mut pipeline = pipeline_with(0, 2, 30_000);
        let cache = test_fee_cache(0);

        for first_byte in 0..2u8 {
            let prepared = pipeline.prepare(test_request(), &cache, 0).unwrap();
            let mut hash = [0u8; 32];
            hash[0] = first_byte;
            pipeline.record_submission(hash, prepared, 0);
        }
        assert_eq!(pipeline.in_flight_count(), 2);

        let outcome = pipeline.prepare(test_request(), &cache, 0);
        assert!(matches!(
            outcome,
            Err(PerpCityError::TooManyInFlight { count: 2, max: 2 })
        ));
    }

    #[test]
    fn confirm_removes_from_tracking() {
        let mut pipeline = default_pipeline(0);
        let cache = test_fee_cache(0);
        let hash = [0xAA; 32];

        let prepared = pipeline.prepare(test_request(), &cache, 0).unwrap();
        pipeline.record_submission(hash, prepared, 0);
        assert_eq!(pipeline.in_flight_count(), 1);

        pipeline.confirm(&hash);
        assert_eq!(pipeline.in_flight_count(), 0);
    }

    #[test]
    fn fail_removes_and_releases_nonce() {
        let mut pipeline = default_pipeline(0);
        let cache = test_fee_cache(0);
        let hash = [0xBB; 32];

        let prepared = pipeline.prepare(test_request(), &cache, 0).unwrap();
        assert_eq!(prepared.nonce, 0);
        pipeline.record_submission(hash, prepared, 0);

        pipeline.fail(&hash);
        assert_eq!(pipeline.in_flight_count(), 0);
    }

    #[test]
    fn stuck_txs_detection() {
        let mut pipeline = pipeline_with(0, 16, 10_000);

        let early_cache = test_fee_cache(0);
        let early = pipeline.prepare(test_request(), &early_cache, 0).unwrap();
        pipeline.record_submission([0x01; 32], early, 0);

        let late_cache = test_fee_cache(5000);
        let late = pipeline.prepare(test_request(), &late_cache, 5000).unwrap();
        pipeline.record_submission([0x02; 32], late, 5000);

        // At t = 10 000 only the first submission has reached the timeout.
        let stuck = pipeline.stuck_txs(10_000);
        assert_eq!(stuck.len(), 1);
        assert_eq!(stuck[0], [0x01; 32]);

        // At t = 15 000 both have.
        let stuck = pipeline.stuck_txs(15_000);
        assert_eq!(stuck.len(), 2);
    }

    #[test]
    fn prepare_bump_scales_fees() {
        let mut pipeline = default_pipeline(0);
        let cache = test_fee_cache(0);
        let hash = [0xCC; 32];

        let prepared = pipeline.prepare(test_request(), &cache, 0).unwrap();
        let original_priority = prepared.gas_fees.max_priority_fee_per_gas;
        let original_max = prepared.gas_fees.max_fee_per_gas;
        pipeline.record_submission(hash, prepared, 0);

        let bump = pipeline.prepare_bump(&hash, 2).unwrap();
        assert_eq!(bump.new_max_priority_fee, original_priority * 2);
        assert_eq!(bump.new_max_fee, original_max * 2);
        assert_eq!(bump.original_tx_hash, hash);
    }

    #[test]
    fn prepare_bump_unknown_tx_returns_none() {
        let pipeline = default_pipeline(0);
        assert!(pipeline.prepare_bump(&[0xFF; 32], 2).is_none());
    }

    #[test]
    fn confirm_unknown_tx_is_noop() {
        let mut pipeline = default_pipeline(0);
        pipeline.confirm(&[0xFF; 32]);
        assert_eq!(pipeline.in_flight_count(), 0);
    }

    #[test]
    fn urgency_propagates_through_prepare() {
        let pipeline = default_pipeline(0);
        let cache = test_fee_cache(0);

        let mut request = test_request();
        request.urgency = Urgency::Critical;

        let prepared = pipeline.prepare(request, &cache, 0).unwrap();
        assert_eq!(prepared.gas_fees.max_fee_per_gas, 4 * BASE_FEE + 5 * TIP);
    }

    #[test]
    fn full_lifecycle() {
        let mut pipeline = pipeline_with(100, 4, 30_000);
        let cache = test_fee_cache(0);

        let first = pipeline.prepare(test_request(), &cache, 0).unwrap();
        assert_eq!(first.nonce, 100);
        pipeline.record_submission([0x01; 32], first, 0);

        let second = pipeline.prepare(test_request(), &cache, 100).unwrap();
        assert_eq!(second.nonce, 101);
        pipeline.record_submission([0x02; 32], second, 100);
        assert_eq!(pipeline.in_flight_count(), 2);

        pipeline.confirm(&[0x01; 32]);
        assert_eq!(pipeline.in_flight_count(), 1);

        pipeline.fail(&[0x02; 32]);
        assert_eq!(pipeline.in_flight_count(), 0);
    }
}