use std::io::Write;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;
use std::thread::{self, JoinHandle};
use tokio::sync::mpsc::UnboundedSender;
use crate::commands::{Command, CommandResult};
use crate::repl::Repl;
use crate::ui::UI;
use super::event::{ExitSummary, Job, UiEvent};
/// Flushes stdout and then stderr on a best-effort basis.
///
/// Flush failures are deliberately ignored: there is nothing useful the
/// worker could do about them at the call sites.
fn flush_captured_streams() {
    let mut out = std::io::stdout();
    let mut err = std::io::stderr();
    // Order matters for parity with the previous behaviour: stdout first.
    let sinks: [&mut dyn Write; 2] = [&mut out, &mut err];
    for sink in sinks {
        let _ = sink.flush();
    }
}
/// Handle returned by [`spawn`] that owns the worker thread.
pub struct WorkerHandle {
    /// Join handle of the worker thread; join it to wait for the worker to finish.
    pub thread: JoinHandle<()>,
}
/// Starts the worker on a dedicated OS thread named `sofos-worker`.
///
/// The thread takes ownership of `repl`, `job_rx`, `ui_tx` and `interrupt`
/// and runs [`run`] until that function returns.
///
/// # Errors
///
/// Returns the `std::io::Error` reported by `thread::Builder::spawn` when the
/// thread cannot be created.
pub fn spawn(
    mut repl: Repl,
    job_rx: Receiver<Job>,
    ui_tx: UnboundedSender<UiEvent>,
    interrupt: Arc<AtomicBool>,
) -> std::io::Result<WorkerHandle> {
    let builder = thread::Builder::new().name(String::from("sofos-worker"));
    // Everything the worker needs is moved into the closure; `repl` lives on
    // the worker thread from here on.
    let worker_body = move || run(&mut repl, job_rx, ui_tx, interrupt);
    builder
        .spawn(worker_body)
        .map(|thread| WorkerHandle { thread })
}
/// Guard that delivers `UiEvent::WorkerShutdown` to the UI at most once.
///
/// The event is sent either explicitly through `send_now` or, failing that,
/// when the guard is dropped.
struct ShutdownSender<'tx> {
    /// Channel on which the shutdown event is sent.
    ui_tx: &'tx UnboundedSender<UiEvent>,
    /// Summary to attach to the event; `None` until `set_summary` is called.
    summary: Option<ExitSummary>,
    /// `true` once the shutdown event has been sent.
    sent: bool,
}
impl<'a> ShutdownSender<'a> {
fn new(ui_tx: &'a UnboundedSender<UiEvent>) -> Self {
Self {
ui_tx,
summary: None,
sent: false,
}
}
fn set_summary(&mut self, summary: ExitSummary) {
self.summary = Some(summary);
}
fn send_now(&mut self) {
if self.sent {
return;
}
let summary = self.summary.take().unwrap_or(ExitSummary {
model: String::new(),
input_tokens: 0,
output_tokens: 0,
});
let _ = self.ui_tx.send(UiEvent::WorkerShutdown(summary));
self.sent = true;
}
}
impl Drop for ShutdownSender<'_> {
    /// Sends the shutdown event if `send_now` has not already done so, so the
    /// UI is notified however the guard goes out of scope.
    fn drop(&mut self) {
        Self::send_now(self);
    }
}
fn run(
repl: &mut Repl,
job_rx: Receiver<Job>,
ui_tx: UnboundedSender<UiEvent>,
interrupt: Arc<AtomicBool>,
) {
let mut shutdown = ShutdownSender::new(&ui_tx);
let _ = ui_tx.send(UiEvent::Status(repl.status_snapshot()));
while let Ok(job) = job_rx.recv() {
match job {
Job::Shutdown => break,
Job::Message { text, images } => {
interrupt.store(false, Ordering::SeqCst);
let _ = ui_tx.send(UiEvent::WorkerBusy("processing".into()));
if let Err(e) = repl.process_message(&text, images) {
if !matches!(e, crate::error::SofosError::Interrupted) {
if e.is_blocked() {
UI::print_blocked_with_hint(&e);
} else {
UI::print_error_with_hint(&e);
}
}
}
if let Err(e) = repl.save_current_session() {
UI::print_warning(&format!("Failed to save session: {}", e));
}
println!();
flush_captured_streams();
let _ = ui_tx.send(UiEvent::Status(repl.status_snapshot()));
let _ = ui_tx.send(UiEvent::WorkerIdle);
}
Job::Command(cmd) => {
interrupt.store(false, Ordering::SeqCst);
let _ = ui_tx.send(UiEvent::WorkerBusy("command".into()));
match run_command(repl, cmd, &ui_tx) {
Ok(CommandResult::Exit) => {
break;
}
Ok(CommandResult::Continue) => {}
Err(e) => {
if e.is_blocked() {
UI::print_blocked_with_hint(&e);
} else {
UI::print_error_with_hint(&e);
}
}
}
flush_captured_streams();
let _ = ui_tx.send(UiEvent::Status(repl.status_snapshot()));
let _ = ui_tx.send(UiEvent::WorkerIdle);
}
Job::ResumeSelected(Some(session_id)) => {
interrupt.store(false, Ordering::SeqCst);
let _ = ui_tx.send(UiEvent::WorkerBusy("loading".into()));
if let Err(e) = repl.load_session_by_id(&session_id) {
UI::print_error_with_hint(&e);
}
flush_captured_streams();
let _ = ui_tx.send(UiEvent::Status(repl.status_snapshot()));
let _ = ui_tx.send(UiEvent::WorkerIdle);
}
Job::ResumeSelected(None) => {
flush_captured_streams();
let _ = ui_tx.send(UiEvent::WorkerIdle);
}
}
}
let (model, input_tokens, output_tokens) = repl.get_session_summary();
if let Err(e) = repl.save_current_session() {
UI::print_warning(&format!("Failed to save session: {}", e));
}
flush_captured_streams();
shutdown.set_summary(ExitSummary {
model,
input_tokens,
output_tokens,
});
shutdown.send_now();
}
/// Executes a single command on behalf of the worker loop.
///
/// `Command::Resume` is handled here rather than by `Command::execute`: the
/// saved sessions are listed and, when there are any, sent to the UI as
/// `UiEvent::ShowResumePicker`; when there are none a notice is printed.
/// Either way the result is `CommandResult::Continue`. Every other command is
/// delegated to `cmd.execute(repl)`.
///
/// # Errors
///
/// Propagates the error from `repl.list_saved_sessions()` or from
/// `cmd.execute(repl)`.
fn run_command(
    repl: &mut Repl,
    cmd: Command,
    ui_tx: &UnboundedSender<UiEvent>,
) -> crate::error::Result<CommandResult> {
    if !matches!(cmd, Command::Resume) {
        return cmd.execute(repl);
    }

    let sessions = repl.list_saved_sessions()?;
    if sessions.is_empty() {
        println!("No saved sessions found.");
    } else {
        // Best-effort: a failed send is ignored.
        let _ = ui_tx.send(UiEvent::ShowResumePicker(sessions));
    }
    Ok(CommandResult::Continue)
}