codetether_agent/tui/app/
background.rs1use std::path::Path;
2
3use tokio::sync::mpsc;
4
5use crate::bus::BusHandle;
6use crate::session::{Session, SessionEvent};
7use crate::tui::app::session_events::handle_session_event;
8use crate::tui::app::session_sync::refresh_sessions;
9use crate::tui::app::state::App;
10use crate::tui::app::worker_bridge::handle_processing_stopped;
11use crate::tui::chat::message::{ChatMessage, MessageType};
12use crate::tui::worker_bridge::TuiWorkerBridge;
13
14pub async fn drain_background_updates(
15 app: &mut App,
16 cwd: &Path,
17 session: &mut Session,
18 bus_handle: &mut BusHandle,
19 worker_bridge: &mut Option<TuiWorkerBridge>,
20 event_rx: &mut mpsc::Receiver<SessionEvent>,
21 result_rx: &mut mpsc::Receiver<anyhow::Result<Session>>,
22) {
23 ingest_bus_messages(app, bus_handle);
24 queue_worker_tasks(app, worker_bridge);
25 display_next_worker_task(app);
26 apply_completed_sessions(app, cwd, session, worker_bridge, result_rx).await;
27 apply_session_events(app, session, worker_bridge, event_rx).await;
28}
29
30fn ingest_bus_messages(app: &mut App, bus_handle: &mut BusHandle) {
31 while let Some(envelope) = bus_handle.try_recv() {
32 app.state.bus_log.ingest(&envelope);
33 }
34}
35
36fn queue_worker_tasks(app: &mut App, worker_bridge: &mut Option<TuiWorkerBridge>) {
37 if let Some(bridge) = worker_bridge.as_mut() {
38 while let Ok(task) = bridge.task_rx.try_recv() {
39 let preview = format!(
40 "{} from {}",
41 task.task_id,
42 task.from_agent
43 .clone()
44 .unwrap_or_else(|| "unknown".to_string())
45 );
46 app.state.push_recent_task(preview);
47 app.state.enqueue_worker_task(task);
48 app.state.status = format!(
49 "Queued remote task(s): {}",
50 app.state.worker_task_queue.len()
51 );
52 }
53 }
54}
55
56fn display_next_worker_task(app: &mut App) {
57 if !app.state.processing
58 && let Some(task) = app.state.dequeue_worker_task()
59 {
60 let from_agent = task.from_agent.unwrap_or_else(|| "unknown".to_string());
61 app.state.messages.push(ChatMessage::new(
62 MessageType::System,
63 format!(
64 "Incoming A2A task {} from {}\n{}",
65 task.task_id, from_agent, task.message
66 ),
67 ));
68 app.state.status = format!("Received remote task {}", task.task_id);
69 app.state.scroll_to_bottom();
70 }
71}
72
73async fn apply_completed_sessions(
74 app: &mut App,
75 cwd: &Path,
76 session: &mut Session,
77 worker_bridge: &mut Option<TuiWorkerBridge>,
78 result_rx: &mut mpsc::Receiver<anyhow::Result<Session>>,
79) {
80 while let Ok(updated_session) = result_rx.try_recv() {
81 apply_single_result(app, cwd, session, worker_bridge, updated_session).await;
82 }
83}
84
85pub async fn apply_single_result(
90 app: &mut App,
91 cwd: &Path,
92 session: &mut Session,
93 worker_bridge: &mut Option<TuiWorkerBridge>,
94 result: anyhow::Result<Session>,
95) {
96 match result {
97 Ok(updated_session) => {
98 if updated_session.id != session.id {
103 tracing::warn!(
104 stale_id = %updated_session.id,
105 current_id = %session.id,
106 "Discarding stale session result from a previous session"
107 );
108 let _ = updated_session.save().await;
111 refresh_sessions(app, cwd).await;
112 return;
113 }
114
115 if app.state.processing {
118 handle_processing_stopped(app, worker_bridge).await;
119 app.state.clear_request_timing();
120 }
121
122 *session = updated_session;
123 session.attach_global_bus_if_missing();
124 app.state.session_id = Some(session.id.clone());
125 let _ = session.save().await;
126 refresh_sessions(app, cwd).await;
127 }
128 Err(err) => {
129 handle_processing_stopped(app, worker_bridge).await;
130 app.state
131 .messages
132 .push(ChatMessage::new(MessageType::Error, err.to_string()));
133 app.state.status = "Request failed".to_string();
134 app.state.scroll_to_bottom();
135 }
136 }
137}
138
139async fn apply_session_events(
140 app: &mut App,
141 session: &mut Session,
142 worker_bridge: &Option<TuiWorkerBridge>,
143 event_rx: &mut mpsc::Receiver<SessionEvent>,
144) {
145 while let Ok(evt) = event_rx.try_recv() {
146 handle_session_event(app, session, worker_bridge, evt).await;
147 }
148}