1use std::path::PathBuf;
2use std::time::{Duration, SystemTime, UNIX_EPOCH};
3
4use crate::bridge::events::{BridgeEvent, PromptResult};
5use crate::bridge::{AcpBridge, BridgeCancelHandle};
6use crate::client::permissions::{PermissionMode, resolve_permission};
7use crate::error::{AcpCliError, Result, is_transient};
8use crate::output::OutputRenderer;
9use crate::output::json::JsonRenderer;
10use crate::output::quiet::QuietRenderer;
11use crate::output::text::TextRenderer;
12use crate::queue::client::QueueClient;
13use crate::queue::ipc::start_ipc_server;
14use crate::queue::lease::LeaseFile;
15use crate::queue::owner::QueueOwner;
16use crate::session::history::{ConversationEntry, append_entry};
17use crate::session::persistence::SessionRecord;
18use crate::session::pid;
19use crate::session::scoping::{find_git_root, session_dir, session_key};
20
/// Computes how long to wait before the next retry.
///
/// The base delay is `2^attempt` seconds with the exponent capped at 6
/// (so at most 64 seconds). A jitter of 0..=499 milliseconds, taken from the
/// sub-second part of the current wall-clock time, is added on top.
fn backoff(attempt: u32) -> Duration {
    let exponent = attempt.min(6);
    let base_secs = 2u64.pow(exponent);

    // A clock set before the Unix epoch simply contributes no jitter.
    let clock_millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|since_epoch| since_epoch.subsec_millis())
        .unwrap_or(0);
    let jitter_millis = u64::from(clock_millis % 500);

    Duration::from_secs(base_secs) + Duration::from_millis(jitter_millis)
}
36
37fn log_retry(attempt: u32, total: u32, err: &AcpCliError, delay: Duration) {
42 eprintln!("prompt attempt {attempt}/{total} failed ({err}), retrying in {delay:?}");
43}
44
45struct PidGuard {
48 session_key: String,
49}
50
51impl PidGuard {
52 fn new(session_key: &str) -> std::io::Result<Self> {
53 pid::write_pid(session_key)?;
54 Ok(Self {
55 session_key: session_key.to_string(),
56 })
57 }
58}
59
60impl Drop for PidGuard {
61 fn drop(&mut self) {
62 let _ = pid::remove_pid(&self.session_key);
63 }
64}
65
66fn make_renderer(output_format: &str, suppress_reads: bool) -> Box<dyn OutputRenderer> {
68 match output_format {
69 "json" => Box::new(JsonRenderer::new(suppress_reads)),
70 "quiet" => Box::new(QuietRenderer::new()),
71 _ => Box::new(TextRenderer::new(suppress_reads)),
72 }
73}
74
/// Outcome of one `event_loop` run.
struct EventLoopResult {
    /// Session id reported by a `SessionCreated` event, if one was seen.
    acp_session_id: Option<String>,
    /// Concatenation of every text chunk received while the prompt ran.
    assistant_text: String,
    /// Process exit code the caller should report for this prompt.
    exit_code: i32,
}
82
83async fn event_loop(
89 evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<BridgeEvent>,
90 prompt_reply: tokio::sync::oneshot::Receiver<Result<PromptResult>>,
91 cancel: &BridgeCancelHandle,
92 renderer: &mut Box<dyn OutputRenderer>,
93 permission_mode: &PermissionMode,
94 timeout_secs: Option<u64>,
95) -> Result<EventLoopResult> {
96 let mut cancel_sent = false;
97 let mut collected_text = String::new();
98 let mut acp_session_id: Option<String> = None;
99
100 let timeout_fut = async {
102 match timeout_secs {
103 Some(secs) => tokio::time::sleep(Duration::from_secs(secs)).await,
104 None => std::future::pending::<()>().await,
105 }
106 };
107 tokio::pin!(timeout_fut);
108 tokio::pin!(prompt_reply);
109
110 loop {
111 tokio::select! {
112 event = evt_rx.recv() => {
113 match event {
114 Some(BridgeEvent::TextChunk { text }) => {
115 collected_text.push_str(&text);
116 renderer.text_chunk(&text);
117 }
118 Some(BridgeEvent::ToolUse { name }) => renderer.tool_status(&name),
119 Some(BridgeEvent::ToolResult { name, output, is_read }) => {
120 renderer.tool_result(&name, &output, is_read);
121 }
122 Some(BridgeEvent::PermissionRequest { tool, options, reply }) => {
123 let decision = resolve_permission(&tool, &options, permission_mode);
124 if matches!(decision, crate::bridge::PermissionOutcome::Cancelled) {
125 renderer.permission_denied(&tool.name);
126 }
127 let _ = reply.send(decision);
128 }
129 Some(BridgeEvent::SessionCreated { session_id }) => {
130 acp_session_id = Some(session_id.clone());
131 renderer.session_info(&session_id);
132 }
133 Some(BridgeEvent::PromptDone { .. }) => {
134 }
136 Some(BridgeEvent::Error { message }) => {
137 renderer.error(&message);
138 }
139 Some(BridgeEvent::AgentExited { code }) => {
140 if let Some(c) = code
141 && c != 0
142 {
143 renderer.error(&format!("agent exited with code {c}"));
144 }
145 }
146 None => break, }
148 }
149 result = &mut prompt_reply => {
150 renderer.done();
152 return match result {
153 Ok(Ok(_)) => Ok(EventLoopResult { exit_code: 0, assistant_text: collected_text, acp_session_id: acp_session_id.clone() }),
154 Ok(Err(e)) => {
155 renderer.error(&e.to_string());
156 Ok(EventLoopResult { exit_code: e.exit_code(), assistant_text: collected_text, acp_session_id: acp_session_id.clone() })
157 }
158 Err(_) => {
159 renderer.error("bridge connection lost");
161 Ok(EventLoopResult { exit_code: 1, assistant_text: collected_text, acp_session_id: acp_session_id.clone() })
162 }
163 };
164 }
165 _ = tokio::signal::ctrl_c() => {
166 if cancel_sent {
167 return Err(AcpCliError::Interrupted);
169 }
170 cancel_sent = true;
171 eprintln!("\nCancelling... (press Ctrl+C again to force quit)");
172 let _ = cancel.cancel().await;
173 }
174 _ = &mut timeout_fut => {
175 eprintln!("\nTimeout after {}s", timeout_secs.unwrap_or(0));
176 let _ = cancel.cancel().await;
177 tokio::time::sleep(Duration::from_secs(3)).await;
178 return Err(AcpCliError::Timeout(timeout_secs.unwrap_or(0)));
179 }
180 }
181 }
182
183 renderer.done();
184 Ok(EventLoopResult {
185 exit_code: 0,
186 assistant_text: collected_text,
187 acp_session_id,
188 })
189}
190
191#[allow(clippy::too_many_arguments)]
198pub async fn run_prompt(
199 agent_name: &str,
200 command: String,
201 args: Vec<String>,
202 cwd: PathBuf,
203 prompt_text: String,
204 session_name: Option<String>,
205 permission_mode: PermissionMode,
206 output_format: &str,
207 timeout_secs: Option<u64>,
208 no_wait: bool,
209 prompt_retries: u32,
210 suppress_reads: bool,
211) -> Result<i32> {
212 let mut renderer = make_renderer(output_format, suppress_reads);
213
214 let queue_ttl_secs: u64 = 300;
216
217 let resolved_dir = find_git_root(&cwd).unwrap_or_else(|| cwd.clone());
221 let dir_str = resolved_dir.to_string_lossy();
222 let sess_name = session_name.as_deref().unwrap_or("");
223 let key = session_key(agent_name, &dir_str, sess_name);
224
225 let sess_file = session_dir().join(format!("{key}.json"));
226 let existing = SessionRecord::load(&sess_file).ok().flatten();
227 let is_resume = existing.as_ref().is_some_and(|r| !r.closed);
228
229 if existing.is_none() {
230 let now = std::time::SystemTime::now()
231 .duration_since(std::time::UNIX_EPOCH)
232 .unwrap_or_default()
233 .as_secs();
234 let record = SessionRecord {
235 id: key.clone(),
236 agent: agent_name.to_string(),
237 cwd: resolved_dir,
238 name: session_name,
239 created_at: now,
240 closed: false,
241 acp_session_id: None,
242 };
243 if let Err(e) = record.save(&sess_file) {
244 renderer.error(&format!("failed to save session: {e}"));
245 }
246 }
247
248 if is_resume {
249 renderer.session_info(&format!("resuming session {}", &key[..12.min(key.len())]));
250 }
251
252 if let Some(lease) = LeaseFile::read(&key)
257 && lease.is_valid(queue_ttl_secs)
258 {
259 match QueueClient::connect(&key).await {
260 Ok(mut client) => {
261 renderer.session_info("Connected to queue owner");
262
263 if no_wait {
265 let position = client.enqueue_only(vec![prompt_text.clone()]).await?;
266 renderer.session_info(&format!("Prompt queued (position {position})"));
267 renderer.done();
268
269 let now = SystemTime::now()
272 .duration_since(UNIX_EPOCH)
273 .unwrap_or_default()
274 .as_secs();
275 let user_entry = ConversationEntry {
276 role: "user".to_string(),
277 content: prompt_text,
278 timestamp: now,
279 };
280 let _ = append_entry(&key, &user_entry);
281
282 return Ok(0);
283 }
284
285 let result = client
286 .prompt(vec![prompt_text.clone()], &mut *renderer, &permission_mode)
287 .await;
288 renderer.done();
289
290 let now = SystemTime::now()
292 .duration_since(UNIX_EPOCH)
293 .unwrap_or_default()
294 .as_secs();
295 let user_entry = ConversationEntry {
296 role: "user".to_string(),
297 content: prompt_text,
298 timestamp: now,
299 };
300 let _ = append_entry(&key, &user_entry);
301
302 if let Ok(ref pr) = result
303 && !pr.content.is_empty()
304 {
305 let assistant_entry = ConversationEntry {
306 role: "assistant".to_string(),
307 content: pr.content.clone(),
308 timestamp: now,
309 };
310 let _ = append_entry(&key, &assistant_entry);
311 }
312
313 return result.map(|_| 0);
314 }
315 Err(e) => {
316 if no_wait {
318 return Err(AcpCliError::Usage(
319 "No active session. Run without --no-wait first to start a session."
320 .to_string(),
321 ));
322 }
323 renderer.session_info(&format!(
326 "Could not connect to queue owner (pid {}): {e}; starting new session",
327 lease.pid
328 ));
329 }
330 }
331 }
332
333 if no_wait {
338 return Err(AcpCliError::Usage(
339 "No active session. Run without --no-wait first to start a session.".to_string(),
340 ));
341 }
342
343 let _pid_guard = PidGuard::new(&key).map_err(|e| {
346 renderer.error(&format!("failed to write pid file: {e}"));
347 AcpCliError::Io(e)
348 })?;
349
350 LeaseFile::write(&key).map_err(|e| {
352 renderer.error(&format!("failed to write lease file: {e}"));
353 AcpCliError::Io(e)
354 })?;
355
356 let listener = start_ipc_server(&key).await.map_err(|e| {
358 LeaseFile::remove(&key);
359 renderer.error(&format!("failed to start IPC server: {e}"));
360 AcpCliError::Io(e)
361 })?;
362
363 let total_attempts = prompt_retries.saturating_add(1);
368 let mut attempt = 0u32; let (bridge, loop_result) = 'retry: loop {
370 let mut b = match AcpBridge::start(command.clone(), args.clone(), cwd.clone()).await {
371 Ok(b) => b,
372 Err(e) if is_transient(&e) && attempt < prompt_retries => {
373 let delay = backoff(attempt);
374 log_retry(attempt + 1, total_attempts, &e, delay);
375 attempt += 1;
376 tokio::time::sleep(delay).await;
377 continue 'retry;
378 }
379 Err(e) => {
380 LeaseFile::remove(&key);
381 crate::queue::ipc::cleanup_socket(&key);
382 return Err(e);
383 }
384 };
385
386 let cancel = b.cancel_handle();
387
388 let prompt_reply = match b.send_prompt(vec![prompt_text.clone()]).await {
390 Ok(rx) => rx,
391 Err(e) if is_transient(&e) && attempt < prompt_retries => {
392 let _ = b.shutdown().await;
393 let delay = backoff(attempt);
394 log_retry(attempt + 1, total_attempts, &e, delay);
395 attempt += 1;
396 tokio::time::sleep(delay).await;
397 continue 'retry;
398 }
399 Err(e) => {
400 let _ = b.shutdown().await;
401 LeaseFile::remove(&key);
402 crate::queue::ipc::cleanup_socket(&key);
403 return Err(e);
404 }
405 };
406
407 let result = event_loop(
409 &mut b.evt_rx,
410 prompt_reply,
411 &cancel,
412 &mut renderer,
413 &permission_mode,
414 timeout_secs,
415 )
416 .await;
417
418 match result {
425 Err(e) if is_transient(&e) && attempt < prompt_retries => {
426 let _ = b.shutdown().await;
427 let delay = backoff(attempt);
428 log_retry(attempt + 1, total_attempts, &e, delay);
429 attempt += 1;
430 tokio::time::sleep(delay).await;
431 continue 'retry;
432 }
433 result => break 'retry (b, result),
434 }
435 };
436
437 if let Ok(ref res) = loop_result
439 && let Some(ref new_acp_id) = res.acp_session_id
440 && let Ok(Some(mut record)) = SessionRecord::load(&sess_file)
441 {
442 let _ = record.update_acp_session_id(new_acp_id.clone(), &sess_file);
443 }
444
445 if let Ok(ref res) = loop_result {
447 let now = SystemTime::now()
448 .duration_since(UNIX_EPOCH)
449 .unwrap_or_default()
450 .as_secs();
451
452 let user_entry = ConversationEntry {
453 role: "user".to_string(),
454 content: prompt_text,
455 timestamp: now,
456 };
457 let _ = append_entry(&key, &user_entry);
458
459 if !res.assistant_text.is_empty() {
460 let assistant_entry = ConversationEntry {
461 role: "assistant".to_string(),
462 content: res.assistant_text.clone(),
463 timestamp: now,
464 };
465 let _ = append_entry(&key, &assistant_entry);
466 }
467 }
468
469 if loop_result.is_err() {
471 LeaseFile::remove(&key);
472 crate::queue::ipc::cleanup_socket(&key);
473 let _ = bridge.shutdown().await;
474 return loop_result.map(|r| r.exit_code);
475 }
476
477 let first_exit_code = loop_result.map(|r| r.exit_code)?;
478
479 let owner = QueueOwner::new(bridge, listener, &key, queue_ttl_secs).await?;
482 let _ = owner.run().await;
483
484 Ok(first_exit_code)
485}
486
487#[allow(clippy::too_many_arguments)]
489pub async fn run_exec(
490 command: String,
491 args: Vec<String>,
492 cwd: PathBuf,
493 prompt_text: String,
494 permission_mode: PermissionMode,
495 output_format: &str,
496 timeout_secs: Option<u64>,
497 prompt_retries: u32,
498 suppress_reads: bool,
499) -> Result<i32> {
500 let mut renderer = make_renderer(output_format, suppress_reads);
501 let total_attempts = prompt_retries.saturating_add(1);
502 let mut attempt = 0u32;
503
504 loop {
505 let mut bridge = match AcpBridge::start(command.clone(), args.clone(), cwd.clone()).await {
506 Ok(b) => b,
507 Err(e) if is_transient(&e) && attempt < prompt_retries => {
508 let delay = backoff(attempt);
509 log_retry(attempt + 1, total_attempts, &e, delay);
510 attempt += 1;
511 tokio::time::sleep(delay).await;
512 continue;
513 }
514 Err(e) => return Err(e),
515 };
516
517 let cancel = bridge.cancel_handle();
518
519 let prompt_reply = match bridge.send_prompt(vec![prompt_text.clone()]).await {
520 Ok(rx) => rx,
521 Err(e) if is_transient(&e) && attempt < prompt_retries => {
522 let _ = bridge.shutdown().await;
523 let delay = backoff(attempt);
524 log_retry(attempt + 1, total_attempts, &e, delay);
525 attempt += 1;
526 tokio::time::sleep(delay).await;
527 continue;
528 }
529 Err(e) => {
530 let _ = bridge.shutdown().await;
531 return Err(e);
532 }
533 };
534
535 let result = event_loop(
536 &mut bridge.evt_rx,
537 prompt_reply,
538 &cancel,
539 &mut renderer,
540 &permission_mode,
541 timeout_secs,
542 )
543 .await;
544
545 match result {
548 Err(e) if is_transient(&e) && attempt < prompt_retries => {
549 let _ = bridge.shutdown().await;
550 let delay = backoff(attempt);
551 log_retry(attempt + 1, total_attempts, &e, delay);
552 attempt += 1;
553 tokio::time::sleep(delay).await;
554 continue;
555 }
556 result => {
557 let _ = bridge.shutdown().await;
558 return result.map(|r| r.exit_code);
559 }
560 }
561 }
562}