Skip to main content

acp_cli/cli/
prompt.rs

1use std::path::PathBuf;
2use std::time::{Duration, SystemTime, UNIX_EPOCH};
3
4use crate::bridge::events::{BridgeEvent, PromptResult};
5use crate::bridge::{AcpBridge, BridgeCancelHandle};
6use crate::client::permissions::{PermissionMode, resolve_permission};
7use crate::error::{AcpCliError, Result};
8use crate::output::OutputRenderer;
9use crate::output::json::JsonRenderer;
10use crate::output::quiet::QuietRenderer;
11use crate::output::text::TextRenderer;
12use crate::queue::client::QueueClient;
13use crate::queue::ipc::start_ipc_server;
14use crate::queue::lease::LeaseFile;
15use crate::queue::owner::QueueOwner;
16use crate::session::history::{ConversationEntry, append_entry};
17use crate::session::persistence::SessionRecord;
18use crate::session::pid;
19use crate::session::scoping::{find_git_root, session_dir, session_key};
20
21/// RAII guard that removes the PID file when dropped, ensuring cleanup even on
22/// early returns or panics.
23struct PidGuard {
24    session_key: String,
25}
26
27impl PidGuard {
28    fn new(session_key: &str) -> std::io::Result<Self> {
29        pid::write_pid(session_key)?;
30        Ok(Self {
31            session_key: session_key.to_string(),
32        })
33    }
34}
35
36impl Drop for PidGuard {
37    fn drop(&mut self) {
38        let _ = pid::remove_pid(&self.session_key);
39    }
40}
41
42/// Build a renderer from the format string.
43fn make_renderer(output_format: &str) -> Box<dyn OutputRenderer> {
44    match output_format {
45        "json" => Box::new(JsonRenderer::new()),
46        "quiet" => Box::new(QuietRenderer::new()),
47        _ => Box::new(TextRenderer::new()),
48    }
49}
50
51/// Result from the event loop, including the exit code and collected assistant text.
52struct EventLoopResult {
53    exit_code: i32,
54    assistant_text: String,
55    /// The ACP session ID emitted by the bridge (if any).
56    acp_session_id: Option<String>,
57}
58
59/// Core event loop shared by `run_prompt` and `run_exec`.
60///
61/// Drives the bridge's event channel concurrently with the prompt reply oneshot.
62/// Handles Ctrl-C (graceful cancel on first press, force quit on second) and
63/// enforces an optional timeout. Collects all TextChunk content for conversation logging.
64async fn event_loop(
65    evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<BridgeEvent>,
66    prompt_reply: tokio::sync::oneshot::Receiver<Result<PromptResult>>,
67    cancel: &BridgeCancelHandle,
68    renderer: &mut Box<dyn OutputRenderer>,
69    permission_mode: &PermissionMode,
70    timeout_secs: Option<u64>,
71) -> Result<EventLoopResult> {
72    let mut cancel_sent = false;
73    let mut collected_text = String::new();
74    let mut acp_session_id: Option<String> = None;
75
76    // Timeout: either sleep for the given duration, or pend forever.
77    let timeout_fut = async {
78        match timeout_secs {
79            Some(secs) => tokio::time::sleep(Duration::from_secs(secs)).await,
80            None => std::future::pending::<()>().await,
81        }
82    };
83    tokio::pin!(timeout_fut);
84    tokio::pin!(prompt_reply);
85
86    loop {
87        tokio::select! {
88            event = evt_rx.recv() => {
89                match event {
90                    Some(BridgeEvent::TextChunk { text }) => {
91                        collected_text.push_str(&text);
92                        renderer.text_chunk(&text);
93                    }
94                    Some(BridgeEvent::ToolUse { name }) => renderer.tool_status(&name),
95                    Some(BridgeEvent::PermissionRequest { tool, options, reply }) => {
96                        let decision = resolve_permission(&tool, &options, permission_mode);
97                        if matches!(decision, crate::bridge::PermissionOutcome::Cancelled) {
98                            renderer.permission_denied(&tool.name);
99                        }
100                        let _ = reply.send(decision);
101                    }
102                    Some(BridgeEvent::SessionCreated { session_id }) => {
103                        acp_session_id = Some(session_id.clone());
104                        renderer.session_info(&session_id);
105                    }
106                    Some(BridgeEvent::PromptDone { .. }) => {
107                        // Prompt finished on ACP side; continue draining events.
108                    }
109                    Some(BridgeEvent::Error { message }) => {
110                        renderer.error(&message);
111                    }
112                    Some(BridgeEvent::AgentExited { code }) => {
113                        if let Some(c) = code
114                            && c != 0
115                        {
116                            renderer.error(&format!("agent exited with code {c}"));
117                        }
118                    }
119                    None => break, // channel closed — agent done
120                }
121            }
122            result = &mut prompt_reply => {
123                // Prompt RPC completed (oneshot reply from bridge thread).
124                renderer.done();
125                return match result {
126                    Ok(Ok(_)) => Ok(EventLoopResult { exit_code: 0, assistant_text: collected_text, acp_session_id: acp_session_id.clone() }),
127                    Ok(Err(e)) => {
128                        renderer.error(&e.to_string());
129                        Ok(EventLoopResult { exit_code: e.exit_code(), assistant_text: collected_text, acp_session_id: acp_session_id.clone() })
130                    }
131                    Err(_) => {
132                        // Oneshot sender dropped — bridge died unexpectedly
133                        renderer.error("bridge connection lost");
134                        Ok(EventLoopResult { exit_code: 1, assistant_text: collected_text, acp_session_id: acp_session_id.clone() })
135                    }
136                };
137            }
138            _ = tokio::signal::ctrl_c() => {
139                if cancel_sent {
140                    // Second Ctrl+C — force quit
141                    return Err(AcpCliError::Interrupted);
142                }
143                cancel_sent = true;
144                eprintln!("\nCancelling... (press Ctrl+C again to force quit)");
145                let _ = cancel.cancel().await;
146            }
147            _ = &mut timeout_fut => {
148                eprintln!("\nTimeout after {}s", timeout_secs.unwrap_or(0));
149                let _ = cancel.cancel().await;
150                tokio::time::sleep(Duration::from_secs(3)).await;
151                return Err(AcpCliError::Timeout(timeout_secs.unwrap_or(0)));
152            }
153        }
154    }
155
156    renderer.done();
157    Ok(EventLoopResult {
158        exit_code: 0,
159        assistant_text: collected_text,
160        acp_session_id,
161    })
162}
163
164/// Run an interactive or piped prompt session.
165///
166/// Resolves or creates a session, starts the ACP bridge, sends the prompt, and
167/// enters the event loop with signal handling and optional timeout. After the
168/// first prompt completes, becomes the queue owner — listening on a Unix socket
169/// for subsequent prompts from IPC clients until idle timeout.
170#[allow(clippy::too_many_arguments)]
171pub async fn run_prompt(
172    agent_name: &str,
173    command: String,
174    args: Vec<String>,
175    cwd: PathBuf,
176    prompt_text: String,
177    session_name: Option<String>,
178    permission_mode: PermissionMode,
179    output_format: &str,
180    timeout_secs: Option<u64>,
181    no_wait: bool,
182) -> Result<i32> {
183    let mut renderer = make_renderer(output_format);
184
185    // Default TTL for the queue owner (5 minutes).
186    let queue_ttl_secs: u64 = 300;
187
188    // Find or create session record.
189    // If an existing open session is found, reuse its key so conversation
190    // history accumulates across multiple runs.
191    let resolved_dir = find_git_root(&cwd).unwrap_or_else(|| cwd.clone());
192    let dir_str = resolved_dir.to_string_lossy();
193    let sess_name = session_name.as_deref().unwrap_or("");
194    let key = session_key(agent_name, &dir_str, sess_name);
195
196    let sess_file = session_dir().join(format!("{key}.json"));
197    let existing = SessionRecord::load(&sess_file).ok().flatten();
198    let is_resume = existing.as_ref().is_some_and(|r| !r.closed);
199
200    if existing.is_none() {
201        let now = std::time::SystemTime::now()
202            .duration_since(std::time::UNIX_EPOCH)
203            .unwrap_or_default()
204            .as_secs();
205        let record = SessionRecord {
206            id: key.clone(),
207            agent: agent_name.to_string(),
208            cwd: resolved_dir,
209            name: session_name,
210            created_at: now,
211            closed: false,
212            acp_session_id: None,
213        };
214        if let Err(e) = record.save(&sess_file) {
215            renderer.error(&format!("failed to save session: {e}"));
216        }
217    }
218
219    if is_resume {
220        renderer.session_info(&format!("resuming session {}", &key[..12.min(key.len())]));
221    }
222
223    // Check if a queue owner already exists for this session.
224    // Use the lease file (with heartbeat + PID liveness check) for reliable
225    // ownership detection. If a valid lease exists, connect as a queue client
226    // instead of starting a new bridge.
227    if let Some(lease) = LeaseFile::read(&key)
228        && lease.is_valid(queue_ttl_secs)
229    {
230        match QueueClient::connect(&key).await {
231            Ok(mut client) => {
232                renderer.session_info("Connected to queue owner");
233
234                // --no-wait: enqueue the prompt and return immediately.
235                if no_wait {
236                    let position = client.enqueue_only(vec![prompt_text.clone()]).await?;
237                    renderer.session_info(&format!("Prompt queued (position {position})"));
238                    renderer.done();
239
240                    // Log the user prompt (best-effort). No assistant entry since
241                    // we won't wait for the response.
242                    let now = SystemTime::now()
243                        .duration_since(UNIX_EPOCH)
244                        .unwrap_or_default()
245                        .as_secs();
246                    let user_entry = ConversationEntry {
247                        role: "user".to_string(),
248                        content: prompt_text,
249                        timestamp: now,
250                    };
251                    let _ = append_entry(&key, &user_entry);
252
253                    return Ok(0);
254                }
255
256                let result = client
257                    .prompt(vec![prompt_text.clone()], &mut *renderer, &permission_mode)
258                    .await;
259                renderer.done();
260
261                // Log conversation history (best-effort).
262                let now = SystemTime::now()
263                    .duration_since(UNIX_EPOCH)
264                    .unwrap_or_default()
265                    .as_secs();
266                let user_entry = ConversationEntry {
267                    role: "user".to_string(),
268                    content: prompt_text,
269                    timestamp: now,
270                };
271                let _ = append_entry(&key, &user_entry);
272
273                if let Ok(ref pr) = result
274                    && !pr.content.is_empty()
275                {
276                    let assistant_entry = ConversationEntry {
277                        role: "assistant".to_string(),
278                        content: pr.content.clone(),
279                        timestamp: now,
280                    };
281                    let _ = append_entry(&key, &assistant_entry);
282                }
283
284                return result.map(|_| 0);
285            }
286            Err(e) => {
287                // Socket connection failed despite valid lease.
288                if no_wait {
289                    return Err(AcpCliError::Usage(
290                        "No active session. Run without --no-wait first to start a session."
291                            .to_string(),
292                    ));
293                }
294                // Fall through to become a new queue owner (owner may have
295                // crashed after writing the lease).
296                renderer.session_info(&format!(
297                    "Could not connect to queue owner (pid {}): {e}; starting new session",
298                    lease.pid
299                ));
300            }
301        }
302    }
303
304    // --- Become the queue owner ---
305
306    // --no-wait requires an existing queue owner to accept the prompt. Since no
307    // valid lease was found, we cannot fire-and-forget.
308    if no_wait {
309        return Err(AcpCliError::Usage(
310            "No active session. Run without --no-wait first to start a session.".to_string(),
311        ));
312    }
313
314    // Write PID file so `cancel` and `status` commands can find us.
315    // The guard removes it automatically when this function returns.
316    let _pid_guard = PidGuard::new(&key).map_err(|e| {
317        renderer.error(&format!("failed to write pid file: {e}"));
318        AcpCliError::Io(e)
319    })?;
320
321    // Write lease file to claim queue ownership.
322    LeaseFile::write(&key).map_err(|e| {
323        renderer.error(&format!("failed to write lease file: {e}"));
324        AcpCliError::Io(e)
325    })?;
326
327    // Start IPC server so future clients can connect.
328    let listener = start_ipc_server(&key).await.map_err(|e| {
329        LeaseFile::remove(&key);
330        renderer.error(&format!("failed to start IPC server: {e}"));
331        AcpCliError::Io(e)
332    })?;
333
334    // Start bridge
335    let mut bridge = AcpBridge::start(command, args, cwd).await?;
336    let cancel = bridge.cancel_handle();
337
338    // Send the first prompt (get oneshot receiver without blocking)
339    let prompt_reply = bridge.send_prompt(vec![prompt_text.clone()]).await?;
340
341    // Run event loop for the first prompt
342    let loop_result = event_loop(
343        &mut bridge.evt_rx,
344        prompt_reply,
345        &cancel,
346        &mut renderer,
347        &permission_mode,
348        timeout_secs,
349    )
350    .await;
351
352    // Update the session record with the new ACP session ID (best-effort).
353    if let Ok(ref res) = loop_result
354        && let Some(ref new_acp_id) = res.acp_session_id
355        && let Ok(Some(mut record)) = SessionRecord::load(&sess_file)
356    {
357        let _ = record.update_acp_session_id(new_acp_id.clone(), &sess_file);
358    }
359
360    // Log conversation history (best-effort, don't fail the prompt on log errors)
361    if let Ok(ref res) = loop_result {
362        let now = SystemTime::now()
363            .duration_since(UNIX_EPOCH)
364            .unwrap_or_default()
365            .as_secs();
366
367        let user_entry = ConversationEntry {
368            role: "user".to_string(),
369            content: prompt_text,
370            timestamp: now,
371        };
372        let _ = append_entry(&key, &user_entry);
373
374        if !res.assistant_text.is_empty() {
375            let assistant_entry = ConversationEntry {
376                role: "assistant".to_string(),
377                content: res.assistant_text.clone(),
378                timestamp: now,
379            };
380            let _ = append_entry(&key, &assistant_entry);
381        }
382    }
383
384    // If the first prompt failed, clean up and return early.
385    if loop_result.is_err() {
386        LeaseFile::remove(&key);
387        crate::queue::ipc::cleanup_socket(&key);
388        let _ = bridge.shutdown().await;
389        return loop_result.map(|r| r.exit_code);
390    }
391
392    let first_exit_code = loop_result.map(|r| r.exit_code)?;
393
394    // First prompt succeeded — enter queue owner mode to serve subsequent
395    // prompts from IPC clients until idle timeout.
396    let owner = QueueOwner::new(bridge, listener, &key, queue_ttl_secs).await?;
397    let _ = owner.run().await;
398
399    Ok(first_exit_code)
400}
401
402/// Run a non-interactive exec command (no session persistence).
403pub async fn run_exec(
404    command: String,
405    args: Vec<String>,
406    cwd: PathBuf,
407    prompt_text: String,
408    permission_mode: PermissionMode,
409    output_format: &str,
410    timeout_secs: Option<u64>,
411) -> Result<i32> {
412    let mut renderer = make_renderer(output_format);
413
414    // Start bridge
415    let mut bridge = AcpBridge::start(command, args, cwd).await?;
416    let cancel = bridge.cancel_handle();
417
418    // Send prompt
419    let prompt_reply = bridge.send_prompt(vec![prompt_text]).await?;
420
421    // Run event loop
422    let result = event_loop(
423        &mut bridge.evt_rx,
424        prompt_reply,
425        &cancel,
426        &mut renderer,
427        &permission_mode,
428        timeout_secs,
429    )
430    .await;
431
432    let _ = bridge.shutdown().await;
433    result.map(|r| r.exit_code)
434}