1use std::path::PathBuf;
2use std::time::{Duration, SystemTime, UNIX_EPOCH};
3
4use crate::bridge::events::{BridgeEvent, PromptResult};
5use crate::bridge::{AcpBridge, BridgeCancelHandle};
6use crate::client::permissions::{PermissionMode, resolve_permission};
7use crate::error::{AcpCliError, Result};
8use crate::output::OutputRenderer;
9use crate::output::json::JsonRenderer;
10use crate::output::quiet::QuietRenderer;
11use crate::output::text::TextRenderer;
12use crate::queue::client::QueueClient;
13use crate::queue::ipc::start_ipc_server;
14use crate::queue::lease::LeaseFile;
15use crate::queue::owner::QueueOwner;
16use crate::session::history::{ConversationEntry, append_entry};
17use crate::session::persistence::SessionRecord;
18use crate::session::pid;
19use crate::session::scoping::{find_git_root, session_dir, session_key};
20
/// Scope guard for a session's PID file: [`PidGuard::new`] writes it via
/// `pid::write_pid`, and dropping the guard removes it via `pid::remove_pid`.
struct PidGuard {
    /// Session key passed to `pid::remove_pid` when the guard is dropped.
    session_key: String,
}
26
27impl PidGuard {
28 fn new(session_key: &str) -> std::io::Result<Self> {
29 pid::write_pid(session_key)?;
30 Ok(Self {
31 session_key: session_key.to_string(),
32 })
33 }
34}
35
impl Drop for PidGuard {
    /// Removes the PID entry for this guard's session key.
    fn drop(&mut self) {
        // Best-effort cleanup: the result is discarded because `drop` has no
        // way to report a failure to the caller.
        let _ = pid::remove_pid(&self.session_key);
    }
}
41
42fn make_renderer(output_format: &str) -> Box<dyn OutputRenderer> {
44 match output_format {
45 "json" => Box::new(JsonRenderer::new()),
46 "quiet" => Box::new(QuietRenderer::new()),
47 _ => Box::new(TextRenderer::new()),
48 }
49}
50
/// Outcome of one run of [`event_loop`].
struct EventLoopResult {
    /// Exit code to report for the prompt: 0 on success, otherwise the code
    /// derived from the prompt error (1 when the reply channel was lost).
    exit_code: i32,
    /// Concatenation of every `TextChunk` received while the loop ran.
    assistant_text: String,
    /// Session id carried by the most recent `SessionCreated` event, if any.
    acp_session_id: Option<String>,
}
58
59async fn event_loop(
65 evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<BridgeEvent>,
66 prompt_reply: tokio::sync::oneshot::Receiver<Result<PromptResult>>,
67 cancel: &BridgeCancelHandle,
68 renderer: &mut Box<dyn OutputRenderer>,
69 permission_mode: &PermissionMode,
70 timeout_secs: Option<u64>,
71) -> Result<EventLoopResult> {
72 let mut cancel_sent = false;
73 let mut collected_text = String::new();
74 let mut acp_session_id: Option<String> = None;
75
76 let timeout_fut = async {
78 match timeout_secs {
79 Some(secs) => tokio::time::sleep(Duration::from_secs(secs)).await,
80 None => std::future::pending::<()>().await,
81 }
82 };
83 tokio::pin!(timeout_fut);
84 tokio::pin!(prompt_reply);
85
86 loop {
87 tokio::select! {
88 event = evt_rx.recv() => {
89 match event {
90 Some(BridgeEvent::TextChunk { text }) => {
91 collected_text.push_str(&text);
92 renderer.text_chunk(&text);
93 }
94 Some(BridgeEvent::ToolUse { name }) => renderer.tool_status(&name),
95 Some(BridgeEvent::PermissionRequest { tool, options, reply }) => {
96 let decision = resolve_permission(&tool, &options, permission_mode);
97 if matches!(decision, crate::bridge::PermissionOutcome::Cancelled) {
98 renderer.permission_denied(&tool.name);
99 }
100 let _ = reply.send(decision);
101 }
102 Some(BridgeEvent::SessionCreated { session_id }) => {
103 acp_session_id = Some(session_id.clone());
104 renderer.session_info(&session_id);
105 }
106 Some(BridgeEvent::PromptDone { .. }) => {
107 }
109 Some(BridgeEvent::Error { message }) => {
110 renderer.error(&message);
111 }
112 Some(BridgeEvent::AgentExited { code }) => {
113 if let Some(c) = code
114 && c != 0
115 {
116 renderer.error(&format!("agent exited with code {c}"));
117 }
118 }
119 None => break, }
121 }
122 result = &mut prompt_reply => {
123 renderer.done();
125 return match result {
126 Ok(Ok(_)) => Ok(EventLoopResult { exit_code: 0, assistant_text: collected_text, acp_session_id: acp_session_id.clone() }),
127 Ok(Err(e)) => {
128 renderer.error(&e.to_string());
129 Ok(EventLoopResult { exit_code: e.exit_code(), assistant_text: collected_text, acp_session_id: acp_session_id.clone() })
130 }
131 Err(_) => {
132 renderer.error("bridge connection lost");
134 Ok(EventLoopResult { exit_code: 1, assistant_text: collected_text, acp_session_id: acp_session_id.clone() })
135 }
136 };
137 }
138 _ = tokio::signal::ctrl_c() => {
139 if cancel_sent {
140 return Err(AcpCliError::Interrupted);
142 }
143 cancel_sent = true;
144 eprintln!("\nCancelling... (press Ctrl+C again to force quit)");
145 let _ = cancel.cancel().await;
146 }
147 _ = &mut timeout_fut => {
148 eprintln!("\nTimeout after {}s", timeout_secs.unwrap_or(0));
149 let _ = cancel.cancel().await;
150 tokio::time::sleep(Duration::from_secs(3)).await;
151 return Err(AcpCliError::Timeout(timeout_secs.unwrap_or(0)));
152 }
153 }
154 }
155
156 renderer.done();
157 Ok(EventLoopResult {
158 exit_code: 0,
159 assistant_text: collected_text,
160 acp_session_id,
161 })
162}
163
164#[allow(clippy::too_many_arguments)]
171pub async fn run_prompt(
172 agent_name: &str,
173 command: String,
174 args: Vec<String>,
175 cwd: PathBuf,
176 prompt_text: String,
177 session_name: Option<String>,
178 permission_mode: PermissionMode,
179 output_format: &str,
180 timeout_secs: Option<u64>,
181 no_wait: bool,
182) -> Result<i32> {
183 let mut renderer = make_renderer(output_format);
184
185 let queue_ttl_secs: u64 = 300;
187
188 let resolved_dir = find_git_root(&cwd).unwrap_or_else(|| cwd.clone());
192 let dir_str = resolved_dir.to_string_lossy();
193 let sess_name = session_name.as_deref().unwrap_or("");
194 let key = session_key(agent_name, &dir_str, sess_name);
195
196 let sess_file = session_dir().join(format!("{key}.json"));
197 let existing = SessionRecord::load(&sess_file).ok().flatten();
198 let is_resume = existing.as_ref().is_some_and(|r| !r.closed);
199
200 if existing.is_none() {
201 let now = std::time::SystemTime::now()
202 .duration_since(std::time::UNIX_EPOCH)
203 .unwrap_or_default()
204 .as_secs();
205 let record = SessionRecord {
206 id: key.clone(),
207 agent: agent_name.to_string(),
208 cwd: resolved_dir,
209 name: session_name,
210 created_at: now,
211 closed: false,
212 acp_session_id: None,
213 };
214 if let Err(e) = record.save(&sess_file) {
215 renderer.error(&format!("failed to save session: {e}"));
216 }
217 }
218
219 if is_resume {
220 renderer.session_info(&format!("resuming session {}", &key[..12.min(key.len())]));
221 }
222
223 if let Some(lease) = LeaseFile::read(&key)
228 && lease.is_valid(queue_ttl_secs)
229 {
230 match QueueClient::connect(&key).await {
231 Ok(mut client) => {
232 renderer.session_info("Connected to queue owner");
233
234 if no_wait {
236 let position = client.enqueue_only(vec![prompt_text.clone()]).await?;
237 renderer.session_info(&format!("Prompt queued (position {position})"));
238 renderer.done();
239
240 let now = SystemTime::now()
243 .duration_since(UNIX_EPOCH)
244 .unwrap_or_default()
245 .as_secs();
246 let user_entry = ConversationEntry {
247 role: "user".to_string(),
248 content: prompt_text,
249 timestamp: now,
250 };
251 let _ = append_entry(&key, &user_entry);
252
253 return Ok(0);
254 }
255
256 let result = client
257 .prompt(vec![prompt_text.clone()], &mut *renderer, &permission_mode)
258 .await;
259 renderer.done();
260
261 let now = SystemTime::now()
263 .duration_since(UNIX_EPOCH)
264 .unwrap_or_default()
265 .as_secs();
266 let user_entry = ConversationEntry {
267 role: "user".to_string(),
268 content: prompt_text,
269 timestamp: now,
270 };
271 let _ = append_entry(&key, &user_entry);
272
273 if let Ok(ref pr) = result
274 && !pr.content.is_empty()
275 {
276 let assistant_entry = ConversationEntry {
277 role: "assistant".to_string(),
278 content: pr.content.clone(),
279 timestamp: now,
280 };
281 let _ = append_entry(&key, &assistant_entry);
282 }
283
284 return result.map(|_| 0);
285 }
286 Err(e) => {
287 if no_wait {
289 return Err(AcpCliError::Usage(
290 "No active session. Run without --no-wait first to start a session."
291 .to_string(),
292 ));
293 }
294 renderer.session_info(&format!(
297 "Could not connect to queue owner (pid {}): {e}; starting new session",
298 lease.pid
299 ));
300 }
301 }
302 }
303
304 if no_wait {
309 return Err(AcpCliError::Usage(
310 "No active session. Run without --no-wait first to start a session.".to_string(),
311 ));
312 }
313
314 let _pid_guard = PidGuard::new(&key).map_err(|e| {
317 renderer.error(&format!("failed to write pid file: {e}"));
318 AcpCliError::Io(e)
319 })?;
320
321 LeaseFile::write(&key).map_err(|e| {
323 renderer.error(&format!("failed to write lease file: {e}"));
324 AcpCliError::Io(e)
325 })?;
326
327 let listener = start_ipc_server(&key).await.map_err(|e| {
329 LeaseFile::remove(&key);
330 renderer.error(&format!("failed to start IPC server: {e}"));
331 AcpCliError::Io(e)
332 })?;
333
334 let mut bridge = AcpBridge::start(command, args, cwd).await?;
336 let cancel = bridge.cancel_handle();
337
338 let prompt_reply = bridge.send_prompt(vec![prompt_text.clone()]).await?;
340
341 let loop_result = event_loop(
343 &mut bridge.evt_rx,
344 prompt_reply,
345 &cancel,
346 &mut renderer,
347 &permission_mode,
348 timeout_secs,
349 )
350 .await;
351
352 if let Ok(ref res) = loop_result
354 && let Some(ref new_acp_id) = res.acp_session_id
355 && let Ok(Some(mut record)) = SessionRecord::load(&sess_file)
356 {
357 let _ = record.update_acp_session_id(new_acp_id.clone(), &sess_file);
358 }
359
360 if let Ok(ref res) = loop_result {
362 let now = SystemTime::now()
363 .duration_since(UNIX_EPOCH)
364 .unwrap_or_default()
365 .as_secs();
366
367 let user_entry = ConversationEntry {
368 role: "user".to_string(),
369 content: prompt_text,
370 timestamp: now,
371 };
372 let _ = append_entry(&key, &user_entry);
373
374 if !res.assistant_text.is_empty() {
375 let assistant_entry = ConversationEntry {
376 role: "assistant".to_string(),
377 content: res.assistant_text.clone(),
378 timestamp: now,
379 };
380 let _ = append_entry(&key, &assistant_entry);
381 }
382 }
383
384 if loop_result.is_err() {
386 LeaseFile::remove(&key);
387 crate::queue::ipc::cleanup_socket(&key);
388 let _ = bridge.shutdown().await;
389 return loop_result.map(|r| r.exit_code);
390 }
391
392 let first_exit_code = loop_result.map(|r| r.exit_code)?;
393
394 let owner = QueueOwner::new(bridge, listener, &key, queue_ttl_secs).await?;
397 let _ = owner.run().await;
398
399 Ok(first_exit_code)
400}
401
402pub async fn run_exec(
404 command: String,
405 args: Vec<String>,
406 cwd: PathBuf,
407 prompt_text: String,
408 permission_mode: PermissionMode,
409 output_format: &str,
410 timeout_secs: Option<u64>,
411) -> Result<i32> {
412 let mut renderer = make_renderer(output_format);
413
414 let mut bridge = AcpBridge::start(command, args, cwd).await?;
416 let cancel = bridge.cancel_handle();
417
418 let prompt_reply = bridge.send_prompt(vec![prompt_text]).await?;
420
421 let result = event_loop(
423 &mut bridge.evt_rx,
424 prompt_reply,
425 &cancel,
426 &mut renderer,
427 &permission_mode,
428 timeout_secs,
429 )
430 .await;
431
432 let _ = bridge.shutdown().await;
433 result.map(|r| r.exit_code)
434}