use std::time::Instant;
use serde::{Deserialize, Serialize};
#[derive(Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum DisplayPreference {
#[default]
Adaptive,
Always,
Never,
}
const SRTT_TRIGGER_LOW_MS: u64 = 20;
const SRTT_TRIGGER_HIGH_MS: u64 = 30;
const FLAG_TRIGGER_LOW_MS: u64 = 50;
const FLAG_TRIGGER_HIGH_MS: u64 = 80;
const GLITCH_THRESHOLD_MS: u64 = 250;
const GLITCH_REPAIR_COUNT: u32 = 10;
const GLITCH_REPAIR_INTERVAL_MS: u64 = 150;
const GLITCH_FLAG_THRESHOLD_MS: u64 = 5_000;
#[derive(Clone, Debug)]
pub(crate) struct ConditionalOverlayCell {
pub(crate) col: u16,
pub(crate) replacement: char,
#[allow(dead_code)]
pub(crate) unknown: bool,
pub(crate) original: String,
pub(crate) tentative_until_epoch: u64,
#[allow(dead_code)]
pub(crate) expiration_frame: u64,
pub(crate) prediction_time: Instant,
pub(crate) active: bool,
}
impl ConditionalOverlayCell {
fn new(col: u16, replacement: char, tentative_until_epoch: u64) -> Self {
Self {
col,
replacement,
unknown: false,
original: String::new(),
tentative_until_epoch,
expiration_frame: u64::MAX,
prediction_time: Instant::now(),
active: true,
}
}
fn tentative(&self, confirmed_epoch: u64) -> bool {
self.tentative_until_epoch > confirmed_epoch
}
fn is_flagged(&self) -> bool {
self.prediction_time.elapsed().as_millis() > u128::from(GLITCH_FLAG_THRESHOLD_MS)
}
}
#[derive(Clone, Debug)]
pub(crate) struct ConditionalOverlayRow {
pub(crate) row: u16,
pub(crate) cells: Vec<ConditionalOverlayCell>,
}
impl ConditionalOverlayRow {
fn new(row: u16) -> Self {
Self {
row,
cells: Vec::new(),
}
}
fn cell_mut(&mut self, col: u16, tentative_epoch: u64) -> &mut ConditionalOverlayCell {
if let Some(pos) = self.cells.iter().position(|c| c.col == col) {
return &mut self.cells[pos];
}
self.cells
.push(ConditionalOverlayCell::new(col, ' ', tentative_epoch));
self.cells.last_mut().expect("vec is empty after push")
}
}
#[derive(Clone, Debug)]
pub(crate) struct ConditionalCursorMove {
pub(crate) row: u16,
pub(crate) col: u16,
pub(crate) tentative_until_epoch: u64,
pub(crate) prediction_time: Instant,
}
impl ConditionalCursorMove {
fn new(row: u16, col: u16, tentative_until_epoch: u64) -> Self {
Self {
row,
col,
tentative_until_epoch,
prediction_time: Instant::now(),
}
}
fn tentative(&self, confirmed_epoch: u64) -> bool {
self.tentative_until_epoch > confirmed_epoch
}
}
#[derive(Clone, Copy, Debug)]
pub struct OverlayCell {
pub row: u16,
pub col: u16,
pub ch: char,
pub flagged: bool,
}
#[derive(Clone, Copy, Debug)]
pub struct OverlayCursor {
pub row: u16,
pub col: u16,
}
#[derive(Debug)]
pub struct PredictionEngine {
overlay_rows: Vec<ConditionalOverlayRow>,
cursors: Vec<ConditionalCursorMove>,
prediction_epoch: u64,
confirmed_epoch: u64,
flagging: bool,
srtt_trigger: bool,
glitch_trigger: u32,
send_interval_ms: u64,
last_byte: u8,
glitch_repair_count: u32,
last_quick_confirmation: Instant,
display_preference: DisplayPreference,
last_rows: u16,
last_cols: u16,
}
impl PredictionEngine {
#[must_use]
pub fn new(display_preference: DisplayPreference) -> Self {
Self {
overlay_rows: Vec::new(),
cursors: Vec::new(),
prediction_epoch: 1,
confirmed_epoch: 0,
flagging: false,
srtt_trigger: false,
glitch_trigger: 0,
send_interval_ms: 0,
last_byte: 0,
glitch_repair_count: 0,
last_quick_confirmation: Instant::now(),
display_preference,
last_rows: 0,
last_cols: 0,
}
}
pub fn set_send_interval(&mut self, ms: u64) {
self.send_interval_ms = ms;
match ms {
ms if ms > FLAG_TRIGGER_HIGH_MS => self.flagging = true,
ms if ms <= FLAG_TRIGGER_LOW_MS => self.flagging = false,
_ => {}
}
match ms {
ms if ms > SRTT_TRIGGER_HIGH_MS => self.srtt_trigger = true,
ms if ms <= SRTT_TRIGGER_LOW_MS => self.srtt_trigger = false,
_ => {}
}
}
#[must_use]
pub fn is_active(&self) -> bool {
match self.display_preference {
DisplayPreference::Never => false,
DisplayPreference::Always => true,
DisplayPreference::Adaptive => self.srtt_trigger || self.glitch_trigger > 0,
}
}
fn get_or_make_row(&mut self, row: u16) -> &mut ConditionalOverlayRow {
if let Some(pos) = self.overlay_rows.iter().position(|r| r.row == row) {
return &mut self.overlay_rows[pos];
}
self.overlay_rows.push(ConditionalOverlayRow::new(row));
self.overlay_rows
.last_mut()
.expect("vec is empty after push")
}
fn cursor(&self) -> Option<&ConditionalCursorMove> {
self.cursors.last()
}
#[must_use]
pub fn predicted_cursor(&self) -> Option<OverlayCursor> {
self.cursors.last().map(|c| OverlayCursor {
row: c.row,
col: c.col,
})
}
fn become_tentative(&mut self) {
self.prediction_epoch += 1;
}
fn newline_carriage_return(&mut self, screen_cursor: (u16, u16), rows: u16, cols: u16) {
let (pred_row, _) = self.cursor().map_or(screen_cursor, |c| (c.row, c.col));
let epoch = self.prediction_epoch;
if pred_row + 1 >= rows {
let row_entry = self.get_or_make_row(pred_row);
for col in 0..cols {
let cell = row_entry.cell_mut(col, epoch);
cell.replacement = ' ';
cell.active = true;
cell.tentative_until_epoch = epoch;
cell.prediction_time = Instant::now();
}
self.push_cursor(pred_row, 0, epoch);
} else {
self.push_cursor(pred_row + 1, 0, epoch);
}
}
fn kill_epoch(&mut self, epoch: u64, screen: &vt100::Screen) {
self.cursors
.retain(|c| c.tentative_until_epoch != epoch + 1);
for row in &mut self.overlay_rows {
row.cells.retain(|c| c.tentative_until_epoch != epoch + 1);
}
self.overlay_rows.retain(|r| !r.cells.is_empty());
if self.confirmed_epoch > epoch {
self.confirmed_epoch = epoch;
}
let _ = screen; }
pub fn new_user_byte(&mut self, byte: u8, screen: &vt100::Screen) {
let (rows, cols) = screen.size();
if rows == 0 || cols == 0 {
return;
}
let (cursor_row, cursor_col) = screen.cursor_position();
let (pred_row, pred_col) = self
.cursor()
.map_or((cursor_row, cursor_col), |c| (c.row, c.col));
match byte {
0x20..=0x7e => {
let epoch = self.prediction_epoch;
let original = screen
.cell(pred_row, pred_col)
.map(vt100::Cell::contents)
.unwrap_or_default()
.to_owned();
let row_entry = self.get_or_make_row(pred_row);
let cell = row_entry.cell_mut(pred_col, epoch);
cell.replacement = byte as char;
cell.original = original;
cell.active = true;
cell.tentative_until_epoch = epoch;
cell.prediction_time = Instant::now();
if pred_col + 1 < cols {
self.push_cursor(pred_row, pred_col + 1, epoch);
} else {
self.become_tentative();
self.newline_carriage_return((cursor_row, cursor_col), rows, cols);
}
}
0x7f | 0x08 => {
if pred_col == 0 {
self.become_tentative();
return;
}
let new_col = pred_col - 1;
let epoch = self.prediction_epoch;
let original = screen
.cell(pred_row, new_col)
.map(vt100::Cell::contents)
.unwrap_or_default()
.to_owned();
let row_entry = self.get_or_make_row(pred_row);
let cell = row_entry.cell_mut(new_col, epoch);
cell.replacement = ' ';
cell.original = original;
cell.active = true;
cell.tentative_until_epoch = epoch;
cell.prediction_time = Instant::now();
self.push_cursor(pred_row, new_col, epoch);
}
0x1b if self.last_byte == b'[' => {
self.become_tentative();
}
b'\r' => {
self.become_tentative();
self.newline_carriage_return((cursor_row, cursor_col), rows, cols);
}
b'\n' | 0x00..=0x1f | 0x80..=0xff => {
self.become_tentative();
}
}
self.last_byte = byte;
}
fn push_cursor(&mut self, row: u16, col: u16, epoch: u64) {
self.cursors
.push(ConditionalCursorMove::new(row, col, epoch));
if self.cursors.len() > 128 {
let _ = self.cursors.remove(0);
}
}
pub fn cull(&mut self, screen: &vt100::Screen) {
let (rows, cols) = screen.size();
if rows != self.last_rows || cols != self.last_cols {
self.last_rows = rows;
self.last_cols = cols;
self.reset();
return;
}
let (real_row, real_col) = screen.cursor_position();
let mut confirmed_cursor_epoch: Option<u64> = None;
for cursor in &self.cursors {
if cursor.row == real_row && cursor.col == real_col {
confirmed_cursor_epoch = Some(cursor.tentative_until_epoch);
}
}
if let Some(epoch) = confirmed_cursor_epoch
&& epoch > self.confirmed_epoch
{
self.confirmed_epoch = epoch;
self.update_glitch_tracking();
}
let mut epochs_to_kill: Vec<u64> = Vec::new();
self.overlay_rows.retain_mut(|row_entry| {
row_entry.cells.retain_mut(|cell| {
if !cell.active {
return false;
}
if cell.tentative(self.confirmed_epoch) {
return true;
}
let actual = screen
.cell(row_entry.row, cell.col)
.map(vt100::Cell::contents)
.unwrap_or_default();
let predicted = cell.replacement.to_string();
if actual == predicted {
false
} else if actual == cell.original {
true
} else {
epochs_to_kill.push(cell.tentative_until_epoch.saturating_sub(1));
false
}
});
!row_entry.cells.is_empty()
});
for epoch in epochs_to_kill {
self.kill_epoch(epoch, screen);
}
if let Some(cursor) = self.cursors.last()
&& !cursor.tentative(self.confirmed_epoch)
&& (cursor.row != real_row || cursor.col != real_col)
{
self.reset();
return;
}
self.cursors.retain(|c| !c.tentative(self.confirmed_epoch));
}
fn update_glitch_tracking(&mut self) {
let outstanding_ms = self
.cursors
.first()
.map_or(0u128, |c| c.prediction_time.elapsed().as_millis());
if outstanding_ms > u128::from(GLITCH_THRESHOLD_MS) {
self.glitch_trigger = self.glitch_trigger.saturating_add(1);
self.glitch_repair_count = 0;
} else {
let since_last = self.last_quick_confirmation.elapsed().as_millis();
if since_last >= u128::from(GLITCH_REPAIR_INTERVAL_MS) {
self.last_quick_confirmation = Instant::now();
self.glitch_repair_count += 1;
if self.glitch_repair_count >= GLITCH_REPAIR_COUNT {
self.glitch_trigger = self.glitch_trigger.saturating_sub(1);
self.glitch_repair_count = 0;
}
}
}
}
#[must_use]
pub fn apply(&self, screen: &vt100::Screen) -> (Vec<OverlayCell>, Option<OverlayCursor>) {
if !self.is_active() {
return (Vec::new(), None);
}
let mut cells = Vec::new();
for row_entry in &self.overlay_rows {
for cell in &row_entry.cells {
if !cell.active {
continue;
}
if cell.tentative(self.confirmed_epoch) {
continue;
}
let actual = screen
.cell(row_entry.row, cell.col)
.map(vt100::Cell::contents)
.unwrap_or_default();
if actual == cell.replacement.to_string() {
continue;
}
cells.push(OverlayCell {
row: row_entry.row,
col: cell.col,
ch: cell.replacement,
flagged: self.flagging || cell.is_flagged(),
});
}
}
let cursor = self.cursors.last().and_then(|c| {
if c.tentative(self.confirmed_epoch) {
None
} else {
Some(OverlayCursor {
row: c.row,
col: c.col,
})
}
});
(cells, cursor)
}
pub fn reset(&mut self) {
self.overlay_rows.clear();
self.cursors.clear();
self.prediction_epoch = 1;
self.confirmed_epoch = 0;
self.glitch_trigger = 0;
self.glitch_repair_count = 0;
self.last_byte = 0;
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_screen(rows: u16, cols: u16, content: &[u8]) -> vt100::Parser {
let mut p = vt100::Parser::new(rows, cols, 0);
if !content.is_empty() {
p.process(content);
}
p
}
#[test]
fn new_user_byte_printable_stores_original_as_string() {
let mut engine = PredictionEngine::new(DisplayPreference::Always);
let parser = make_screen(24, 80, b"X\x1b[H");
let screen = parser.screen();
assert_eq!(screen.cursor_position(), (0, 0));
engine.new_user_byte(b'a', screen);
let row = engine.overlay_rows.iter().find(|r| r.row == 0).unwrap();
let cell = row.cells.iter().find(|c| c.col == 0).unwrap();
assert_eq!(cell.replacement, 'a');
assert_eq!(cell.original, "X");
}
#[test]
fn new_user_byte_backspace_stores_original_as_string() {
let mut engine = PredictionEngine::new(DisplayPreference::Always);
let parser = make_screen(24, 80, b"X\x1b[H");
let screen = parser.screen();
assert_eq!(screen.cursor_position(), (0, 0));
engine.new_user_byte(b'a', screen);
engine.new_user_byte(0x7f, screen);
let row = engine.overlay_rows.iter().find(|r| r.row == 0).unwrap();
let cell = row
.cells
.iter()
.find(|c| c.col == 0 && c.replacement == ' ')
.unwrap();
assert_eq!(cell.original, "X");
}
#[test]
fn display_preference_default_is_adaptive() {
assert_eq!(DisplayPreference::default(), DisplayPreference::Adaptive);
}
#[test]
fn is_active_always_returns_true() {
let engine = PredictionEngine::new(DisplayPreference::Always);
assert!(engine.is_active());
}
#[test]
fn is_active_never_returns_false() {
let engine = PredictionEngine::new(DisplayPreference::Never);
assert!(!engine.is_active());
}
#[test]
fn set_send_interval_above_high_threshold_activates_srtt_trigger() {
let mut engine = PredictionEngine::new(DisplayPreference::Adaptive);
assert!(!engine.is_active());
engine.set_send_interval(31); assert!(engine.is_active());
}
#[test]
fn apply_never_returns_empty() {
let engine = PredictionEngine::new(DisplayPreference::Never);
let parser = make_screen(24, 80, b"");
let (cells, cursor) = engine.apply(parser.screen());
assert!(cells.is_empty());
assert!(cursor.is_none());
}
#[test]
fn apply_always_with_prediction_returns_overlay() {
let mut engine = PredictionEngine::new(DisplayPreference::Always);
let parser_blank = make_screen(24, 80, b"");
engine.cull(parser_blank.screen());
engine.new_user_byte(b'z', parser_blank.screen());
let parser_cursor_at_1 = make_screen(24, 80, b"\x1b[1;2H");
engine.cull(parser_cursor_at_1.screen());
let (cells, _cursor) = engine.apply(parser_blank.screen());
assert!(
!cells.is_empty(),
"expected at least one overlay cell after typing 'z'"
);
let z_cell = cells.iter().find(|c| c.ch == 'z');
assert!(z_cell.is_some(), "expected 'z' overlay cell");
}
#[test]
fn apply_always_cursor_overlay_present() {
let mut engine = PredictionEngine::new(DisplayPreference::Always);
let parser_blank = make_screen(24, 80, b"");
engine.cull(parser_blank.screen());
engine.new_user_byte(b'a', parser_blank.screen());
let parser_cursor_at_1 = make_screen(24, 80, b"\x1b[1;2H");
engine.cull(parser_cursor_at_1.screen());
let (_cells, cursor) = engine.apply(parser_blank.screen());
assert!(
cursor.is_some(),
"expected cursor overlay after typing a key"
);
}
#[test]
fn reset_clears_all_overlays() {
let mut engine = PredictionEngine::new(DisplayPreference::Always);
let parser = make_screen(24, 80, b"");
engine.new_user_byte(b'a', parser.screen());
engine.new_user_byte(b'b', parser.screen());
engine.reset();
let (cells, cursor) = engine.apply(parser.screen());
assert!(cells.is_empty(), "reset must clear all cell overlays");
assert!(cursor.is_none(), "reset must clear cursor overlay");
}
#[test]
fn cull_confirmed_prediction_removed() {
let mut engine = PredictionEngine::new(DisplayPreference::Always);
let parser_before = make_screen(24, 80, b"");
engine.cull(parser_before.screen());
engine.new_user_byte(b'a', parser_before.screen());
let parser_after = make_screen(24, 80, b"a");
engine.cull(parser_after.screen());
let (cells, _) = engine.apply(parser_after.screen());
let a_cell = cells.iter().find(|c| c.ch == 'a');
assert!(
a_cell.is_none(),
"confirmed prediction for 'a' must be culled"
);
}
#[test]
fn new_user_byte_esc_marks_unknown() {
let mut engine = PredictionEngine::new(DisplayPreference::Always);
let parser = make_screen(24, 80, b"");
engine.new_user_byte(0x1b, parser.screen());
let (cells, _cursor) = engine.apply(parser.screen());
assert!(cells.is_empty(), "ESC should clear/reset predictions");
}
#[test]
fn set_send_interval_below_low_threshold_deactivates_adaptive() {
let mut engine = PredictionEngine::new(DisplayPreference::Adaptive);
engine.set_send_interval(31);
assert!(engine.is_active());
engine.set_send_interval(10); assert!(!engine.is_active());
}
#[test]
fn is_active_adaptive_starts_inactive() {
let engine = PredictionEngine::new(DisplayPreference::Adaptive);
assert!(
!engine.is_active(),
"adaptive must start inactive (no RTT data)"
);
}
}