vtcode_bash_runner/
background.rs

1use anyhow::Result;
2use std::collections::HashMap;
3use std::sync::Arc;
4use tokio::sync::{RwLock, oneshot};
5
6use crate::executor::{CommandExecutor, CommandInvocation, CommandOutput};
7
8#[derive(Debug, Clone)]
9pub struct BackgroundTaskHandle {
10    pub id: String,
11    pub command: String,
12    pub status: BackgroundTaskStatus,
13}
14
15#[derive(Debug, Clone, PartialEq)]
16pub enum BackgroundTaskStatus {
17    Pending,
18    Running,
19    Completed,
20    Failed,
21}
22
23#[derive(Debug)]
24pub struct BackgroundTask {
25    pub id: String,
26    pub invocation: CommandInvocation,
27    pub status: BackgroundTaskStatus,
28    pub result: Option<Result<CommandOutput, String>>,
29    pub cancel_tx: Option<oneshot::Sender<()>>,
30}
31
32pub struct BackgroundCommandManager<E: CommandExecutor> {
33    executor: Arc<E>,
34    tasks: Arc<RwLock<HashMap<String, BackgroundTask>>>,
35    next_id: Arc<RwLock<u64>>,
36}
37
38impl<E: CommandExecutor + 'static> BackgroundCommandManager<E> {
39    pub fn new(executor: E) -> Self {
40        Self {
41            executor: Arc::new(executor),
42            tasks: Arc::new(RwLock::new(HashMap::new())),
43            next_id: Arc::new(RwLock::new(1)),
44        }
45    }
46
47    pub async fn run_command(&self, invocation: CommandInvocation) -> Result<String> {
48        let task_id = self.generate_task_id().await;
49
50        let (cancel_tx, cancel_rx) = oneshot::channel();
51
52        let task = BackgroundTask {
53            id: task_id.clone(),
54            invocation: invocation.clone(),
55            status: BackgroundTaskStatus::Pending,
56            result: None,
57            cancel_tx: Some(cancel_tx),
58        };
59
60        {
61            let mut tasks = self.tasks.write().await;
62            tasks.insert(task_id.clone(), task);
63        }
64
65        // Update status to running
66        self.update_task_status(&task_id, BackgroundTaskStatus::Running)
67            .await;
68
69        // Spawn the background task
70        let executor = self.executor.clone();
71        let tasks = self.tasks.clone();
72        let id = task_id.clone();
73
74        tokio::spawn(async move {
75            let result = tokio::select! {
76                command_result = execute_command(executor.as_ref(), &invocation) => {
77                    command_result
78                }
79                _ = cancel_rx => {
80                    // Task was cancelled
81                    Err(anyhow::anyhow!("Command was cancelled"))
82                }
83            };
84
85            let mut tasks = tasks.write().await;
86            if let Some(task) = tasks.get_mut(&id) {
87                task.status = match result.is_ok() {
88                    true => BackgroundTaskStatus::Completed,
89                    false => BackgroundTaskStatus::Failed,
90                };
91                task.result = Some(result.map_err(|e| e.to_string()));
92                task.cancel_tx = None; // Clear the cancel sender
93            }
94        });
95
96        Ok(task_id)
97    }
98
99    pub async fn get_task(&self, task_id: &str) -> Option<BackgroundTaskHandle> {
100        let tasks = self.tasks.read().await;
101        tasks.get(task_id).map(|task| BackgroundTaskHandle {
102            id: task.id.clone(),
103            command: task.invocation.command.clone(),
104            status: task.status.clone(),
105        })
106    }
107
108    pub async fn get_task_output(&self, task_id: &str) -> Option<Result<CommandOutput, String>> {
109        let tasks = self.tasks.read().await;
110        tasks.get(task_id).and_then(|task| task.result.clone())
111    }
112
113    pub async fn list_tasks(&self) -> Vec<BackgroundTaskHandle> {
114        let tasks = self.tasks.read().await;
115        tasks
116            .values()
117            .map(|task| BackgroundTaskHandle {
118                id: task.id.clone(),
119                command: task.invocation.command.clone(),
120                status: task.status.clone(),
121            })
122            .collect()
123    }
124
125    pub async fn cancel_task(&self, task_id: &str) -> Result<()> {
126        let mut tasks = self.tasks.write().await;
127        if let Some(task) = tasks.get_mut(task_id) {
128            if let Some(cancel_tx) = task.cancel_tx.take() {
129                if cancel_tx.send(()).is_ok() {
130                    task.status = BackgroundTaskStatus::Failed;
131                    return Ok(());
132                }
133            }
134        }
135        anyhow::bail!("Task not found or already completed: {}", task_id);
136    }
137
138    async fn generate_task_id(&self) -> String {
139        let mut next_id = self.next_id.write().await;
140        let id = format!("bg-{}", *next_id);
141        *next_id += 1;
142        id
143    }
144
145    async fn update_task_status(&self, task_id: &str, status: BackgroundTaskStatus) {
146        let mut tasks = self.tasks.write().await;
147        if let Some(task) = tasks.get_mut(task_id) {
148            task.status = status;
149        }
150    }
151}
152
153async fn execute_command<E: CommandExecutor>(
154    executor: &E,
155    invocation: &CommandInvocation,
156) -> Result<CommandOutput> {
157    executor.execute(invocation)
158}