use std::time::{Duration, Instant};
use solana_pubkey::Pubkey;
use tokio::sync::oneshot;
use super::super::trading::TradingSide;
/// Lifecycle state of a [`TwapBot`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TwapStatus {
    /// The bot is active; this is the only state in which a slice can be due.
    Running,
    /// The bot has been paused and can be resumed.
    Paused,
    /// The bot was stopped. Terminal.
    Stopped,
    /// Every slice has been resolved. Terminal.
    Completed,
}

impl TwapStatus {
    /// Returns `true` for the two end states, `Stopped` and `Completed`.
    pub fn is_terminal(self) -> bool {
        match self {
            TwapStatus::Stopped | TwapStatus::Completed => true,
            TwapStatus::Running | TwapStatus::Paused => false,
        }
    }
}
/// Result of one dispatched slice, delivered through the `oneshot` channel
/// held by [`InFlightSlice`].
#[derive(Debug)]
pub enum SliceOutcome {
/// The slice was confirmed.
Confirmed,
/// The slice failed. The payload is a free-form message (presumably a
/// human-readable reason; confirm against the producer of this value).
Failed(String),
/// The result of the slice could not be determined. The payload is a
/// free-form message (presumably a human-readable reason).
Unknown(String),
}
/// Bookkeeping for the single slice a [`TwapBot`] currently has outstanding.
pub struct InFlightSlice {
/// Receiving end of the channel on which the slice's [`SliceOutcome`] arrives.
pub rx: oneshot::Receiver<SliceOutcome>,
/// Slice number supplied when the slice was dispatched.
pub slice_number: u32,
/// Handle of the spawned task; used to abort the task.
pub task: tokio::task::JoinHandle<()>,
/// Dispatch timestamp. Not read anywhere in this file, hence the lint
/// allowance (presumably kept for diagnostics; confirm before removing).
#[allow(dead_code)]
pub started_at: Instant,
}
impl InFlightSlice {
/// Requests cancellation of the slice's task by calling `abort` on its
/// `JoinHandle`. Does not wait for the task to finish.
fn abort(&self) {
self.task.abort();
}
}
/// An action awaiting confirmation; stored in `TwapsView::pending_confirm`.
///
/// NOTE(review): the `usize` payload is presumably the index of the targeted
/// bot in `TwapsView::bots`. Nothing in this file reads it, so confirm against
/// the caller.
#[derive(Debug, Clone, Copy)]
pub enum TwapBotConfirm {
/// Pending stop request (presumably for the bot at the given index).
Stop(usize),
/// Pending restart request (presumably for the bot at the given index).
Restart(usize),
/// Pending removal request (presumably for the bot at the given index).
Remove(usize),
}
/// State of one TWAP run: an order of `total_size` split into `slice_count`
/// slices that are dispatched one at a time, at least `slice_interval` apart.
pub struct TwapBot {
/// Symbol passed to `new`.
pub symbol: String,
/// Trading side passed to `new`.
pub side: TradingSide,
/// Total size passed to `new`.
pub total_size: f64,
/// Number of slices the run is divided into.
pub slice_count: u32,
/// Minimum time between two dispatches, as checked by `slice_due`.
pub slice_interval: Duration,
/// `total_size / slice_count` (`0.0` when `slice_count` is zero).
pub slice_size: f64,
/// Number of slices recorded through `record_slice_confirmed`.
pub slices_submitted: u32,
/// Number of slices recorded through `record_slice_failed`.
pub slices_failed: u32,
/// Number of slices recorded through `record_slice_unconfirmed`.
pub slices_unconfirmed: u32,
/// Current lifecycle state.
pub status: TwapStatus,
/// Set when the bot is created and again on `restart`.
pub started_at: Instant,
/// Reference time for the next-slice check. Set on dispatch, optionally
/// advanced by `touch_last_slice_at`, and shifted forward by `resume`.
/// `None` until the first dispatch (and again after `restart`).
pub last_slice_at: Option<Instant>,
/// Status line supplied with the most recent `record_slice_*` call.
pub last_status: String,
/// Reason set through `set_defer_reason`; cleared whenever a slice is
/// dispatched or recorded, and on `stop`/`restart`.
pub defer_reason: Option<String>,
/// Time of the last `pause`; cleared by `resume`, `stop` and `restart`.
pub paused_at: Option<Instant>,
/// The slice that has been dispatched but not yet resolved, if any.
pub in_flight: Option<InFlightSlice>,
/// Public key passed to `new`; not otherwise used in this file.
pub authority: Pubkey,
}
impl TwapBot {
/// Creates a bot in the `Running` state with all counters at zero.
///
/// * `slice_size` is `total_size / slice_count`, or `0.0` for zero slices.
/// * `slice_interval` is `duration_secs / slice_count` using integer division
///   (the remainder is discarded), raised to at least one second. With zero or
///   one slice the interval is `Duration::ZERO`.
pub fn new(
    symbol: String,
    side: TradingSide,
    total_size: f64,
    slice_count: u32,
    duration_secs: u64,
    authority: Pubkey,
) -> Self {
    let slice_size = match slice_count {
        0 => 0.0,
        n => total_size / f64::from(n),
    };
    let slice_interval = match slice_count {
        0 | 1 => Duration::ZERO,
        n => {
            let whole_secs = duration_secs / u64::from(n);
            Duration::from_secs(whole_secs.max(1))
        }
    };
    Self {
        symbol,
        side,
        total_size,
        slice_count,
        slice_interval,
        slice_size,
        slices_submitted: 0,
        slices_failed: 0,
        slices_unconfirmed: 0,
        status: TwapStatus::Running,
        started_at: Instant::now(),
        last_slice_at: None,
        last_status: String::new(),
        defer_reason: None,
        paused_at: None,
        in_flight: None,
        authority,
    }
}
/// Returns `true` only while the bot is in the `Running` state.
pub fn is_active(&self) -> bool {
    self.status == TwapStatus::Running
}
/// Number of slices that have reached any recorded result: confirmed, failed
/// or unconfirmed.
fn slices_resolved(&self) -> u32 {
    let confirmed = self.slices_submitted;
    let failed = self.slices_failed;
    let unconfirmed = self.slices_unconfirmed;
    confirmed + failed + unconfirmed
}
/// Reports whether the next slice should be dispatched at `now`.
///
/// Returns `false` when the bot is not `Running`, when a slice is still in
/// flight, or when every slice has already been resolved. Otherwise the first
/// slice (no dispatch recorded yet) is due immediately, and later slices are
/// due once `slice_interval` has elapsed since `last_slice_at`.
///
/// `last_slice_at` can be later than a caller-held `now`, because `resume`
/// stamps it from its own clock read. The elapsed time is therefore computed
/// with `saturating_duration_since`, which yields zero in that case.
/// `Instant::duration_since` is documented as free to panic there in future
/// Rust versions.
pub fn slice_due(&self, now: Instant) -> bool {
    if !self.is_active() || self.in_flight.is_some() {
        return false;
    }
    if self.slices_resolved() >= self.slice_count {
        return false;
    }
    match self.last_slice_at {
        None => true,
        Some(prev) => now.saturating_duration_since(prev) >= self.slice_interval,
    }
}
/// Moves `last_slice_at` to `now`, but only if a dispatch has already been
/// recorded. While `last_slice_at` is `None` it stays `None`, so the first
/// slice remains due immediately.
pub fn touch_last_slice_at(&mut self, now: Instant) {
    if let Some(last) = self.last_slice_at.as_mut() {
        *last = now;
    }
}
/// Stores `reason` in `defer_reason`, replacing any previous value.
pub fn set_defer_reason(&mut self, reason: impl Into<String>) {
self.defer_reason = Some(reason.into());
}
/// Resets `defer_reason` to `None`.
pub fn clear_defer_reason(&mut self) {
self.defer_reason = None;
}
/// Flips a `Running` bot to `Completed` once every slice has been resolved.
///
/// Returns `true` only when this call performed the transition. A bot in any
/// other state (for example `Paused`) is left untouched.
fn maybe_complete(&mut self) -> bool {
    if self.status != TwapStatus::Running {
        return false;
    }
    if self.slices_resolved() < self.slice_count {
        return false;
    }
    self.status = TwapStatus::Completed;
    true
}
/// Counts one confirmed slice, stores `status_line` as the latest status and
/// clears any defer reason.
///
/// `_now` is currently unused. Returns `true` when this slice moved the bot to
/// `Completed`.
#[must_use = "completion transition is the scheduler's signal to emit a status line"]
pub fn record_slice_confirmed(
    &mut self,
    _now: Instant,
    status_line: impl Into<String>,
) -> bool {
    self.slices_submitted += 1;
    self.last_status = status_line.into();
    self.clear_defer_reason();
    let newly_completed = self.maybe_complete();
    newly_completed
}
/// Counts one failed slice, stores `status_line` as the latest status and
/// clears any defer reason.
///
/// `_now` is currently unused. Returns `true` when this slice moved the bot to
/// `Completed`.
#[must_use = "completion transition is the scheduler's signal to emit a status line"]
pub fn record_slice_failed(&mut self, _now: Instant, status_line: impl Into<String>) -> bool {
    self.slices_failed += 1;
    self.last_status = status_line.into();
    self.clear_defer_reason();
    let newly_completed = self.maybe_complete();
    newly_completed
}
/// Counts one slice whose result is unknown, stores `status_line` as the
/// latest status and clears any defer reason.
///
/// `_now` is currently unused. Returns `true` when this slice moved the bot to
/// `Completed`.
#[must_use = "completion transition is the scheduler's signal to emit a status line"]
pub fn record_slice_unconfirmed(
    &mut self,
    _now: Instant,
    status_line: impl Into<String>,
) -> bool {
    self.slices_unconfirmed += 1;
    self.last_status = status_line.into();
    self.clear_defer_reason();
    let newly_completed = self.maybe_complete();
    newly_completed
}
/// Registers a freshly dispatched slice as the in-flight slice.
///
/// Stores the outcome receiver and task handle, stamps `last_slice_at` with
/// `now`, and clears any defer reason. A previously stored in-flight entry is
/// replaced without `abort` being called on it.
pub fn record_slice_dispatched(
    &mut self,
    now: Instant,
    slice_number: u32,
    rx: oneshot::Receiver<SliceOutcome>,
    task: tokio::task::JoinHandle<()>,
) {
    let dispatched = InFlightSlice {
        rx,
        slice_number,
        task,
        started_at: now,
    };
    self.in_flight = Some(dispatched);
    self.last_slice_at = Some(now);
    self.clear_defer_reason();
}
/// Polls the in-flight slice without blocking.
///
/// Returns `None` when no slice is in flight or its result has not arrived
/// yet. Once a result is available the in-flight entry is cleared and
/// `(slice_number, outcome)` is returned.
///
/// If the sending half was dropped without sending a value, the task ended
/// without reporting and the real result is not known. This is reported as
/// `SliceOutcome::Unknown` rather than `Failed`, matching `stop`, which also
/// records an interrupted slice as unconfirmed.
pub fn try_take_outcome(&mut self) -> Option<(u32, SliceOutcome)> {
    use tokio::sync::oneshot::error::TryRecvError;
    let slot = self.in_flight.as_mut()?;
    let outcome = match slot.rx.try_recv() {
        Ok(outcome) => outcome,
        // Still running: keep the entry and try again later.
        Err(TryRecvError::Empty) => return None,
        // Sender dropped without a value: the result is unknown, not failed.
        Err(TryRecvError::Closed) => SliceOutcome::Unknown("dropped".to_string()),
    };
    let slice_number = slot.slice_number;
    self.in_flight = None;
    Some((slice_number, outcome))
}
/// Pauses a `Running` bot and remembers when the pause began.
///
/// Has no effect in any other state. An in-flight slice is left untouched.
pub fn pause(&mut self) {
    if self.status != TwapStatus::Running {
        return;
    }
    self.status = TwapStatus::Paused;
    self.paused_at = Some(Instant::now());
}
/// Resumes a `Paused` bot; does nothing (and returns `false`) in any other
/// state.
///
/// `last_slice_at` is pushed forward by the time spent paused, capped at the
/// current instant, so the pause does not count toward the slice interval.
/// Returns `true` when the bot turned out to be complete on resume, because
/// the last slice was recorded while paused.
#[must_use = "completion transition on resume is the caller's signal to emit a status line"]
pub fn resume(&mut self) -> bool {
    if self.status != TwapStatus::Paused {
        return false;
    }
    let now = Instant::now();
    if let Some(paused_at) = self.paused_at {
        if let Some(last_at) = self.last_slice_at {
            let paused_for = now.saturating_duration_since(paused_at);
            // Fall back to `now` if the shifted instant is not representable.
            let shifted = match last_at.checked_add(paused_for) {
                Some(instant) => instant,
                None => now,
            };
            self.last_slice_at = Some(shifted.min(now));
        }
    }
    self.paused_at = None;
    self.status = TwapStatus::Running;
    self.maybe_complete()
}
/// Stops the bot.
///
/// An in-flight slice is aborted and counted as unconfirmed, since it may
/// already have landed. The status always ends up `Stopped`, even if recording
/// that slice just flipped the bot to `Completed`, or it was `Completed`
/// beforehand. Pause and defer bookkeeping is cleared.
pub fn stop(&mut self) {
    if let Some(interrupted) = self.in_flight.take() {
        interrupted.abort();
        let now = Instant::now();
        let status_line = format!(
            "slice {}/{}: interrupted (may have landed)",
            interrupted.slice_number, self.slice_count
        );
        // The completion flag is irrelevant here: the status is overwritten below.
        let _ = self.record_slice_unconfirmed(now, status_line);
    }
    self.status = TwapStatus::Stopped;
    self.paused_at = None;
    self.defer_reason = None;
}
/// Resets the bot to a fresh `Running` state with zeroed counters.
///
/// If a slice is still in flight, the bot is stopped instead (see `stop`) and
/// nothing is reset.
pub fn restart(&mut self) {
    if self.in_flight.is_some() {
        self.stop();
        return;
    }
    self.status = TwapStatus::Running;
    self.started_at = Instant::now();
    self.last_slice_at = None;
    self.paused_at = None;
    self.defer_reason = None;
    self.last_status.clear();
    self.slices_submitted = 0;
    self.slices_failed = 0;
    self.slices_unconfirmed = 0;
}
}
/// Aborts the task of a still in-flight slice when the bot is dropped.
impl Drop for TwapBot {
fn drop(&mut self) {
// Take the entry out so that `abort` is called before the handle is dropped.
if let Some(in_flight) = self.in_flight.take() {
in_flight.abort();
}
}
}
/// List of TWAP bots together with a selection cursor.
pub struct TwapsView {
/// The bots, in insertion order.
pub bots: Vec<TwapBot>,
/// Index of the selected bot in `bots` (`0` when the list is empty).
pub selected_index: usize,
/// Action awaiting confirmation, if any. Not modified by any method in this
/// file.
pub pending_confirm: Option<TwapBotConfirm>,
}
impl TwapsView {
    /// Creates an empty view with the cursor at index 0 and nothing pending.
    pub fn new() -> Self {
        Self {
            bots: Vec::new(),
            selected_index: 0,
            pending_confirm: None,
        }
    }

    /// Appends `bot` to the end of the list. The cursor is not moved.
    pub fn push(&mut self, bot: TwapBot) {
        self.bots.push(bot);
    }

    /// Moves the cursor one entry up; stays put at the top.
    pub fn move_up(&mut self) {
        self.selected_index = self.selected_index.saturating_sub(1);
    }

    /// Moves the cursor one entry down; stays put on the last entry.
    pub fn move_down(&mut self) {
        let next = self.selected_index + 1;
        if next < self.bots.len() {
            self.selected_index = next;
        }
    }

    /// Pulls the cursor back into range: at most the last index, or 0 when
    /// the list is empty.
    pub fn clamp_index(&mut self) {
        let last = self.bots.len().saturating_sub(1);
        self.selected_index = self.selected_index.min(last);
    }

    /// Mutable access to the selected bot, or `None` if the cursor is out of
    /// range.
    pub fn selected_mut(&mut self) -> Option<&mut TwapBot> {
        let idx = self.selected_index;
        self.bots.get_mut(idx)
    }

    /// Removes and returns the selected bot, then re-clamps the cursor.
    /// Returns `None` if the cursor is out of range.
    pub fn remove_selected(&mut self) -> Option<TwapBot> {
        let idx = self.selected_index;
        if idx >= self.bots.len() {
            return None;
        }
        let removed = self.bots.remove(idx);
        self.clamp_index();
        Some(removed)
    }
}
/// The default view is the empty view produced by `TwapsView::new`.
impl Default for TwapsView {
fn default() -> Self {
Self::new()
}
}
// Unit tests for the `TwapBot` state machine and the `TwapsView` cursor.
#[cfg(test)]
mod tests {
use super::*;
// Fresh unique public key for constructing test bots.
fn auth() -> Pubkey {
Pubkey::new_unique()
}
// Standard fixture: size 1.0 in 4 slices over 40 seconds (10-second interval).
fn make_bot() -> TwapBot {
TwapBot::new("SOL".to_string(), TradingSide::Long, 1.0, 4, 40, auth())
}
// Spawns an empty task on a newly built current-thread runtime and returns
// its handle. The runtime is leaked with `Box::leak`, so it is never dropped.
fn noop_task() -> tokio::task::JoinHandle<()> {
let rt = Box::leak(Box::new(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap(),
));
let _guard = rt.enter();
rt.spawn(async {})
}
// 1.0 / 4 slices = 0.25 per slice; 40 s / 4 slices = 10 s interval.
#[test]
fn slice_size_divides_total_evenly() {
let bot = make_bot();
assert_eq!(bot.slice_size, 0.25);
assert_eq!(bot.slice_interval, Duration::from_secs(10));
}
// 5 s / 10 slices truncates to 0 and must be raised to at least 1 s.
#[test]
fn slice_interval_clamps_to_at_least_one_second() {
let bot = TwapBot::new("SOL".into(), TradingSide::Long, 1.0, 10, 5, auth());
assert!(bot.slice_interval >= Duration::from_secs(1));
}
// With no dispatch recorded, the first slice is due right away.
#[test]
fn first_slice_due_immediately() {
let bot = make_bot();
assert!(bot.slice_due(Instant::now()));
}
// While a slice is in flight nothing is due, even past the interval. Once it
// is confirmed, the interval is measured from the dispatch time (t0).
#[test]
fn after_dispatch_not_due_until_interval_passes() {
let mut bot = make_bot();
let t0 = Instant::now();
let (tx, rx) = oneshot::channel();
bot.record_slice_dispatched(t0, 1, rx, noop_task());
assert!(!bot.slice_due(t0 + Duration::from_secs(20)));
tx.send(SliceOutcome::Confirmed).unwrap();
let outcome = bot.try_take_outcome();
assert!(outcome.is_some());
let completed = bot.record_slice_confirmed(t0 + Duration::from_millis(100), "slice 1/4");
assert!(!completed);
assert!(!bot.slice_due(t0 + Duration::from_secs(9)));
assert!(bot.slice_due(t0 + Duration::from_secs(10)));
}
// The completion flag is returned only by the call that records the last slice.
#[test]
fn completes_after_last_slice_confirmed() {
let mut bot = TwapBot::new("SOL".into(), TradingSide::Long, 1.0, 2, 2, auth());
let t0 = Instant::now();
let completed_first = bot.record_slice_confirmed(t0, "1/2");
assert!(!completed_first);
assert_eq!(bot.status, TwapStatus::Running);
let completed_last = bot.record_slice_confirmed(t0 + Duration::from_secs(1), "2/2");
assert!(completed_last);
assert_eq!(bot.status, TwapStatus::Completed);
assert_eq!(bot.slices_submitted, bot.slice_count);
}
// Failed slices count as resolved, so the bot completes after 1 confirmed + 2 failed.
#[test]
fn failures_count_toward_completion() {
let mut bot = TwapBot::new("SOL".into(), TradingSide::Long, 1.0, 3, 3, auth());
let t0 = Instant::now();
let c1 = bot.record_slice_confirmed(t0, "1/3");
let c2 = bot.record_slice_failed(t0 + Duration::from_secs(1), "2/3 failed");
let c3 = bot.record_slice_failed(t0 + Duration::from_secs(2), "3/3 failed");
assert!(!c1 && !c2 && c3);
assert_eq!(bot.status, TwapStatus::Completed);
assert_eq!(bot.slices_submitted, 1);
assert_eq!(bot.slices_failed, 2);
}
// A paused bot never reports a slice as due.
#[test]
fn paused_bot_does_not_fire_slices() {
let mut bot = make_bot();
bot.pause();
assert!(!bot.slice_due(Instant::now()));
}
// After one confirmed slice, a pause/resume cycle (20 ms here) must not make
// the next slice due immediately.
#[test]
fn resume_after_long_pause_does_not_burst_slice() {
let mut bot = TwapBot::new("SOL".into(), TradingSide::Long, 1.0, 4, 40, auth());
let t0 = Instant::now();
let (tx, rx) = oneshot::channel();
bot.record_slice_dispatched(t0, 1, rx, noop_task());
tx.send(SliceOutcome::Confirmed).unwrap();
let _ = bot.try_take_outcome();
let _ = bot.record_slice_confirmed(t0, "slice 1/4");
bot.pause();
std::thread::sleep(Duration::from_millis(20));
let _ = bot.resume();
assert!(!bot.slice_due(Instant::now()));
}
// A slice that resolves while the bot is paused must not block scheduling
// after resume: with a 1 s interval the next slice is due 5 s later.
#[test]
fn slice_resolving_during_pause_does_not_stall_resume() {
let mut bot = TwapBot::new("SOL".into(), TradingSide::Long, 1.0, 4, 4, auth());
let t0 = Instant::now();
let (tx, rx) = oneshot::channel();
bot.record_slice_dispatched(t0, 1, rx, noop_task());
bot.pause();
tx.send(SliceOutcome::Confirmed).unwrap();
let _ = bot.try_take_outcome();
let confirm_at = t0 + Duration::from_millis(500);
let _ = bot.record_slice_confirmed(confirm_at, "1/4 confirmed");
let _ = bot.resume();
let later = Instant::now() + Duration::from_secs(5);
assert!(bot.slice_due(later));
}
// Recording the last slice while paused leaves the bot `Paused`; the
// transition to `Completed` is reported by `resume` instead.
#[test]
fn record_slice_during_pause_does_not_flip_status_to_completed() {
let mut bot = TwapBot::new("SOL".into(), TradingSide::Long, 1.0, 1, 1, auth());
bot.pause();
let completed_during_pause = bot.record_slice_confirmed(Instant::now(), "done");
assert!(!completed_during_pause);
assert_eq!(bot.status, TwapStatus::Paused);
let completed_on_resume = bot.resume();
assert!(completed_on_resume);
assert_eq!(bot.status, TwapStatus::Completed);
}
// `Stopped` is terminal and `resume` does not revive it.
#[test]
fn stopped_bot_is_terminal() {
let mut bot = make_bot();
bot.stop();
assert!(bot.status.is_terminal());
let _ = bot.resume();
assert_eq!(bot.status, TwapStatus::Stopped);
}
// Stopping with a slice in flight counts that slice as unconfirmed and
// clears the in-flight entry.
#[test]
fn stop_counts_in_flight_as_unknown() {
let mut bot = make_bot();
let (_tx, rx) = oneshot::channel();
bot.record_slice_dispatched(Instant::now(), 1, rx, noop_task());
bot.stop();
assert_eq!(bot.status, TwapStatus::Stopped);
assert_eq!(bot.slices_unconfirmed, 1);
assert!(bot.in_flight.is_none());
}
// Restarting a stopped bot zeroes all counters and makes the first slice due again.
#[test]
fn restart_clears_progress_and_failures() {
let mut bot = make_bot();
let _ = bot.record_slice_confirmed(Instant::now(), "1/4");
let _ = bot.record_slice_failed(Instant::now(), "2/4 failed");
bot.stop();
bot.restart();
assert_eq!(bot.slices_submitted, 0);
assert_eq!(bot.slices_failed, 0);
assert_eq!(bot.slices_unconfirmed, 0);
assert_eq!(bot.status, TwapStatus::Running);
assert!(bot.slice_due(Instant::now()));
}
// Restarting while a slice is in flight behaves like `stop`: no reset happens.
#[test]
fn restart_with_in_flight_stops_instead_of_resetting() {
let mut bot = make_bot();
let (_tx, rx) = oneshot::channel();
bot.record_slice_dispatched(Instant::now(), 1, rx, noop_task());
bot.restart();
assert_eq!(bot.status, TwapStatus::Stopped);
assert_eq!(bot.slices_unconfirmed, 1);
assert_eq!(bot.slices_submitted, 0);
assert!(bot.in_flight.is_none());
}
// Touching before any dispatch must not set `last_slice_at`, so the first
// slice stays due immediately.
#[test]
fn touch_last_slice_at_does_not_seed_when_none() {
let mut bot = make_bot();
bot.touch_last_slice_at(Instant::now());
assert!(bot.slice_due(Instant::now()));
}
// After a dispatch, touching moves the reference time: the 10 s interval is
// then measured from t0 + 20 s.
#[test]
fn touch_last_slice_at_advances_when_some() {
let mut bot = make_bot();
let t0 = Instant::now();
let (tx, rx) = oneshot::channel();
bot.record_slice_dispatched(t0, 1, rx, noop_task());
tx.send(SliceOutcome::Confirmed).unwrap();
let _ = bot.try_take_outcome();
let _ = bot.record_slice_confirmed(t0, "1/4");
assert!(!bot.slice_due(t0));
bot.touch_last_slice_at(t0 + Duration::from_secs(20));
assert!(!bot.slice_due(t0 + Duration::from_secs(20)));
assert!(bot.slice_due(t0 + Duration::from_secs(30)));
}
// Nothing is due while a slice is in flight.
#[test]
fn slice_due_false_while_slice_in_flight() {
let mut bot = make_bot();
let (_tx, rx) = oneshot::channel();
bot.record_slice_dispatched(Instant::now(), 1, rx, noop_task());
assert!(!bot.slice_due(Instant::now()));
}
// Removing the selected bot re-clamps the cursor, including down to 0 on an
// empty list.
#[test]
fn view_remove_selected_keeps_cursor_valid() {
let mut v = TwapsView::new();
v.push(make_bot());
v.push(make_bot());
v.selected_index = 1;
v.remove_selected();
assert_eq!(v.selected_index, 0);
v.remove_selected();
assert_eq!(v.selected_index, 0);
assert!(v.bots.is_empty());
}
}