Skip to main content

acp_cli/cli/
prompt.rs

1use std::path::PathBuf;
2use std::time::{Duration, SystemTime, UNIX_EPOCH};
3
4use crate::bridge::events::{BridgeEvent, PromptResult};
5use crate::bridge::{AcpBridge, BridgeCancelHandle};
6use crate::client::permissions::{PermissionMode, resolve_permission};
7use crate::error::{AcpCliError, Result, is_transient};
8use crate::output::OutputRenderer;
9use crate::output::json::JsonRenderer;
10use crate::output::quiet::QuietRenderer;
11use crate::output::text::TextRenderer;
12use crate::queue::client::QueueClient;
13use crate::queue::ipc::start_ipc_server;
14use crate::queue::lease::LeaseFile;
15use crate::queue::owner::QueueOwner;
16use crate::session::history::{ConversationEntry, append_entry};
17use crate::session::persistence::SessionRecord;
18use crate::session::pid;
19use crate::session::scoping::{find_git_root, session_dir, session_key};
20
21/// Exponential backoff duration for retry attempt `n` (0-based), capped at 64 s.
22/// Produces the sequence 1 s, 2 s, 4 s, 8 s, 16 s, 32 s, 64 s, 64 s, …
23/// A small jitter derived from the current nanosecond timestamp is added to
24/// prevent thundering-herd retries when multiple clients restart simultaneously.
25fn backoff(attempt: u32) -> Duration {
26    let base = Duration::from_secs(1u64 << attempt.min(6));
27    let jitter = Duration::from_millis(
28        (SystemTime::now()
29            .duration_since(UNIX_EPOCH)
30            .unwrap_or_default()
31            .subsec_millis()
32            % 500) as u64,
33    );
34    base + jitter
35}
36
37/// Emit a human-readable retry warning to stderr.
38///
39/// `attempt` is 1-based (the attempt that just failed); `total` is the total
40/// number of attempts that will be made (`prompt_retries + 1`).
41fn log_retry(attempt: u32, total: u32, err: &AcpCliError, delay: Duration) {
42    eprintln!("prompt attempt {attempt}/{total} failed ({err}), retrying in {delay:?}");
43}
44
45/// RAII guard that removes the PID file when dropped, ensuring cleanup even on
46/// early returns or panics.
47struct PidGuard {
48    session_key: String,
49}
50
51impl PidGuard {
52    fn new(session_key: &str) -> std::io::Result<Self> {
53        pid::write_pid(session_key)?;
54        Ok(Self {
55            session_key: session_key.to_string(),
56        })
57    }
58}
59
60impl Drop for PidGuard {
61    fn drop(&mut self) {
62        let _ = pid::remove_pid(&self.session_key);
63    }
64}
65
66/// Build a renderer from the format string and options.
67fn make_renderer(output_format: &str, suppress_reads: bool) -> Box<dyn OutputRenderer> {
68    match output_format {
69        "json" => Box::new(JsonRenderer::new(suppress_reads)),
70        "quiet" => Box::new(QuietRenderer::new()),
71        _ => Box::new(TextRenderer::new(suppress_reads)),
72    }
73}
74
75/// Result from the event loop, including the exit code and collected assistant text.
76struct EventLoopResult {
77    exit_code: i32,
78    assistant_text: String,
79    /// The ACP session ID emitted by the bridge (if any).
80    acp_session_id: Option<String>,
81}
82
83/// Core event loop shared by `run_prompt` and `run_exec`.
84///
85/// Drives the bridge's event channel concurrently with the prompt reply oneshot.
86/// Handles Ctrl-C (graceful cancel on first press, force quit on second) and
87/// enforces an optional timeout. Collects all TextChunk content for conversation logging.
88async fn event_loop(
89    evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<BridgeEvent>,
90    prompt_reply: tokio::sync::oneshot::Receiver<Result<PromptResult>>,
91    cancel: &BridgeCancelHandle,
92    renderer: &mut Box<dyn OutputRenderer>,
93    permission_mode: &PermissionMode,
94    timeout_secs: Option<u64>,
95) -> Result<EventLoopResult> {
96    let mut cancel_sent = false;
97    let mut collected_text = String::new();
98    let mut acp_session_id: Option<String> = None;
99
100    // Timeout: either sleep for the given duration, or pend forever.
101    let timeout_fut = async {
102        match timeout_secs {
103            Some(secs) => tokio::time::sleep(Duration::from_secs(secs)).await,
104            None => std::future::pending::<()>().await,
105        }
106    };
107    tokio::pin!(timeout_fut);
108    tokio::pin!(prompt_reply);
109
110    loop {
111        tokio::select! {
112            event = evt_rx.recv() => {
113                match event {
114                    Some(BridgeEvent::TextChunk { text }) => {
115                        collected_text.push_str(&text);
116                        renderer.text_chunk(&text);
117                    }
118                    Some(BridgeEvent::ToolUse { name }) => renderer.tool_status(&name),
119                    Some(BridgeEvent::ToolResult { name, output, is_read }) => {
120                        renderer.tool_result(&name, &output, is_read);
121                    }
122                    Some(BridgeEvent::PermissionRequest { tool, options, reply }) => {
123                        let decision = resolve_permission(&tool, &options, permission_mode);
124                        if matches!(decision, crate::bridge::PermissionOutcome::Cancelled) {
125                            renderer.permission_denied(&tool.name);
126                        }
127                        let _ = reply.send(decision);
128                    }
129                    Some(BridgeEvent::SessionCreated { session_id }) => {
130                        acp_session_id = Some(session_id.clone());
131                        renderer.session_info(&session_id);
132                    }
133                    Some(BridgeEvent::PromptDone { .. }) => {
134                        // Prompt finished on ACP side; continue draining events.
135                    }
136                    Some(BridgeEvent::Error { message }) => {
137                        renderer.error(&message);
138                    }
139                    Some(BridgeEvent::AgentExited { code }) => {
140                        if let Some(c) = code
141                            && c != 0
142                        {
143                            renderer.error(&format!("agent exited with code {c}"));
144                        }
145                    }
146                    None => break, // channel closed — agent done
147                }
148            }
149            result = &mut prompt_reply => {
150                // Prompt RPC completed (oneshot reply from bridge thread).
151                renderer.done();
152                return match result {
153                    Ok(Ok(_)) => Ok(EventLoopResult { exit_code: 0, assistant_text: collected_text, acp_session_id: acp_session_id.clone() }),
154                    Ok(Err(e)) => {
155                        renderer.error(&e.to_string());
156                        Ok(EventLoopResult { exit_code: e.exit_code(), assistant_text: collected_text, acp_session_id: acp_session_id.clone() })
157                    }
158                    Err(_) => {
159                        // Oneshot sender dropped — bridge died unexpectedly
160                        renderer.error("bridge connection lost");
161                        Ok(EventLoopResult { exit_code: 1, assistant_text: collected_text, acp_session_id: acp_session_id.clone() })
162                    }
163                };
164            }
165            _ = tokio::signal::ctrl_c() => {
166                if cancel_sent {
167                    // Second Ctrl+C — force quit
168                    return Err(AcpCliError::Interrupted);
169                }
170                cancel_sent = true;
171                eprintln!("\nCancelling... (press Ctrl+C again to force quit)");
172                let _ = cancel.cancel().await;
173            }
174            _ = &mut timeout_fut => {
175                eprintln!("\nTimeout after {}s", timeout_secs.unwrap_or(0));
176                let _ = cancel.cancel().await;
177                tokio::time::sleep(Duration::from_secs(3)).await;
178                return Err(AcpCliError::Timeout(timeout_secs.unwrap_or(0)));
179            }
180        }
181    }
182
183    renderer.done();
184    Ok(EventLoopResult {
185        exit_code: 0,
186        assistant_text: collected_text,
187        acp_session_id,
188    })
189}
190
191/// Run an interactive or piped prompt session.
192///
193/// Resolves or creates a session, starts the ACP bridge, sends the prompt, and
194/// enters the event loop with signal handling and optional timeout. After the
195/// first prompt completes, becomes the queue owner — listening on a Unix socket
196/// for subsequent prompts from IPC clients until idle timeout.
197#[allow(clippy::too_many_arguments)]
198pub async fn run_prompt(
199    agent_name: &str,
200    command: String,
201    args: Vec<String>,
202    cwd: PathBuf,
203    prompt_text: String,
204    session_name: Option<String>,
205    permission_mode: PermissionMode,
206    output_format: &str,
207    timeout_secs: Option<u64>,
208    no_wait: bool,
209    prompt_retries: u32,
210    suppress_reads: bool,
211) -> Result<i32> {
212    let mut renderer = make_renderer(output_format, suppress_reads);
213
214    // Default TTL for the queue owner (5 minutes).
215    let queue_ttl_secs: u64 = 300;
216
217    // Find or create session record.
218    // If an existing open session is found, reuse its key so conversation
219    // history accumulates across multiple runs.
220    let resolved_dir = find_git_root(&cwd).unwrap_or_else(|| cwd.clone());
221    let dir_str = resolved_dir.to_string_lossy();
222    let sess_name = session_name.as_deref().unwrap_or("");
223    let key = session_key(agent_name, &dir_str, sess_name);
224
225    let sess_file = session_dir().join(format!("{key}.json"));
226    let existing = SessionRecord::load(&sess_file).ok().flatten();
227    let is_resume = existing.as_ref().is_some_and(|r| !r.closed);
228
229    if existing.is_none() {
230        let now = std::time::SystemTime::now()
231            .duration_since(std::time::UNIX_EPOCH)
232            .unwrap_or_default()
233            .as_secs();
234        let record = SessionRecord {
235            id: key.clone(),
236            agent: agent_name.to_string(),
237            cwd: resolved_dir,
238            name: session_name,
239            created_at: now,
240            closed: false,
241            acp_session_id: None,
242        };
243        if let Err(e) = record.save(&sess_file) {
244            renderer.error(&format!("failed to save session: {e}"));
245        }
246    }
247
248    if is_resume {
249        renderer.session_info(&format!("resuming session {}", &key[..12.min(key.len())]));
250    }
251
252    // Check if a queue owner already exists for this session.
253    // Use the lease file (with heartbeat + PID liveness check) for reliable
254    // ownership detection. If a valid lease exists, connect as a queue client
255    // instead of starting a new bridge.
256    if let Some(lease) = LeaseFile::read(&key)
257        && lease.is_valid(queue_ttl_secs)
258    {
259        match QueueClient::connect(&key).await {
260            Ok(mut client) => {
261                renderer.session_info("Connected to queue owner");
262
263                // --no-wait: enqueue the prompt and return immediately.
264                if no_wait {
265                    let position = client.enqueue_only(vec![prompt_text.clone()]).await?;
266                    renderer.session_info(&format!("Prompt queued (position {position})"));
267                    renderer.done();
268
269                    // Log the user prompt (best-effort). No assistant entry since
270                    // we won't wait for the response.
271                    let now = SystemTime::now()
272                        .duration_since(UNIX_EPOCH)
273                        .unwrap_or_default()
274                        .as_secs();
275                    let user_entry = ConversationEntry {
276                        role: "user".to_string(),
277                        content: prompt_text,
278                        timestamp: now,
279                    };
280                    let _ = append_entry(&key, &user_entry);
281
282                    return Ok(0);
283                }
284
285                let result = client
286                    .prompt(vec![prompt_text.clone()], &mut *renderer, &permission_mode)
287                    .await;
288                renderer.done();
289
290                // Log conversation history (best-effort).
291                let now = SystemTime::now()
292                    .duration_since(UNIX_EPOCH)
293                    .unwrap_or_default()
294                    .as_secs();
295                let user_entry = ConversationEntry {
296                    role: "user".to_string(),
297                    content: prompt_text,
298                    timestamp: now,
299                };
300                let _ = append_entry(&key, &user_entry);
301
302                if let Ok(ref pr) = result
303                    && !pr.content.is_empty()
304                {
305                    let assistant_entry = ConversationEntry {
306                        role: "assistant".to_string(),
307                        content: pr.content.clone(),
308                        timestamp: now,
309                    };
310                    let _ = append_entry(&key, &assistant_entry);
311                }
312
313                return result.map(|_| 0);
314            }
315            Err(e) => {
316                // Socket connection failed despite valid lease.
317                if no_wait {
318                    return Err(AcpCliError::Usage(
319                        "No active session. Run without --no-wait first to start a session."
320                            .to_string(),
321                    ));
322                }
323                // Fall through to become a new queue owner (owner may have
324                // crashed after writing the lease).
325                renderer.session_info(&format!(
326                    "Could not connect to queue owner (pid {}): {e}; starting new session",
327                    lease.pid
328                ));
329            }
330        }
331    }
332
333    // --- Become the queue owner ---
334
335    // --no-wait requires an existing queue owner to accept the prompt. Since no
336    // valid lease was found, we cannot fire-and-forget.
337    if no_wait {
338        return Err(AcpCliError::Usage(
339            "No active session. Run without --no-wait first to start a session.".to_string(),
340        ));
341    }
342
343    // Write PID file so `cancel` and `status` commands can find us.
344    // The guard removes it automatically when this function returns.
345    let _pid_guard = PidGuard::new(&key).map_err(|e| {
346        renderer.error(&format!("failed to write pid file: {e}"));
347        AcpCliError::Io(e)
348    })?;
349
350    // Write lease file to claim queue ownership.
351    LeaseFile::write(&key).map_err(|e| {
352        renderer.error(&format!("failed to write lease file: {e}"));
353        AcpCliError::Io(e)
354    })?;
355
356    // Start IPC server so future clients can connect.
357    let listener = start_ipc_server(&key).await.map_err(|e| {
358        LeaseFile::remove(&key);
359        renderer.error(&format!("failed to start IPC server: {e}"));
360        AcpCliError::Io(e)
361    })?;
362
363    // Start bridge + send first prompt, with optional retry on transient failures.
364    // Each attempt starts a fresh bridge process. `Connection` errors (spawn failure,
365    // ACP init/session stall, bridge channel closed) are transient and retried.
366    // Semantic errors (Agent, PermissionDenied, Interrupted, Timeout) are not.
367    let total_attempts = prompt_retries.saturating_add(1);
368    let mut attempt = 0u32; // 0-based index of the current try
369    let (bridge, loop_result) = 'retry: loop {
370        let mut b = match AcpBridge::start(command.clone(), args.clone(), cwd.clone()).await {
371            Ok(b) => b,
372            Err(e) if is_transient(&e) && attempt < prompt_retries => {
373                let delay = backoff(attempt);
374                log_retry(attempt + 1, total_attempts, &e, delay);
375                attempt += 1;
376                tokio::time::sleep(delay).await;
377                continue 'retry;
378            }
379            Err(e) => {
380                LeaseFile::remove(&key);
381                crate::queue::ipc::cleanup_socket(&key);
382                return Err(e);
383            }
384        };
385
386        let cancel = b.cancel_handle();
387
388        // Send the first prompt (get oneshot receiver without blocking).
389        let prompt_reply = match b.send_prompt(vec![prompt_text.clone()]).await {
390            Ok(rx) => rx,
391            Err(e) if is_transient(&e) && attempt < prompt_retries => {
392                let _ = b.shutdown().await;
393                let delay = backoff(attempt);
394                log_retry(attempt + 1, total_attempts, &e, delay);
395                attempt += 1;
396                tokio::time::sleep(delay).await;
397                continue 'retry;
398            }
399            Err(e) => {
400                let _ = b.shutdown().await;
401                LeaseFile::remove(&key);
402                crate::queue::ipc::cleanup_socket(&key);
403                return Err(e);
404            }
405        };
406
407        // Run event loop for the first prompt.
408        let result = event_loop(
409            &mut b.evt_rx,
410            prompt_reply,
411            &cancel,
412            &mut renderer,
413            &permission_mode,
414            timeout_secs,
415        )
416        .await;
417
418        // Guard against retrying after side effects. Currently, event_loop only
419        // returns Err(Interrupted) or Err(Timeout), neither of which is transient,
420        // so this branch is effectively dead code today. It is kept to correctly
421        // handle any future transient variant added to event_loop. Connection errors
422        // arise only during initialization (before any text is produced), so retrying
423        // them cannot replay agent side effects.
424        match result {
425            Err(e) if is_transient(&e) && attempt < prompt_retries => {
426                let _ = b.shutdown().await;
427                let delay = backoff(attempt);
428                log_retry(attempt + 1, total_attempts, &e, delay);
429                attempt += 1;
430                tokio::time::sleep(delay).await;
431                continue 'retry;
432            }
433            result => break 'retry (b, result),
434        }
435    };
436
437    // Update the session record with the new ACP session ID (best-effort).
438    if let Ok(ref res) = loop_result
439        && let Some(ref new_acp_id) = res.acp_session_id
440        && let Ok(Some(mut record)) = SessionRecord::load(&sess_file)
441    {
442        let _ = record.update_acp_session_id(new_acp_id.clone(), &sess_file);
443    }
444
445    // Log conversation history (best-effort, don't fail the prompt on log errors)
446    if let Ok(ref res) = loop_result {
447        let now = SystemTime::now()
448            .duration_since(UNIX_EPOCH)
449            .unwrap_or_default()
450            .as_secs();
451
452        let user_entry = ConversationEntry {
453            role: "user".to_string(),
454            content: prompt_text,
455            timestamp: now,
456        };
457        let _ = append_entry(&key, &user_entry);
458
459        if !res.assistant_text.is_empty() {
460            let assistant_entry = ConversationEntry {
461                role: "assistant".to_string(),
462                content: res.assistant_text.clone(),
463                timestamp: now,
464            };
465            let _ = append_entry(&key, &assistant_entry);
466        }
467    }
468
469    // If the first prompt failed, clean up and return early.
470    if loop_result.is_err() {
471        LeaseFile::remove(&key);
472        crate::queue::ipc::cleanup_socket(&key);
473        let _ = bridge.shutdown().await;
474        return loop_result.map(|r| r.exit_code);
475    }
476
477    let first_exit_code = loop_result.map(|r| r.exit_code)?;
478
479    // First prompt succeeded — enter queue owner mode to serve subsequent
480    // prompts from IPC clients until idle timeout.
481    let owner = QueueOwner::new(bridge, listener, &key, queue_ttl_secs).await?;
482    let _ = owner.run().await;
483
484    Ok(first_exit_code)
485}
486
487/// Run a non-interactive exec command (no session persistence).
488#[allow(clippy::too_many_arguments)]
489pub async fn run_exec(
490    command: String,
491    args: Vec<String>,
492    cwd: PathBuf,
493    prompt_text: String,
494    permission_mode: PermissionMode,
495    output_format: &str,
496    timeout_secs: Option<u64>,
497    prompt_retries: u32,
498    suppress_reads: bool,
499) -> Result<i32> {
500    let mut renderer = make_renderer(output_format, suppress_reads);
501    let total_attempts = prompt_retries.saturating_add(1);
502    let mut attempt = 0u32;
503
504    loop {
505        let mut bridge = match AcpBridge::start(command.clone(), args.clone(), cwd.clone()).await {
506            Ok(b) => b,
507            Err(e) if is_transient(&e) && attempt < prompt_retries => {
508                let delay = backoff(attempt);
509                log_retry(attempt + 1, total_attempts, &e, delay);
510                attempt += 1;
511                tokio::time::sleep(delay).await;
512                continue;
513            }
514            Err(e) => return Err(e),
515        };
516
517        let cancel = bridge.cancel_handle();
518
519        let prompt_reply = match bridge.send_prompt(vec![prompt_text.clone()]).await {
520            Ok(rx) => rx,
521            Err(e) if is_transient(&e) && attempt < prompt_retries => {
522                let _ = bridge.shutdown().await;
523                let delay = backoff(attempt);
524                log_retry(attempt + 1, total_attempts, &e, delay);
525                attempt += 1;
526                tokio::time::sleep(delay).await;
527                continue;
528            }
529            Err(e) => {
530                let _ = bridge.shutdown().await;
531                return Err(e);
532            }
533        };
534
535        let result = event_loop(
536            &mut bridge.evt_rx,
537            prompt_reply,
538            &cancel,
539            &mut renderer,
540            &permission_mode,
541            timeout_secs,
542        )
543        .await;
544
545        // Mirror the event_loop retry from run_prompt for consistency.
546        // See comment there for why this branch is currently dead code.
547        match result {
548            Err(e) if is_transient(&e) && attempt < prompt_retries => {
549                let _ = bridge.shutdown().await;
550                let delay = backoff(attempt);
551                log_retry(attempt + 1, total_attempts, &e, delay);
552                attempt += 1;
553                tokio::time::sleep(delay).await;
554                continue;
555            }
556            result => {
557                let _ = bridge.shutdown().await;
558                return result.map(|r| r.exit_code);
559            }
560        }
561    }
562}