Skip to main content

codetether_agent/tui/app/
background.rs

1use std::path::Path;
2
3use tokio::sync::mpsc;
4
5use crate::bus::BusHandle;
6use crate::session::{Session, SessionEvent};
7use crate::tui::app::session_events::handle_session_event;
8use crate::tui::app::session_sync::refresh_sessions;
9use crate::tui::app::state::App;
10use crate::tui::app::worker_bridge::handle_processing_stopped;
11use crate::tui::chat::message::{ChatMessage, MessageType};
12use crate::tui::worker_bridge::TuiWorkerBridge;
13
14pub async fn drain_background_updates(
15    app: &mut App,
16    cwd: &Path,
17    session: &mut Session,
18    bus_handle: &mut BusHandle,
19    worker_bridge: &mut Option<TuiWorkerBridge>,
20    event_rx: &mut mpsc::Receiver<SessionEvent>,
21    result_rx: &mut mpsc::Receiver<anyhow::Result<Session>>,
22) {
23    ingest_bus_messages(app, bus_handle);
24    queue_worker_tasks(app, worker_bridge);
25    display_next_worker_task(app);
26    apply_completed_sessions(app, cwd, session, worker_bridge, result_rx).await;
27    apply_session_events(app, session, worker_bridge, event_rx).await;
28}
29
30fn ingest_bus_messages(app: &mut App, bus_handle: &mut BusHandle) {
31    while let Some(envelope) = bus_handle.try_recv() {
32        app.state.bus_log.ingest(&envelope);
33    }
34}
35
36fn queue_worker_tasks(app: &mut App, worker_bridge: &mut Option<TuiWorkerBridge>) {
37    if let Some(bridge) = worker_bridge.as_mut() {
38        while let Ok(task) = bridge.task_rx.try_recv() {
39            let preview = format!(
40                "{} from {}",
41                task.task_id,
42                task.from_agent
43                    .clone()
44                    .unwrap_or_else(|| "unknown".to_string())
45            );
46            app.state.push_recent_task(preview);
47            app.state.enqueue_worker_task(task);
48            app.state.status = format!(
49                "Queued remote task(s): {}",
50                app.state.worker_task_queue.len()
51            );
52        }
53    }
54}
55
56fn display_next_worker_task(app: &mut App) {
57    if !app.state.processing
58        && let Some(task) = app.state.dequeue_worker_task()
59    {
60        let from_agent = task.from_agent.unwrap_or_else(|| "unknown".to_string());
61        app.state.messages.push(ChatMessage::new(
62            MessageType::System,
63            format!(
64                "Incoming A2A task {} from {}\n{}",
65                task.task_id, from_agent, task.message
66            ),
67        ));
68        app.state.status = format!("Received remote task {}", task.task_id);
69        app.state.scroll_to_bottom();
70    }
71}
72
73async fn apply_completed_sessions(
74    app: &mut App,
75    cwd: &Path,
76    session: &mut Session,
77    worker_bridge: &mut Option<TuiWorkerBridge>,
78    result_rx: &mut mpsc::Receiver<anyhow::Result<Session>>,
79) {
80    while let Ok(updated_session) = result_rx.try_recv() {
81        apply_single_result(app, cwd, session, worker_bridge, updated_session).await;
82    }
83}
84
85/// Process a single completed session result.
86///
87/// Extracted so both the `tokio::select!` branch in the event loop and the
88/// batch drain path can share the same logic.
89pub async fn apply_single_result(
90    app: &mut App,
91    cwd: &Path,
92    session: &mut Session,
93    worker_bridge: &mut Option<TuiWorkerBridge>,
94    result: anyhow::Result<Session>,
95) {
96    match result {
97        Ok(updated_session) => {
98            // Guard against stale results overwriting a newer session
99            // BEFORE resetting processing. This prevents a late result
100            // from the old session from clearing processing/timing on
101            // a new in-flight request.
102            if updated_session.id != session.id {
103                tracing::warn!(
104                    stale_id = %updated_session.id,
105                    current_id = %session.id,
106                    "Discarding stale session result from a previous session"
107                );
108                // Persist the old result and refresh so the session picker
109                // reflects the saved content from the previous session.
110                let _ = updated_session.save().await;
111                refresh_sessions(app, cwd).await;
112                return;
113            }
114
115            // Reset processing — the Done event may not have been
116            // consumed yet via event_rx (tokio::select! race condition).
117            if app.state.processing {
118                handle_processing_stopped(app, worker_bridge).await;
119                app.state.clear_request_timing();
120            }
121
122            *session = updated_session;
123            session.attach_global_bus_if_missing();
124            app.state.session_id = Some(session.id.clone());
125            let _ = session.save().await;
126            refresh_sessions(app, cwd).await;
127        }
128        Err(err) => {
129            handle_processing_stopped(app, worker_bridge).await;
130            app.state
131                .messages
132                .push(ChatMessage::new(MessageType::Error, err.to_string()));
133            app.state.status = "Request failed".to_string();
134            app.state.scroll_to_bottom();
135        }
136    }
137}
138
139async fn apply_session_events(
140    app: &mut App,
141    session: &mut Session,
142    worker_bridge: &Option<TuiWorkerBridge>,
143    event_rx: &mut mpsc::Receiver<SessionEvent>,
144) {
145    while let Ok(evt) = event_rx.try_recv() {
146        handle_session_event(app, session, worker_bridge, evt).await;
147    }
148}