#![allow(dead_code, unused_imports, unused_variables)]
use std::collections::VecDeque;
use std::io::{self, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crossterm::cursor;
use crossterm::event::{self, Event, KeyCode, KeyEvent, KeyModifiers};
use crossterm::style::{self, Stylize};
use crossterm::terminal;
use tokio::sync::{Mutex, Notify};
/// Scheduling class attached to every queued work item.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkPriority {
/// Default class; `WorkQueue::push_with_priority` appends these at the back.
Normal,
/// `WorkQueue::push_with_priority` inserts these at the front of the queue.
High,
/// Used for items submitted with a delay; appended at the back like `Normal`.
Delayed,
}
/// One unit of user input waiting in a `WorkQueue`.
#[derive(Debug, Clone)]
pub struct QueuedWork {
/// The text to hand to the consumer.
pub input: String,
/// Timestamp taken when the item was pushed onto the queue.
pub queued_at: Instant,
/// Scheduling class chosen at push time.
pub priority: WorkPriority,
/// Earliest instant at which the item may be popped; `None` means it is
/// eligible immediately.
pub execute_after: Option<Instant>,
}
/// Cloneable handle to a shared queue of `QueuedWork` items.
///
/// Clones share the same underlying deque and notifier because both fields
/// are `Arc`s.
#[derive(Clone)]
pub struct WorkQueue {
// The pending items, guarded by an async mutex.
queue: Arc<Mutex<VecDeque<QueuedWork>>>,
// Signalled on every push so that `pop` can wake up.
notify: Arc<Notify>,
}
impl WorkQueue {
    /// Creates an empty queue together with its wake-up notifier.
    pub fn new() -> Self {
        let queue = Arc::new(Mutex::new(VecDeque::new()));
        let notify = Arc::new(Notify::new());
        Self { queue, notify }
    }

    /// Appends `input` as a normal-priority item with no delay and wakes one
    /// waiter.
    pub async fn push(&self, input: String) {
        self.push_with_priority(input, WorkPriority::Normal, None)
            .await;
    }

    /// Enqueues `input` with the given priority and optional earliest start.
    ///
    /// `High` items are placed at the front of the queue; every other
    /// priority is appended at the back. One waiter is notified afterwards.
    pub async fn push_with_priority(
        &self,
        input: String,
        priority: WorkPriority,
        execute_after: Option<Instant>,
    ) {
        let mut guard = self.queue.lock().await;
        let work = QueuedWork {
            input,
            queued_at: Instant::now(),
            priority,
            execute_after,
        };
        match priority {
            WorkPriority::High => guard.push_front(work),
            WorkPriority::Normal | WorkPriority::Delayed => guard.push_back(work),
        }
        self.notify.notify_one();
    }

    /// Removes and returns the first item whose delay (if any) has elapsed,
    /// or `None` when no item is currently eligible.
    pub async fn try_pop(&self) -> Option<QueuedWork> {
        let mut guard = self.queue.lock().await;
        let now = Instant::now();
        let ready_idx = guard.iter().position(|work| Self::is_due(work, now))?;
        guard.remove(ready_idx)
    }

    /// Waits until an eligible item exists and returns it.
    ///
    /// Between attempts the task sleeps until either a push notification
    /// arrives or 250 ms pass, so delayed items are re-checked periodically.
    pub async fn pop(&self) -> QueuedWork {
        loop {
            match self.try_pop().await {
                Some(work) => return work,
                None => {
                    tokio::select! {
                        _ = self.notify.notified() => {}
                        _ = tokio::time::sleep(Duration::from_millis(250)) => {}
                    }
                }
            }
        }
    }

    /// Returns a clone of the item `try_pop` would currently yield, without
    /// removing it.
    pub async fn peek(&self) -> Option<QueuedWork> {
        let guard = self.queue.lock().await;
        let now = Instant::now();
        let found = guard.iter().find(|work| Self::is_due(work, now)).cloned();
        found
    }

    /// Number of items in the queue, including delayed ones.
    pub async fn len(&self) -> usize {
        let guard = self.queue.lock().await;
        guard.len()
    }

    /// `true` when the queue holds no items at all.
    pub async fn is_empty(&self) -> bool {
        let guard = self.queue.lock().await;
        guard.is_empty()
    }

    /// Drops every queued item and returns how many were removed.
    pub async fn clear(&self) -> usize {
        let mut guard = self.queue.lock().await;
        guard.drain(..).count()
    }

    /// Returns clones of all queued items in their current order.
    pub async fn snapshot(&self) -> Vec<QueuedWork> {
        let guard = self.queue.lock().await;
        let items: Vec<QueuedWork> = guard.iter().cloned().collect();
        items
    }

    /// An item is due when it has no `execute_after` or that instant is not
    /// later than `now`.
    fn is_due(work: &QueuedWork, now: Instant) -> bool {
        match work.execute_after {
            Some(deadline) => now >= deadline,
            None => true,
        }
    }
}
impl Default for WorkQueue {
fn default() -> Self {
Self::new()
}
}
/// Line history for the interactive prompt, optionally persisted to a file.
struct InputHistory {
// Stored lines, oldest first.
entries: Vec<String>,
// Index into `entries` while the user is browsing; `None` when not browsing.
position: Option<usize>,
// Upper bound on `entries.len()`; older lines are dropped beyond this.
max_entries: usize,
// Where the history is loaded from / saved to; `None` disables persistence.
file_path: Option<PathBuf>,
}
impl InputHistory {
fn new() -> Self {
let file_path = dirs::home_dir().map(|h| h.join(".selfware").join("history"));
let mut history = Self {
entries: Vec::new(),
position: None,
max_entries: 100,
file_path,
};
history.load_from_disk();
history
}
fn add(&mut self, input: &str) {
let trimmed = input.trim();
if trimmed.is_empty() {
return;
}
if self.entries.last().map(|s| s.as_str()) == Some(trimmed) {
return;
}
self.entries.push(trimmed.to_string());
if self.entries.len() > self.max_entries {
self.entries.remove(0);
}
self.position = None;
self.save_to_disk();
}
fn up(&mut self, current_input: &str) -> Option<&str> {
if self.entries.is_empty() {
return None;
}
let new_pos = match self.position {
None => self.entries.len().saturating_sub(1),
Some(0) => 0,
Some(p) => p - 1,
};
self.position = Some(new_pos);
self.entries.get(new_pos).map(|s| s.as_str())
}
fn down(&mut self) -> Option<&str> {
match self.position {
None => None,
Some(p) => {
if p + 1 >= self.entries.len() {
self.position = None;
None
} else {
let new_pos = p + 1;
self.position = Some(new_pos);
self.entries.get(new_pos).map(|s| s.as_str())
}
}
}
}
fn load_from_disk(&mut self) {
let Some(ref path) = self.file_path else {
return;
};
if let Ok(content) = std::fs::read_to_string(path) {
self.entries = content
.lines()
.filter(|l| !l.is_empty())
.map(String::from)
.collect();
if self.entries.len() > self.max_entries {
let skip = self.entries.len() - self.max_entries;
self.entries.drain(..skip);
}
}
}
fn save_to_disk(&self) {
let Some(ref path) = self.file_path else {
return;
};
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let content = self.entries.join("\n");
let _ = std::fs::write(path, content);
}
}
/// Parses a leading `@<number><unit>` delay marker from `input`.
///
/// `<unit>` is one of `s` (seconds), `m` (minutes) or `h` (hours). On success
/// returns the delay and the trimmed text that follows the marker.
///
/// Returns `None` when:
/// - the trimmed input does not start with `@`,
/// - no digits follow the `@`, or the digits do not fit in a `u64`,
/// - the unit character is missing or not `s`/`m`/`h`,
/// - nothing but whitespace follows the marker,
/// - the delay in seconds would overflow a `u64`.
fn parse_delay_prefix(input: &str) -> Option<(Duration, String)> {
    let rest = input.trim().strip_prefix('@')?;

    // ASCII digits are one byte each, so the index of the first non-digit is
    // also the length of the numeric prefix.
    let num_end = rest
        .find(|c: char| !c.is_ascii_digit())
        .unwrap_or(rest.len());
    if num_end == 0 {
        return None;
    }
    let number: u64 = rest[..num_end].parse().ok()?;

    let mut tail = rest[num_end..].chars();
    let multiplier: u64 = match tail.next()? {
        's' => 1,
        'm' => 60,
        'h' => 3600,
        _ => return None,
    };

    let remaining = tail.as_str().trim();
    if remaining.is_empty() {
        return None;
    }

    // A plain `*` would panic (debug) or silently wrap (release) on huge
    // user-supplied numbers; treat such input as "not a delay prefix".
    let secs = number.checked_mul(multiplier)?;
    Some((Duration::from_secs(secs), remaining.to_string()))
}
/// Detects a leading `!` high-priority marker.
///
/// Returns `(true, text)` with the marker and surrounding whitespace removed
/// when something follows the `!`. Otherwise returns `(false, trimmed_input)`;
/// a lone `!` is therefore passed through unchanged as ordinary input.
fn parse_priority_prefix(input: &str) -> (bool, String) {
    let trimmed = input.trim();
    match trimmed.strip_prefix('!').map(str::trim) {
        Some(body) if !body.is_empty() => (true, body.to_string()),
        _ => (false, trimmed.to_string()),
    }
}
/// Owner of the interactive input loop and the work queue it feeds.
pub struct InputHandler {
// Flag supplied by the caller; the input loop sets it on Esc / Ctrl+C.
cancelled: Arc<AtomicBool>,
// Set by the input loop on Esc / Ctrl+C; read via `is_interrupted` and
// reset via `clear_interrupt`.
interrupted: Arc<AtomicBool>,
// Queue that receives every submitted line.
work_queue: WorkQueue,
// Join handle of the blocking input task, present after `spawn_input_loop`.
input_loop_handle: Option<tokio::task::JoinHandle<()>>,
// Set by `shutdown` to ask the input loop to exit.
shutdown: Arc<AtomicBool>,
}
impl InputHandler {
pub fn new(cancelled: Arc<AtomicBool>) -> Self {
Self {
cancelled,
interrupted: Arc::new(AtomicBool::new(false)),
work_queue: WorkQueue::new(),
input_loop_handle: None,
shutdown: Arc::new(AtomicBool::new(false)),
}
}
pub fn work_queue(&self) -> &WorkQueue {
&self.work_queue
}
pub fn spawn_input_loop(&mut self, execution_mode: crate::config::ExecutionMode) {
let cancelled = Arc::clone(&self.cancelled);
let interrupted = Arc::clone(&self.interrupted);
let shutdown = Arc::clone(&self.shutdown);
let work_queue = self.work_queue.clone();
let handle = tokio::task::spawn_blocking(move || {
let mut handler = RawInputLoop {
cancelled,
interrupted,
shutdown,
work_queue,
buffer: String::new(),
cursor_pos: 0,
history: InputHistory::new(),
execution_mode,
agent_busy: Arc::new(AtomicBool::new(false)),
};
handler.run();
});
self.input_loop_handle = Some(handle);
}
pub async fn next_input(&self) -> Option<String> {
self.work_queue.try_pop().await.map(|w| w.input)
}
pub async fn wait_for_input(&self) -> String {
self.work_queue.pop().await.input
}
pub fn is_interrupted(&self) -> bool {
self.interrupted.load(Ordering::Relaxed)
}
pub fn clear_interrupt(&self) {
self.interrupted.store(false, Ordering::Relaxed);
}
pub async fn shutdown(mut self) {
self.shutdown.store(true, Ordering::Release);
if let Some(handle) = self.input_loop_handle.take() {
let _ = tokio::time::timeout(Duration::from_millis(500), handle).await;
}
}
}
/// State of the blocking raw-mode line editor.
struct RawInputLoop {
// Set on Esc / Ctrl+C.
cancelled: Arc<AtomicBool>,
// Set on Esc / Ctrl+C alongside `cancelled`.
interrupted: Arc<AtomicBool>,
// Polled at the top of every loop iteration; `true` ends the loop.
shutdown: Arc<AtomicBool>,
// Destination for submitted lines.
work_queue: WorkQueue,
// The line currently being edited.
buffer: String,
// Cursor location as a byte offset into `buffer`.
cursor_pos: usize,
// Up/Down history for the prompt.
history: InputHistory,
// Selects the mode tag shown in the prompt.
execution_mode: crate::config::ExecutionMode,
// NOTE(review): not read anywhere in this file's visible code — confirm usage.
agent_busy: Arc<AtomicBool>,
}
impl RawInputLoop {
fn run(&mut self) {
if terminal::enable_raw_mode().is_err() {
return;
}
self.render_prompt();
loop {
if self.shutdown.load(Ordering::Relaxed) {
break;
}
match event::poll(Duration::from_millis(100)) {
Ok(true) => {
if let Ok(evt) = event::read() {
if self.handle_event(evt) {
break; }
}
}
Ok(false) => {
}
Err(_) => break,
}
}
let _ = terminal::disable_raw_mode();
}
fn handle_event(&mut self, evt: Event) -> bool {
match evt {
Event::Key(key) => self.handle_key(key),
Event::Paste(text) => {
self.buffer.insert_str(self.cursor_pos, &text);
self.cursor_pos += text.len();
self.render_prompt();
false
}
_ => false,
}
}
fn handle_key(&mut self, key: KeyEvent) -> bool {
match (key.code, key.modifiers) {
(KeyCode::Esc, _) => {
self.cancelled.store(true, Ordering::Relaxed);
self.interrupted.store(true, Ordering::Relaxed);
self.write_raw(b"\r\n\x1b[33m[ESC] Cancelling...\x1b[0m\r\n");
self.render_prompt();
false
}
(KeyCode::Char('c'), m) if m.contains(KeyModifiers::CONTROL) => {
self.cancelled.store(true, Ordering::Relaxed);
self.interrupted.store(true, Ordering::Relaxed);
self.write_raw(b"\r\n\x1b[33m[Ctrl+C] Cancelling...\x1b[0m\r\n");
self.render_prompt();
false
}
(KeyCode::Char('d'), m) if m.contains(KeyModifiers::CONTROL) => {
self.write_raw(b"\r\n");
true
}
(KeyCode::Enter, _) => {
self.submit_input();
false
}
(KeyCode::Backspace, _) => {
if self.cursor_pos > 0 {
let prev = self.buffer[..self.cursor_pos]
.char_indices()
.next_back()
.map(|(i, _)| i)
.unwrap_or(0);
self.buffer.remove(prev);
self.cursor_pos = prev;
self.render_prompt();
}
false
}
(KeyCode::Delete, _) => {
if self.cursor_pos < self.buffer.len() {
self.buffer.remove(self.cursor_pos);
self.render_prompt();
}
false
}
(KeyCode::Left, _) => {
if self.cursor_pos > 0 {
let prev = self.buffer[..self.cursor_pos]
.char_indices()
.next_back()
.map(|(i, _)| i)
.unwrap_or(0);
self.cursor_pos = prev;
self.render_prompt();
}
false
}
(KeyCode::Right, _) => {
if self.cursor_pos < self.buffer.len() {
let next = self.buffer[self.cursor_pos..]
.char_indices()
.nth(1)
.map(|(i, _)| self.cursor_pos + i)
.unwrap_or(self.buffer.len());
self.cursor_pos = next;
self.render_prompt();
}
false
}
(KeyCode::Up, _) => {
if let Some(entry) = self.history.up(&self.buffer) {
self.buffer = entry.to_string();
self.cursor_pos = self.buffer.len();
self.render_prompt();
}
false
}
(KeyCode::Down, _) => {
match self.history.down() {
Some(entry) => {
self.buffer = entry.to_string();
self.cursor_pos = self.buffer.len();
}
None => {
self.buffer.clear();
self.cursor_pos = 0;
}
}
self.render_prompt();
false
}
(KeyCode::Home, _) => {
self.cursor_pos = 0;
self.render_prompt();
false
}
(KeyCode::End, _) => {
self.cursor_pos = self.buffer.len();
self.render_prompt();
false
}
(KeyCode::Char('u'), m) if m.contains(KeyModifiers::CONTROL) => {
self.buffer.clear();
self.cursor_pos = 0;
self.render_prompt();
false
}
(KeyCode::Char('w'), m) if m.contains(KeyModifiers::CONTROL) => {
if self.cursor_pos > 0 {
let before = &self.buffer[..self.cursor_pos];
let trimmed = before.trim_end();
let new_end = trimmed
.rfind(|c: char| c.is_whitespace())
.map(|i| i + 1)
.unwrap_or(0);
self.buffer.drain(new_end..self.cursor_pos);
self.cursor_pos = new_end;
self.render_prompt();
}
false
}
(KeyCode::Char('a'), m) if m.contains(KeyModifiers::CONTROL) => {
self.cursor_pos = 0;
self.render_prompt();
false
}
(KeyCode::Char('e'), m) if m.contains(KeyModifiers::CONTROL) => {
self.cursor_pos = self.buffer.len();
self.render_prompt();
false
}
(KeyCode::Char('k'), m) if m.contains(KeyModifiers::CONTROL) => {
self.buffer.truncate(self.cursor_pos);
self.render_prompt();
false
}
(KeyCode::Char(ch), m) if !m.contains(KeyModifiers::CONTROL) => {
self.buffer.insert(self.cursor_pos, ch);
self.cursor_pos += ch.len_utf8();
self.render_prompt();
false
}
(KeyCode::Tab, _) => {
false
}
_ => false,
}
}
fn submit_input(&mut self) {
let input = self.buffer.trim().to_string();
self.write_raw(b"\r\n");
if input.is_empty() {
self.buffer.clear();
self.cursor_pos = 0;
self.render_prompt();
return;
}
self.history.add(&input);
if self.handle_queue_command(&input) {
self.buffer.clear();
self.cursor_pos = 0;
self.render_prompt();
return;
}
if let Some((delay, remaining)) = parse_delay_prefix(&input) {
let execute_after = Instant::now() + delay;
let wq = self.work_queue.clone();
let secs = delay.as_secs();
let label = if secs >= 3600 {
format!("{}h", secs / 3600)
} else if secs >= 60 {
format!("{}m", secs / 60)
} else {
format!("{}s", secs)
};
self.write_raw(
format!("\x1b[36m[Delayed @{}]\x1b[0m {}\r\n", label, remaining).as_bytes(),
);
let remaining_clone = remaining.clone();
tokio::task::block_in_place(move || {
tokio::runtime::Handle::current().block_on(async {
wq.push_with_priority(
remaining_clone,
WorkPriority::Delayed,
Some(execute_after),
)
.await;
});
});
self.buffer.clear();
self.cursor_pos = 0;
self.render_prompt();
return;
}
let (is_high, clean_input) = parse_priority_prefix(&input);
let priority = if is_high {
WorkPriority::High
} else {
WorkPriority::Normal
};
let wq = self.work_queue.clone();
let ci = clean_input.clone();
tokio::task::block_in_place(move || {
tokio::runtime::Handle::current().block_on(async {
wq.push_with_priority(ci, priority, None).await;
});
});
self.buffer.clear();
self.cursor_pos = 0;
self.render_prompt();
}
fn handle_queue_command(&mut self, input: &str) -> bool {
let input = input.trim();
if input == "/queue" || input == "/q" {
let items = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(async { self.work_queue.snapshot().await })
});
if items.is_empty() {
self.write_raw(b"\x1b[36m[Queue] Empty\x1b[0m\r\n");
} else {
let header = format!("\x1b[36m[Queue] {} item(s):\x1b[0m\r\n", items.len());
self.write_raw(header.as_bytes());
for (i, item) in items.iter().enumerate() {
let priority_tag = match item.priority {
WorkPriority::High => " \x1b[31m[HIGH]\x1b[0m",
WorkPriority::Delayed => " \x1b[33m[DELAYED]\x1b[0m",
WorkPriority::Normal => "",
};
let delay_info = if let Some(after) = item.execute_after {
let remaining = after.saturating_duration_since(Instant::now());
if remaining.as_secs() > 0 {
format!(" (in {}s)", remaining.as_secs())
} else {
" (ready)".to_string()
}
} else {
String::new()
};
let preview = if item.input.len() > 60 {
format!("{}...", &item.input[..57])
} else {
item.input.clone()
};
let line =
format!(" {}. {}{}{}\r\n", i + 1, preview, priority_tag, delay_info,);
self.write_raw(line.as_bytes());
}
}
return true;
}
if input == "/queue clear" || input == "/q clear" {
let count = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async { self.work_queue.clear().await })
});
let msg = format!("\x1b[36m[Queue] Cleared {} item(s)\x1b[0m\r\n", count);
self.write_raw(msg.as_bytes());
return true;
}
if input == "/queue next" || input == "/q next" {
let next = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async { self.work_queue.peek().await })
});
match next {
Some(item) => {
let msg = format!("\x1b[36m[Queue] Next:\x1b[0m {}\r\n", item.input);
self.write_raw(msg.as_bytes());
}
None => {
self.write_raw(b"\x1b[36m[Queue] Empty\x1b[0m\r\n");
}
}
return true;
}
false
}
fn render_prompt(&self) {
let mut stderr = io::stderr();
let queue_len = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async { self.work_queue.len().await })
});
let mode_indicator = match self.execution_mode {
crate::config::ExecutionMode::Normal => "[N]",
crate::config::ExecutionMode::AutoEdit => "[A]",
crate::config::ExecutionMode::Yolo => "[Y]",
crate::config::ExecutionMode::Daemon => "[D]",
};
let queue_indicator = if queue_len > 0 {
format!(" [{} queued]", queue_len)
} else {
String::new()
};
let prompt = format!(
"\x1b[2K\r\x1b[36mselfware\x1b[0m \x1b[33m{}\x1b[0m\x1b[90m{}\x1b[0m\x1b[36m>\x1b[0m ",
mode_indicator, queue_indicator
);
let _ = stderr.write_all(prompt.as_bytes());
let _ = stderr.write_all(self.buffer.as_bytes());
let chars_after_cursor = self.buffer[self.cursor_pos..].chars().count();
if chars_after_cursor > 0 {
let _ = write!(stderr, "\x1b[{}D", chars_after_cursor);
}
let _ = stderr.flush();
}
fn write_raw(&self, bytes: &[u8]) {
let _ = io::stderr().write_all(bytes);
let _ = io::stderr().flush();
}
}
// Unit tests for the prefix parsers, the work queue, the input history and
// the interrupt flag on `InputHandler`.
#[cfg(test)]
mod tests {
use super::*;
// `@5m` is five minutes; the text after the marker is returned trimmed.
#[test]
fn test_parse_delay_prefix_minutes() {
let result = parse_delay_prefix("@5m run tests");
assert!(result.is_some());
let (dur, remaining) = result.unwrap();
assert_eq!(dur, Duration::from_secs(300));
assert_eq!(remaining, "run tests");
}
// `s` suffix is plain seconds.
#[test]
fn test_parse_delay_prefix_seconds() {
let result = parse_delay_prefix("@30s check status");
assert!(result.is_some());
let (dur, remaining) = result.unwrap();
assert_eq!(dur, Duration::from_secs(30));
assert_eq!(remaining, "check status");
}
// `h` suffix multiplies by 3600.
#[test]
fn test_parse_delay_prefix_hours() {
let result = parse_delay_prefix("@2h deploy");
assert!(result.is_some());
let (dur, remaining) = result.unwrap();
assert_eq!(dur, Duration::from_secs(7200));
assert_eq!(remaining, "deploy");
}
// A delay marker with nothing after it is rejected.
#[test]
fn test_parse_delay_prefix_no_command() {
let result = parse_delay_prefix("@5m");
assert!(result.is_none());
}
// Input without a leading `@` is not a delay.
#[test]
fn test_parse_delay_prefix_no_prefix() {
let result = parse_delay_prefix("run tests");
assert!(result.is_none());
}
// Only `s`, `m` and `h` are accepted as unit suffixes.
#[test]
fn test_parse_delay_prefix_invalid_suffix() {
let result = parse_delay_prefix("@5x run tests");
assert!(result.is_none());
}
// A leading `!` marks high priority and is stripped from the text.
#[test]
fn test_parse_priority_prefix_high() {
let (is_high, text) = parse_priority_prefix("!fix this now");
assert!(is_high);
assert_eq!(text, "fix this now");
}
// Without `!` the input is normal priority and returned as-is.
#[test]
fn test_parse_priority_prefix_normal() {
let (is_high, text) = parse_priority_prefix("do something");
assert!(!is_high);
assert_eq!(text, "do something");
}
// A lone `!` is not a priority marker; it is passed through unchanged.
#[test]
fn test_parse_priority_prefix_bare_bang() {
let (is_high, text) = parse_priority_prefix("!");
assert!(!is_high);
assert_eq!(text, "!");
}
// Normal pushes are popped in FIFO order.
#[tokio::test]
async fn test_work_queue_push_pop() {
let q = WorkQueue::new();
q.push("task 1".to_string()).await;
q.push("task 2".to_string()).await;
assert_eq!(q.len().await, 2);
let item = q.try_pop().await.unwrap();
assert_eq!(item.input, "task 1");
assert_eq!(q.len().await, 1);
}
// A high-priority push jumps ahead of an earlier normal item.
#[tokio::test]
async fn test_work_queue_high_priority_goes_front() {
let q = WorkQueue::new();
q.push("normal task".to_string()).await;
q.push_with_priority("urgent task".to_string(), WorkPriority::High, None)
.await;
let item = q.try_pop().await.unwrap();
assert_eq!(item.input, "urgent task");
assert_eq!(item.priority, WorkPriority::High);
}
// An item whose `execute_after` lies in the future stays queued.
#[tokio::test]
async fn test_work_queue_delayed_not_ready() {
let q = WorkQueue::new();
let future = Instant::now() + Duration::from_secs(3600);
q.push_with_priority(
"future task".to_string(),
WorkPriority::Delayed,
Some(future),
)
.await;
assert!(q.try_pop().await.is_none());
assert_eq!(q.len().await, 1);
}
// An item whose `execute_after` has already passed can be popped.
#[tokio::test]
async fn test_work_queue_delayed_ready() {
let q = WorkQueue::new();
let past = Instant::now() - Duration::from_secs(1);
q.push_with_priority("past task".to_string(), WorkPriority::Delayed, Some(past))
.await;
let item = q.try_pop().await.unwrap();
assert_eq!(item.input, "past task");
}
// `clear` reports how many items it removed and leaves the queue empty.
#[tokio::test]
async fn test_work_queue_clear() {
let q = WorkQueue::new();
q.push("a".to_string()).await;
q.push("b".to_string()).await;
q.push("c".to_string()).await;
let cleared = q.clear().await;
assert_eq!(cleared, 3);
assert!(q.is_empty().await);
}
// `snapshot` returns the items in queue order.
#[tokio::test]
async fn test_work_queue_snapshot() {
let q = WorkQueue::new();
q.push("first".to_string()).await;
q.push("second".to_string()).await;
let snap = q.snapshot().await;
assert_eq!(snap.len(), 2);
assert_eq!(snap[0].input, "first");
assert_eq!(snap[1].input, "second");
}
// `peek` returns the next item without removing it.
#[tokio::test]
async fn test_work_queue_peek() {
let q = WorkQueue::new();
q.push("peeked".to_string()).await;
let item = q.peek().await.unwrap();
assert_eq!(item.input, "peeked");
assert_eq!(q.len().await, 1);
}
// Up walks towards the oldest entry and sticks there; Down walks back and
// returns `None` once past the newest entry. `file_path: None` keeps the
// test off the filesystem.
#[test]
fn test_input_history_add_and_navigate() {
let mut history = InputHistory {
entries: Vec::new(),
position: None,
max_entries: 5,
file_path: None,
};
history.add("first");
history.add("second");
history.add("third");
assert_eq!(history.up(""), Some("third"));
assert_eq!(history.up(""), Some("second"));
assert_eq!(history.up(""), Some("first"));
assert_eq!(history.up(""), Some("first"));
assert_eq!(history.down(), Some("second"));
assert_eq!(history.down(), Some("third"));
assert_eq!(history.down(), None);
}
// Consecutive identical entries are stored only once.
#[test]
fn test_input_history_dedup() {
let mut history = InputHistory {
entries: Vec::new(),
position: None,
max_entries: 5,
file_path: None,
};
history.add("same");
history.add("same");
history.add("same");
assert_eq!(history.entries.len(), 1);
}
// Exceeding `max_entries` evicts the oldest entry.
#[test]
fn test_input_history_max_entries() {
let mut history = InputHistory {
entries: Vec::new(),
position: None,
max_entries: 3,
file_path: None,
};
history.add("one");
history.add("two");
history.add("three");
history.add("four");
assert_eq!(history.entries.len(), 3);
assert_eq!(history.entries[0], "two");
}
// The interrupt flag starts cleared, reflects direct stores, and is reset
// by `clear_interrupt`.
#[test]
fn test_input_handler_new() {
let cancelled = Arc::new(AtomicBool::new(false));
let handler = InputHandler::new(Arc::clone(&cancelled));
assert!(!handler.is_interrupted());
handler.interrupted.store(true, Ordering::Relaxed);
assert!(handler.is_interrupted());
handler.clear_interrupt();
assert!(!handler.is_interrupted());
}
}