Skip to main content

ai_agent/utils/task/
task_framework.rs

1//! Task framework utilities.
2//! Contains task management, polling, and notification logic.
3
4use std::collections::HashMap;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7
/// Interval, in milliseconds, between successive polls; shared by all tasks.
pub const POLL_INTERVAL_MS: u64 = 1_000;

/// How long, in milliseconds, a killed task stays on display before eviction.
pub const STOPPED_DISPLAY_MS: u64 = 3_000;

/// Grace period, in milliseconds, for terminal `local_agent` tasks in the
/// coordinator panel.
pub const PANEL_GRACE_MS: u64 = 30_000;
16
/// The kinds of task the framework knows about.
///
/// The enum is fieldless, so it is `Copy` (no `.clone()` needed to pass it
/// around) and `Hash` (usable as a `HashMap`/`HashSet` key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[allow(dead_code)]
pub enum TaskType {
    LocalBash,
    LocalAgent,
    RemoteAgent,
    InProcessTeammate,
    LocalWorkflow,
    MonitorMcp,
    Dream,
}

impl TaskType {
    /// Returns the stable snake_case identifier for this task type.
    ///
    /// This is the string passed as the task type when the `task_started`
    /// event is emitted and when a notification is formatted.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::LocalBash => "local_bash",
            Self::LocalAgent => "local_agent",
            Self::RemoteAgent => "remote_agent",
            Self::InProcessTeammate => "in_process_teammate",
            Self::LocalWorkflow => "local_workflow",
            Self::MonitorMcp => "monitor_mcp",
            Self::Dream => "dream",
        }
    }
}
43
/// Lifecycle status of a task.
///
/// The enum is fieldless, so it is `Copy` (no `.clone()` needed to pass it
/// around) and `Hash` (usable as a `HashMap`/`HashSet` key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[allow(dead_code)]
pub enum TaskStatus {
    Pending,
    Running,
    Completed,
    Failed,
    Killed,
}

impl TaskStatus {
    /// Returns the stable lowercase identifier for this status.
    ///
    /// This is the machine-readable form written into the status element of
    /// a task notification.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Pending => "pending",
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Killed => "killed",
        }
    }
}
66
67/// True when a task is in a terminal state and will not transition further.
68pub fn is_terminal_task_status(status: &TaskStatus) -> bool {
69    matches!(
70        status,
71        TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Killed
72    )
73}
74
75/// Base task state
76#[derive(Debug, Clone)]
77#[allow(dead_code)]
78pub struct TaskStateBase {
79    pub id: String,
80    pub task_type: TaskType,
81    pub status: TaskStatus,
82    pub description: String,
83    pub tool_use_id: Option<String>,
84    pub start_time: u64,
85    pub end_time: Option<u64>,
86    pub total_paused_ms: Option<u64>,
87    pub output_file: String,
88    pub output_offset: u64,
89    pub notified: bool,
90    // Optional fields for local agent
91    pub retain: Option<bool>,
92    pub evict_after: Option<u64>,
93    pub messages: Option<String>,
94    pub disk_loaded: Option<bool>,
95    pub pending_messages: Option<Vec<String>>,
96}
97
98/// Attachment type for task status updates
99#[derive(Debug, Clone)]
100pub struct TaskAttachment {
101    pub task_id: String,
102    pub tool_use_id: Option<String>,
103    pub task_type: TaskType,
104    pub status: TaskStatus,
105    pub description: String,
106    pub delta_summary: Option<String>, // New output since last attachment
107}
108
109/// Type alias for app state
110pub type AppState = HashMap<String, TaskStateBase>;
111
112/// Type alias for set app state function
113pub type SetAppState = Arc<dyn Fn(Arc<Mutex<AppState>>) -> Arc<Mutex<AppState>> + Send + Sync>;
114
115/// Update a task's state in AppState.
116/// Helper function for task implementations.
117/// Generic to allow type-safe updates for specific task types.
118pub fn update_task_state<T: TaskStateTrait>(
119    task_id: &str,
120    set_app_state: &SetAppState,
121    updater: impl FnOnce(&T) -> T,
122) {
123    let app_state = Arc::new(Mutex::new(HashMap::new()));
124    let new_state = set_app_state(app_state.clone());
125
126    // Note: In a real implementation, this would update the actual app state
127    // This is a simplified version showing the pattern
128}
129
/// Trait used as the bound for type-safe task state updates.
pub trait TaskStateTrait {
    /// Exposes the value as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn std::any::Any;
    /// Clones the value into a boxed trait object.
    fn clone_box(&self) -> Box<dyn TaskStateTrait>;
}

/// Blanket implementation: every `Clone + 'static` type is a task state.
impl<T> TaskStateTrait for T
where
    T: Clone + 'static,
{
    fn as_any(&self) -> &dyn std::any::Any {
        self as &dyn std::any::Any
    }

    fn clone_box(&self) -> Box<dyn TaskStateTrait> {
        let duplicate: T = Clone::clone(self);
        Box::new(duplicate)
    }
}
145
146/// Register a new task in AppState.
147pub fn register_task(task: TaskStateBase, set_app_state: &SetAppState) -> bool {
148    let mut is_replacement = false;
149    let task_for_emit = task.clone();
150
151    let _app_state = Arc::new(Mutex::new(HashMap::new()));
152    let existing_state = set_app_state(_app_state.clone());
153    let mut state = existing_state.blocking_lock();
154
155    // Check if task exists (would be replacement)
156    is_replacement = state.contains_key(&task_for_emit.id);
157
158    if !is_replacement {
159        state.insert(task_for_emit.id.clone(), task_for_emit.clone());
160    }
161
162    drop(state);
163
164    // Replacement (resume) — not a new start. Skip to avoid double-emit.
165    if is_replacement {
166        return false;
167    }
168
169    // Emit task_started SDK event
170    crate::utils::sdk_event_queue::emit_task_started(
171        &task_for_emit.id,
172        task_for_emit.tool_use_id.clone(),
173        &task_for_emit.description,
174        Some(task_for_emit.task_type.as_str().to_string()),
175        None, // workflow_name — not in TaskStateBase
176        None, // prompt — not in TaskStateBase
177    );
178
179    is_replacement
180}
181
182/// Eagerly evict a terminal task from AppState.
183/// The task must be in a terminal state (completed/failed/killed) with notified=true.
184pub fn evict_terminal_task(task_id: &str, set_app_state: &SetAppState) {
185    let _ = (task_id, set_app_state);
186    // Note: In real implementation, this would:
187    // 1. Check if task exists and is terminal
188    // 2. Check if task has been notified
189    // 3. Check retain/evict_after for grace period
190    // 4. Remove task from app state
191}
192
193/// Get all running tasks.
194pub fn get_running_tasks(state: &AppState) -> Vec<&TaskStateBase> {
195    state
196        .values()
197        .filter(|task| task.status == TaskStatus::Running)
198        .collect()
199}
200
201/// Generate attachments for tasks with new output or status changes.
202/// Called by the framework to create push notifications.
203pub async fn generate_task_attachments(
204    state: Arc<Mutex<AppState>>,
205) -> (
206    Vec<TaskAttachment>,
207    HashMap<String, u64>, // updated task offsets
208    Vec<String>,          // evicted task ids
209) {
210    let mut attachments = Vec::new();
211    let mut updated_task_offsets: HashMap<String, u64> = HashMap::new();
212    let mut evicted_task_ids: Vec<String> = Vec::new();
213
214    let tasks = state.lock().await;
215
216    for (task_id, task_state) in tasks.iter() {
217        if task_state.notified {
218            match task_state.status {
219                TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Killed => {
220                    // Evict terminal tasks
221                    evicted_task_ids.push(task_id.clone());
222                    continue;
223                }
224                TaskStatus::Pending => {
225                    // Keep in map — hasn't run yet
226                    continue;
227                }
228                TaskStatus::Running => {
229                    // Fall through to running logic
230                }
231            }
232        }
233
234        if task_state.status == TaskStatus::Running {
235            // Get delta output
236            // In real implementation: get_task_output_delta(task_state.id, task_state.output_offset)
237            // For now, just update offset (simplified)
238            updated_task_offsets.insert(task_id.clone(), task_state.output_offset);
239        }
240    }
241
242    (attachments, updated_task_offsets, evicted_task_ids)
243}
244
245/// Apply the outputOffset patches and evictions from generate_task_attachments.
246/// Merges patches against fresh state.
247pub fn apply_task_offsets_and_evictions(
248    set_app_state: &SetAppState,
249    updated_task_offsets: HashMap<String, u64>,
250    evicted_task_ids: Vec<String>,
251) {
252    if updated_task_offsets.is_empty() && evicted_task_ids.is_empty() {
253        return;
254    }
255
256    let _ = (set_app_state, updated_task_offsets, evicted_task_ids);
257    // Note: In real implementation, this would update app state atomically
258}
259
260/// Poll all running tasks and check for updates.
261/// This is the main polling loop called by the framework.
262pub async fn poll_tasks(
263    get_app_state: impl Fn() -> Arc<Mutex<AppState>>,
264    set_app_state: &SetAppState,
265) {
266    let state = get_app_state();
267    let (attachments, updated_task_offsets, evicted_task_ids) =
268        generate_task_attachments(state).await;
269
270    apply_task_offsets_and_evictions(set_app_state, updated_task_offsets, evicted_task_ids);
271
272    // Send notifications for completed tasks
273    for attachment in attachments {
274        enqueue_task_notification(attachment);
275    }
276}
277
278/// Enqueue a task notification to the message queue.
279fn enqueue_task_notification(attachment: TaskAttachment) {
280    let status_text = get_status_text(&attachment.status);
281
282    // Note: In real implementation, this would create XML notification
283    // using the constants from xml.ts
284    let _ = (attachment, status_text);
285    // <task-notification>...message...</task-notification>
286}
287
288/// Get human-readable status text.
289fn get_status_text(status: &TaskStatus) -> &'static str {
290    match status {
291        TaskStatus::Completed => "completed successfully",
292        TaskStatus::Failed => "failed",
293        TaskStatus::Killed => "was stopped",
294        TaskStatus::Running => "is running",
295        TaskStatus::Pending => "is pending",
296    }
297}
298
// XML tag names used when formatting task notifications.

/// Root element of a task notification.
pub const TASK_NOTIFICATION_TAG: &str = "task-notification";
/// Element holding the task id.
pub const TASK_ID_TAG: &str = "task-id";
/// Element holding the optional tool-use id.
pub const TOOL_USE_ID_TAG: &str = "tool-use-id";
/// Element holding the task type.
pub const TASK_TYPE_TAG: &str = "task-type";
/// Element holding the output file.
pub const OUTPUT_FILE_TAG: &str = "output-file";
/// Element holding the machine-readable status.
pub const STATUS_TAG: &str = "status";
/// Element holding the human-readable summary.
pub const SUMMARY_TAG: &str = "summary";
307
308/// Helper to format task notification XML
309pub fn format_task_notification(
310    task_id: &str,
311    tool_use_id: Option<&str>,
312    task_type: &TaskType,
313    output_file: &str,
314    status: &TaskStatus,
315    description: &str,
316) -> String {
317    let tool_use_id_line = tool_use_id
318        .map(|id| format!("\n<{}>{}</{}>", TOOL_USE_ID_TAG, id, TOOL_USE_ID_TAG))
319        .unwrap_or_default();
320
321    let status_text = get_status_text(status);
322
323    format!(
324        "<{}>\
325<{}>{}</{}>{}\
326<{}>{}</{}>\
327<{}>{}</{}>\
328<{}>{}</{}>\
329<{}>Task \"{}\" {}</{}>\
330</{}>",
331        TASK_NOTIFICATION_TAG,
332        TASK_ID_TAG,
333        task_id,
334        TASK_ID_TAG,
335        tool_use_id_line,
336        TASK_TYPE_TAG,
337        task_type.as_str(),
338        TASK_TYPE_TAG,
339        OUTPUT_FILE_TAG,
340        output_file,
341        OUTPUT_FILE_TAG,
342        STATUS_TAG,
343        status.as_str(),
344        STATUS_TAG,
345        SUMMARY_TAG,
346        description,
347        status_text,
348        SUMMARY_TAG,
349        TASK_NOTIFICATION_TAG
350    )
351}
352
/// Output captured for a task.
#[derive(Debug, Clone)]
pub struct TaskOutput {
    /// Identifier of the task that produced the output.
    pub task_id: String,
    /// The output text.
    pub content: String,
    /// Timestamp of the output. NOTE(review): unit/epoch is not visible
    /// here — confirm against the producer.
    pub timestamp: i64,
}
360
/// Upper bound on task output size, in bytes.
pub const MAX_TASK_OUTPUT_BYTES: usize = 100_000;

/// Display form of the maximum task output size.
pub const MAX_TASK_OUTPUT_BYTES_DISPLAY: &str = "100KB";
366
/// Initialize task output (stub).
///
/// Creates nothing yet; it only returns `task_output.txt` inside the system
/// temporary directory. The task id is currently ignored.
#[allow(dead_code)]
pub fn init_task_output(_task_id: &str) -> std::path::PathBuf {
    let mut output_path = std::env::temp_dir();
    output_path.push("task_output.txt");
    output_path
}
373
/// Initialize task output as a symlink (stub).
///
/// Creates no symlink yet; it always reports success and ignores both
/// arguments.
#[allow(dead_code)]
pub fn init_task_output_as_symlink(
    _task_id: &str,
    _target: &std::path::Path,
) -> std::io::Result<()> {
    let outcome: std::io::Result<()> = Ok(());
    outcome
}
383
/// Get the task output path.
///
/// Always resolves to `task_output.txt` inside the system temporary
/// directory; the task id is currently ignored.
#[allow(dead_code)]
pub fn get_task_output_path(_task_id: &str) -> std::path::PathBuf {
    const OUTPUT_FILE_NAME: &str = "task_output.txt";
    let base_dir = std::env::temp_dir();
    base_dir.join(OUTPUT_FILE_NAME)
}
389
/// Get the task output size (stub: always `0`).
#[allow(dead_code)]
pub fn get_task_output_size(_task_id: &str) -> usize {
    const STUB_SIZE: usize = 0;
    STUB_SIZE
}
395
396/// Get task output
397#[allow(dead_code)]
398pub fn get_task_output(_task_id: &str) -> Option<TaskOutput> {
399    None
400}
401
/// Get the task output delta, for streaming (stub: always `None`).
///
/// Both the task id and the starting byte offset are currently ignored.
#[allow(dead_code)]
pub fn get_task_output_delta(_task_id: &str, _from_byte: usize) -> Option<String> {
    None::<String>
}
407
/// Append to the task output (stub).
///
/// Writes nothing yet and always returns `Ok(0)`.
#[allow(dead_code)]
pub fn append_task_output(_task_id: &str, _content: &str) -> std::io::Result<usize> {
    let stub_result: usize = 0;
    Ok(stub_result)
}
413
/// Clean up the task output (stub: does nothing and always succeeds).
#[allow(dead_code)]
pub fn cleanup_task_output(_task_id: &str) -> std::io::Result<()> {
    std::io::Result::Ok(())
}
419
/// Evict the task output, i.e. remove old outputs (stub: does nothing and
/// always succeeds).
#[allow(dead_code)]
pub fn evict_task_output(_task_id: &str) -> std::io::Result<()> {
    std::io::Result::Ok(())
}
425
/// Flush the task output so it is written to disk (stub: does nothing and
/// always succeeds).
#[allow(dead_code)]
pub fn flush_task_output(_task_id: &str) -> std::io::Result<()> {
    std::io::Result::Ok(())
}