Skip to main content

opi_coding_agent/
rpc.rs

1//! RPC JSONL mode: bidirectional command/event protocol over stdin/stdout.
2//!
3//! RPC mode enables headless operation of the coding agent via a strict JSONL
4//! protocol. Commands arrive on stdin (one JSON object per line), responses
5//! and events are emitted on stdout (one JSON object per line). Diagnostics
6//! go to stderr.
7//!
8//! # Protocol version
9//!
10//! This is an unstable 0.x protocol. The schema may change between minor
11//! versions without notice. Clients MUST check `schema_version` in the
12//! `rpc_ready` header.
13//!
14//! # Framing
15//!
16//! LF (`\n`) is the only record delimiter. Clients MUST split on `\n` only
17//! and SHOULD strip a trailing `\r` if present.
18//!
19//! # Commands
20//!
21//! | Command           | Description                                      |
22//! |-------------------|--------------------------------------------------|
23//! | `prompt`          | Send user prompt, stream agent events            |
24//! | `continue`        | Continue conversation with additional text       |
25//! | `steer`           | Queue steering message during agent operation    |
26//! | `follow_up`       | Queue follow-up message for after agent stops    |
27//! | `abort`           | Cancel current agent operation                   |
28//! | `set_model`       | Switch provider:model                            |
29//! | `set_thinking_level` | Set reasoning/thinking level                  |
30//! | `compact`         | Trigger manual compaction                        |
31//! | `session_info`    | Query session metadata                           |
32//! | `quit`            | Shut down the RPC session                        |
33//!
34//! # Responses and Errors
35//!
36//! Every command produces at most one `response` object. For `prompt` and
37//! `continue`, `success: true` means the turn was accepted; subsequent agent
38//! output arrives as asynchronous event lines. Errors after acceptance are
39//! surfaced as events, not as a second response.
40//!
41//! `abort` cancels the active operation and succeeds immediately when a turn is
42//! running. A second `abort` while idle is a successful no-op.
43
44use std::io::{self, BufRead, Write as IoWrite};
45use std::path::PathBuf;
46use std::sync::Arc;
47use std::time::Duration;
48
49use opi_agent::agent::AgentControl;
50use opi_agent::event::AgentEvent;
51use opi_agent::loop_types::AgentError;
52use opi_agent::message::AgentMessage;
53use opi_agent::sdk::{SDK_SCHEMA_VERSION, SdkCommand, SdkResponse, agent_event_to_value};
54use opi_agent::session_event::CompactionReason;
55use opi_ai::provider::Provider;
56
57use crate::config::OpiConfig;
58use crate::harness::CodingHarness;
59use crate::policy::{RunMode, ToolSelection};
60use crate::runner::ExitCode;
61
/// Upper bound on how long shutdown waits for an in-flight run task to finish
/// after an abort has been requested; once it elapses the task is aborted.
const ACTIVE_RUN_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

/// Alias exposing the SDK command type under the RPC name.
pub type RpcCommand = SdkCommand;

/// The SDK schema version, exposed for crate-level access (e.g. tests).
pub const RPC_SCHEMA_VERSION: u32 = SDK_SCHEMA_VERSION;
69
/// One unit of input delivered to the main loop.
enum RpcInput {
    /// A command that deserialized successfully.
    Command(SdkCommand),
    /// A line that could not be deserialized; carries the error text to report.
    ParseError(String),
}
74
/// The kind of turn to start, carrying the message text handed to the harness.
enum ActiveRun {
    /// Started through `CodingHarness::prompt`.
    Prompt(String),
    /// Started through `CodingHarness::continue_`.
    Continue(String),
}
79
/// Output of a spawned run task: the harness handed back to the runner,
/// paired with the outcome of the turn.
type RunResult = (CodingHarness, Result<Vec<AgentMessage>, AgentError>);
81
/// RPC runner that owns the harness and processes commands.
pub struct RpcRunner {
    /// The harness; `None` while it has been moved into a spawned run task.
    harness: Option<CodingHarness>,
    /// Control handle used to abort, steer, or queue follow-ups on the active run.
    control: AgentControl,
    /// Whether a run task is currently considered in flight.
    running: bool,
}
88
89impl RpcRunner {
90    /// Create a new RPC runner.
91    #[allow(clippy::too_many_arguments)]
92    pub fn new(
93        provider: Box<dyn Provider>,
94        model: String,
95        config: OpiConfig,
96        workspace_root: PathBuf,
97        allow_mutating: bool,
98        tool_selection: ToolSelection,
99        user_system_prompt: Option<String>,
100        initial_messages: Vec<AgentMessage>,
101    ) -> Result<Self, crate::policy::ToolPolicyError> {
102        let tool_config = crate::policy::ToolRuntimeConfig::resolve(
103            RunMode::NonInteractive,
104            allow_mutating,
105            tool_selection,
106        )?;
107        let hooks = Box::new(crate::runner::NonInteractiveHooks::new(allow_mutating));
108        let harness = CodingHarness::new_with_hooks_and_resume_tool_config(
109            provider,
110            model,
111            config,
112            workspace_root,
113            hooks,
114            user_system_prompt,
115            initial_messages,
116            None,
117            tool_config,
118        );
119        let control = harness.control_handle();
120        Ok(Self {
121            harness: Some(harness),
122            control,
123            running: false,
124        })
125    }
126
127    /// Return the assembled system prompt while the runner is idle.
128    pub fn system_prompt(&self) -> Option<&str> {
129        self.harness.as_ref().map(CodingHarness::system_prompt)
130    }
131
132    /// Run the RPC main loop over stdin/stdout. Returns an exit code.
133    pub async fn run(&mut self) -> i32 {
134        let (input_tx, input_rx) = tokio::sync::mpsc::unbounded_channel();
135        tokio::task::spawn_blocking(move || {
136            let stdin = io::stdin();
137            let reader = io::BufReader::new(stdin.lock());
138            for line in reader.lines() {
139                let line = match line {
140                    Ok(line) => line,
141                    Err(_) => break,
142                };
143                let trimmed = line.trim_end_matches('\r').trim();
144                if trimmed.is_empty() {
145                    continue;
146                }
147                let input = match serde_json::from_str::<SdkCommand>(trimmed) {
148                    Ok(command) => RpcInput::Command(command),
149                    Err(e) => RpcInput::ParseError(format!("failed to parse command: {e}")),
150                };
151                if input_tx.send(input).is_err() {
152                    break;
153                }
154            }
155        });
156
157        let stdout = io::stdout();
158        let mut writer = io::BufWriter::new(stdout.lock());
159        self.run_loop(input_rx, |value| {
160            write_jsonl(&mut writer, value)
161                .and_then(|_| writer.flush())
162                .is_ok()
163        })
164        .await
165    }
166
167    /// Run the RPC main loop with in-process command and output channels.
168    ///
169    /// This is intended for tests and SDK-style embedders that already have
170    /// structured commands. Stdin parsing is covered by `run`.
171    pub async fn run_with_channels(
172        &mut self,
173        mut command_rx: tokio::sync::mpsc::UnboundedReceiver<SdkCommand>,
174        output_tx: tokio::sync::mpsc::UnboundedSender<serde_json::Value>,
175    ) -> i32 {
176        let (input_tx, input_rx) = tokio::sync::mpsc::unbounded_channel();
177        tokio::spawn(async move {
178            while let Some(command) = command_rx.recv().await {
179                if input_tx.send(RpcInput::Command(command)).is_err() {
180                    break;
181                }
182            }
183        });
184
185        self.run_loop(input_rx, |value| output_tx.send(value.clone()).is_ok())
186            .await
187    }
188
189    async fn run_loop(
190        &mut self,
191        mut input_rx: tokio::sync::mpsc::UnboundedReceiver<RpcInput>,
192        mut emit: impl FnMut(&serde_json::Value) -> bool,
193    ) -> i32 {
194        let header = serde_json::json!({
195            "type": "rpc_ready",
196            "schema_version": SDK_SCHEMA_VERSION,
197            "mode": "rpc",
198            "version": env!("CARGO_PKG_VERSION"),
199        });
200        if !emit(&header) {
201            return ExitCode::RuntimeFailure as i32;
202        }
203
204        let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::<serde_json::Value>();
205        let event_tx = Arc::new(event_tx);
206        if let Some(harness) = self.harness.as_mut() {
207            let etx = event_tx.clone();
208            harness.subscribe(Box::new(move |event: &AgentEvent| {
209                let _ = etx.send(agent_event_to_value(event));
210            }));
211        }
212
213        let mut run_task: Option<tokio::task::JoinHandle<RunResult>> = None;
214
215        loop {
216            tokio::select! {
217                Some(event) = event_rx.recv() => {
218                    if !emit(&event) {
219                        return self
220                            .runtime_failure_after_emit_failure(
221                                &mut run_task,
222                                &mut event_rx,
223                                &mut emit,
224                            )
225                            .await;
226                    }
227                }
228                input = input_rx.recv() => {
229                    match input {
230                        None => {
231                            if !self
232                                .shutdown_active_run(&mut run_task, &mut event_rx, &mut emit)
233                                .await
234                            {
235                                return ExitCode::RuntimeFailure as i32;
236                            }
237                            drain_events(&mut event_rx, &mut emit);
238                            return ExitCode::Success as i32;
239                        }
240                        Some(input) => match input {
241                        RpcInput::ParseError(message) => {
242                            let resp = response_error(None, "parse", &message);
243                            if !emit(&resp) {
244                                return self
245                                    .runtime_failure_after_emit_failure(
246                                        &mut run_task,
247                                        &mut event_rx,
248                                        &mut emit,
249                                    )
250                                    .await;
251                            }
252                        }
253                        RpcInput::Command(command) => {
254                            if command.is_quit() {
255                                let cmd_id = command.id().map(String::from);
256                                let cmd_name = command.command_name();
257                                let resp = response_success(cmd_id.as_deref(), cmd_name);
258                                if !emit(&resp) {
259                                    return self
260                                        .runtime_failure_after_emit_failure(
261                                            &mut run_task,
262                                            &mut event_rx,
263                                            &mut emit,
264                                        )
265                                        .await;
266                                }
267                                if !self
268                                    .shutdown_active_run(&mut run_task, &mut event_rx, &mut emit)
269                                    .await
270                                {
271                                    return ExitCode::RuntimeFailure as i32;
272                                }
273                                drain_events(&mut event_rx, &mut emit);
274                                return ExitCode::Success as i32;
275                            }
276
277                            if !self.handle_command(command, &mut run_task, &mut emit) {
278                                let _ = self
279                                    .shutdown_active_run(&mut run_task, &mut event_rx, &mut emit)
280                                    .await;
281                                return ExitCode::RuntimeFailure as i32;
282                            }
283                        }
284                        },
285                    }
286                }
287                joined = async {
288                    match run_task.as_mut() {
289                        Some(task) => task.await,
290                        None => std::future::pending().await,
291                    }
292                }, if run_task.is_some() => {
293                    let _ = run_task.take();
294                    if !self.complete_run_task(joined, &mut emit) {
295                        return ExitCode::RuntimeFailure as i32;
296                    }
297                    drain_events(&mut event_rx, &mut emit);
298                }
299                else => {
300                    if !self
301                        .shutdown_active_run(&mut run_task, &mut event_rx, &mut emit)
302                        .await
303                    {
304                        return ExitCode::RuntimeFailure as i32;
305                    }
306                    drain_events(&mut event_rx, &mut emit);
307                    return ExitCode::Success as i32;
308                }
309            }
310        }
311    }
312
313    async fn runtime_failure_after_emit_failure(
314        &mut self,
315        run_task: &mut Option<tokio::task::JoinHandle<RunResult>>,
316        event_rx: &mut tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
317        emit: &mut impl FnMut(&serde_json::Value) -> bool,
318    ) -> i32 {
319        let _ = self.shutdown_active_run(run_task, event_rx, emit).await;
320        ExitCode::RuntimeFailure as i32
321    }
322
323    async fn shutdown_active_run(
324        &mut self,
325        run_task: &mut Option<tokio::task::JoinHandle<RunResult>>,
326        event_rx: &mut tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
327        emit: &mut impl FnMut(&serde_json::Value) -> bool,
328    ) -> bool {
329        if self.running {
330            self.control.abort();
331        }
332
333        let Some(mut task) = run_task.take() else {
334            self.running = false;
335            return true;
336        };
337
338        match tokio::time::timeout(ACTIVE_RUN_SHUTDOWN_TIMEOUT, &mut task).await {
339            Ok(joined) => {
340                let ok = self.complete_run_task(joined, emit);
341                drain_events(event_rx, emit);
342                ok
343            }
344            Err(_) => {
345                task.abort();
346                let joined = task.await;
347                let ok = self.complete_run_task(joined, emit);
348                let timeout_event = serde_json::json!({
349                    "type": "SessionPersistError",
350                    "message": "rpc active run did not stop before shutdown timeout; task aborted",
351                });
352                drain_events(event_rx, emit);
353                ok && emit(&timeout_event)
354            }
355        }
356    }
357
358    fn complete_run_task(
359        &mut self,
360        joined: Result<RunResult, tokio::task::JoinError>,
361        emit: &mut impl FnMut(&serde_json::Value) -> bool,
362    ) -> bool {
363        self.running = false;
364        match joined {
365            Ok((harness, result)) => {
366                self.harness = Some(harness);
367                self.handle_agent_result(result);
368                true
369            }
370            Err(e) => {
371                let event = serde_json::json!({
372                    "type": "SessionPersistError",
373                    "message": format!("rpc run task failed: {e}"),
374                });
375                let _ = emit(&event);
376                false
377            }
378        }
379    }
380
381    fn handle_command(
382        &mut self,
383        command: SdkCommand,
384        run_task: &mut Option<tokio::task::JoinHandle<RunResult>>,
385        emit: &mut impl FnMut(&serde_json::Value) -> bool,
386    ) -> bool {
387        let cmd_id = command.id().map(String::from);
388        let cmd_name = command.command_name();
389
390        match command {
391            SdkCommand::prompt { message, .. } => self.start_run(
392                ActiveRun::Prompt(message),
393                cmd_id.as_deref(),
394                cmd_name,
395                run_task,
396                emit,
397            ),
398            SdkCommand::continue_ { message, .. } => self.start_run(
399                ActiveRun::Continue(message),
400                cmd_id.as_deref(),
401                cmd_name,
402                run_task,
403                emit,
404            ),
405            SdkCommand::abort { .. } => {
406                if self.running {
407                    self.control.abort();
408                }
409                emit(&response_success(cmd_id.as_deref(), cmd_name))
410            }
411            SdkCommand::steer { message, .. } => {
412                if self.running {
413                    self.control.steer(message);
414                } else if let Some(harness) = self.harness.as_ref() {
415                    harness.steer(message);
416                }
417                emit(&response_success(cmd_id.as_deref(), cmd_name))
418            }
419            SdkCommand::follow_up { message, .. } => {
420                if self.running {
421                    self.control.follow_up(message);
422                } else if let Some(harness) = self.harness.as_ref() {
423                    harness.follow_up(message);
424                }
425                emit(&response_success(cmd_id.as_deref(), cmd_name))
426            }
427            SdkCommand::set_model { model, .. } => {
428                if self.running {
429                    return emit(&response_error(
430                        cmd_id.as_deref(),
431                        cmd_name,
432                        "cannot change model while agent is running",
433                    ));
434                }
435                if let Some(harness) = self.harness.as_mut() {
436                    match harness.set_model_validated(model) {
437                        Ok(model) => {
438                            let data = serde_json::json!({ "model": model });
439                            emit(&response_success_with_data(
440                                cmd_id.as_deref(),
441                                cmd_name,
442                                data,
443                            ))
444                        }
445                        Err(e) => emit(&response_error(cmd_id.as_deref(), cmd_name, &e)),
446                    }
447                } else {
448                    emit(&response_error(
449                        cmd_id.as_deref(),
450                        cmd_name,
451                        "agent harness is unavailable",
452                    ))
453                }
454            }
455            SdkCommand::set_thinking_level { level, .. } => {
456                if self.running {
457                    return emit(&response_error(
458                        cmd_id.as_deref(),
459                        cmd_name,
460                        "cannot change thinking level while agent is running",
461                    ));
462                }
463                let Some(harness) = self.harness.as_mut() else {
464                    return emit(&response_error(
465                        cmd_id.as_deref(),
466                        cmd_name,
467                        "agent harness is unavailable",
468                    ));
469                };
470                match harness.set_thinking_level(&level) {
471                    Ok(state) => {
472                        let data = serde_json::json!({
473                            "level": state.level,
474                            "enabled": state.enabled,
475                            "budget_tokens": state.budget_tokens,
476                        });
477                        emit(&response_success_with_data(
478                            cmd_id.as_deref(),
479                            cmd_name,
480                            data,
481                        ))
482                    }
483                    Err(e) => emit(&response_error(cmd_id.as_deref(), cmd_name, &e)),
484                }
485            }
486            SdkCommand::compact { .. } => {
487                if self.running {
488                    return emit(&response_error(
489                        cmd_id.as_deref(),
490                        cmd_name,
491                        "cannot compact while agent is running",
492                    ));
493                }
494                let Some(harness) = self.harness.as_mut() else {
495                    return emit(&response_error(
496                        cmd_id.as_deref(),
497                        cmd_name,
498                        "agent harness is unavailable",
499                    ));
500                };
501                match harness.compact(CompactionReason::Manual) {
502                    Ok(Some(result)) => {
503                        let data = serde_json::json!({
504                            "summary": result.summary,
505                            "first_kept_entry_id": result.first_kept_entry_id,
506                            "tokens_before": result.tokens_before,
507                            "tokens_after": result.tokens_after,
508                        });
509                        emit(&response_success_with_data(
510                            cmd_id.as_deref(),
511                            cmd_name,
512                            data,
513                        ))
514                    }
515                    Ok(None) => emit(&response_error(
516                        cmd_id.as_deref(),
517                        cmd_name,
518                        "compaction produced no output",
519                    )),
520                    Err(e) => emit(&response_error(cmd_id.as_deref(), cmd_name, &e)),
521                }
522            }
523            SdkCommand::session_info { .. } => {
524                if self.running {
525                    return emit(&response_error(
526                        cmd_id.as_deref(),
527                        cmd_name,
528                        "cannot query session info while agent is running",
529                    ));
530                }
531                let Some(harness) = self.harness.as_ref() else {
532                    return emit(&response_error(
533                        cmd_id.as_deref(),
534                        cmd_name,
535                        "agent harness is unavailable",
536                    ));
537                };
538                let mut data = serde_json::json!({
539                    "model": harness.model(),
540                    "resources": harness.resource_metadata_json(),
541                });
542                if let Some(session) = harness.session() {
543                    data["session_id"] = serde_json::Value::String(session.session_id().to_owned());
544                }
545                emit(&response_success_with_data(
546                    cmd_id.as_deref(),
547                    cmd_name,
548                    data,
549                ))
550            }
551            SdkCommand::quit { .. } => true,
552        }
553    }
554
555    fn start_run(
556        &mut self,
557        run: ActiveRun,
558        id: Option<&str>,
559        command: &str,
560        run_task: &mut Option<tokio::task::JoinHandle<RunResult>>,
561        emit: &mut impl FnMut(&serde_json::Value) -> bool,
562    ) -> bool {
563        if self.running {
564            return emit(&response_error(
565                id,
566                command,
567                "agent is already running; use steer or follow_up to queue messages",
568            ));
569        }
570
571        if self.harness.is_none() {
572            return emit(&response_error(id, command, "agent harness is unavailable"));
573        }
574
575        if !emit(&response_success(id, command)) {
576            return false;
577        }
578
579        let mut harness = self.harness.take().expect("harness checked above");
580        harness.reset_cancel_if_cancelled();
581        self.control = harness.control_handle();
582        self.running = true;
583
584        *run_task = Some(tokio::spawn(async move {
585            let result = match run {
586                ActiveRun::Prompt(message) => harness.prompt(&message).await,
587                ActiveRun::Continue(message) => harness.continue_(&message).await,
588            };
589            (harness, result)
590        }));
591        true
592    }
593
594    fn handle_agent_result(&self, result: Result<Vec<AgentMessage>, AgentError>) {
595        match result {
596            Ok(_) | Err(AgentError::Cancelled) => {}
597            Err(_) => {}
598        }
599    }
600}
601
602fn response_success(id: Option<&str>, command: &str) -> serde_json::Value {
603    serde_json::to_value(SdkResponse::success(id, command)).unwrap()
604}
605
606fn response_success_with_data(
607    id: Option<&str>,
608    command: &str,
609    data: serde_json::Value,
610) -> serde_json::Value {
611    serde_json::to_value(SdkResponse::success_with_data(id, command, data)).unwrap()
612}
613
614fn response_error(id: Option<&str>, command: &str, message: &str) -> serde_json::Value {
615    serde_json::to_value(SdkResponse::error(id, command, message)).unwrap()
616}
617
618/// Write a JSON value as a single line to the writer.
619fn write_jsonl(writer: &mut dyn IoWrite, value: &serde_json::Value) -> io::Result<()> {
620    serde_json::to_writer(&mut *writer, value)?;
621    writer.write_all(b"\n")
622}
623
624fn drain_events(
625    rx: &mut tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
626    emit: &mut impl FnMut(&serde_json::Value) -> bool,
627) {
628    while let Ok(event) = rx.try_recv() {
629        if !emit(&event) {
630            break;
631        }
632    }
633}