1use std::io::{self, BufRead, Write as IoWrite};
45use std::path::PathBuf;
46use std::sync::Arc;
47use std::time::Duration;
48
49use opi_agent::agent::AgentControl;
50use opi_agent::event::AgentEvent;
51use opi_agent::loop_types::AgentError;
52use opi_agent::message::AgentMessage;
53use opi_agent::sdk::{SDK_SCHEMA_VERSION, SdkCommand, SdkResponse, agent_event_to_value};
54use opi_agent::session_event::CompactionReason;
55use opi_ai::provider::Provider;
56
57use crate::config::OpiConfig;
58use crate::harness::CodingHarness;
59use crate::policy::{RunMode, ToolSelection};
60use crate::runner::ExitCode;
61
62const ACTIVE_RUN_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
63
64pub type RpcCommand = SdkCommand;
66
67pub const RPC_SCHEMA_VERSION: u32 = SDK_SCHEMA_VERSION;
69
70enum RpcInput {
71 Command(SdkCommand),
72 ParseError(String),
73}
74
75enum ActiveRun {
76 Prompt(String),
77 Continue(String),
78}
79
80type RunResult = (CodingHarness, Result<Vec<AgentMessage>, AgentError>);
81
82pub struct RpcRunner {
84 harness: Option<CodingHarness>,
85 control: AgentControl,
86 running: bool,
87}
88
89impl RpcRunner {
90 #[allow(clippy::too_many_arguments)]
92 pub fn new(
93 provider: Box<dyn Provider>,
94 model: String,
95 config: OpiConfig,
96 workspace_root: PathBuf,
97 allow_mutating: bool,
98 tool_selection: ToolSelection,
99 user_system_prompt: Option<String>,
100 initial_messages: Vec<AgentMessage>,
101 ) -> Result<Self, crate::policy::ToolPolicyError> {
102 let tool_config = crate::policy::ToolRuntimeConfig::resolve(
103 RunMode::NonInteractive,
104 allow_mutating,
105 tool_selection,
106 )?;
107 let hooks = Box::new(crate::runner::NonInteractiveHooks::new(allow_mutating));
108 let harness = CodingHarness::new_with_hooks_and_resume_tool_config(
109 provider,
110 model,
111 config,
112 workspace_root,
113 hooks,
114 user_system_prompt,
115 initial_messages,
116 None,
117 tool_config,
118 );
119 let control = harness.control_handle();
120 Ok(Self {
121 harness: Some(harness),
122 control,
123 running: false,
124 })
125 }
126
127 pub fn system_prompt(&self) -> Option<&str> {
129 self.harness.as_ref().map(CodingHarness::system_prompt)
130 }
131
132 pub async fn run(&mut self) -> i32 {
134 let (input_tx, input_rx) = tokio::sync::mpsc::unbounded_channel();
135 tokio::task::spawn_blocking(move || {
136 let stdin = io::stdin();
137 let reader = io::BufReader::new(stdin.lock());
138 for line in reader.lines() {
139 let line = match line {
140 Ok(line) => line,
141 Err(_) => break,
142 };
143 let trimmed = line.trim_end_matches('\r').trim();
144 if trimmed.is_empty() {
145 continue;
146 }
147 let input = match serde_json::from_str::<SdkCommand>(trimmed) {
148 Ok(command) => RpcInput::Command(command),
149 Err(e) => RpcInput::ParseError(format!("failed to parse command: {e}")),
150 };
151 if input_tx.send(input).is_err() {
152 break;
153 }
154 }
155 });
156
157 let stdout = io::stdout();
158 let mut writer = io::BufWriter::new(stdout.lock());
159 self.run_loop(input_rx, |value| {
160 write_jsonl(&mut writer, value)
161 .and_then(|_| writer.flush())
162 .is_ok()
163 })
164 .await
165 }
166
167 pub async fn run_with_channels(
172 &mut self,
173 mut command_rx: tokio::sync::mpsc::UnboundedReceiver<SdkCommand>,
174 output_tx: tokio::sync::mpsc::UnboundedSender<serde_json::Value>,
175 ) -> i32 {
176 let (input_tx, input_rx) = tokio::sync::mpsc::unbounded_channel();
177 tokio::spawn(async move {
178 while let Some(command) = command_rx.recv().await {
179 if input_tx.send(RpcInput::Command(command)).is_err() {
180 break;
181 }
182 }
183 });
184
185 self.run_loop(input_rx, |value| output_tx.send(value.clone()).is_ok())
186 .await
187 }
188
189 async fn run_loop(
190 &mut self,
191 mut input_rx: tokio::sync::mpsc::UnboundedReceiver<RpcInput>,
192 mut emit: impl FnMut(&serde_json::Value) -> bool,
193 ) -> i32 {
194 let header = serde_json::json!({
195 "type": "rpc_ready",
196 "schema_version": SDK_SCHEMA_VERSION,
197 "mode": "rpc",
198 "version": env!("CARGO_PKG_VERSION"),
199 });
200 if !emit(&header) {
201 return ExitCode::RuntimeFailure as i32;
202 }
203
204 let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel::<serde_json::Value>();
205 let event_tx = Arc::new(event_tx);
206 if let Some(harness) = self.harness.as_mut() {
207 let etx = event_tx.clone();
208 harness.subscribe(Box::new(move |event: &AgentEvent| {
209 let _ = etx.send(agent_event_to_value(event));
210 }));
211 }
212
213 let mut run_task: Option<tokio::task::JoinHandle<RunResult>> = None;
214
215 loop {
216 tokio::select! {
217 Some(event) = event_rx.recv() => {
218 if !emit(&event) {
219 return self
220 .runtime_failure_after_emit_failure(
221 &mut run_task,
222 &mut event_rx,
223 &mut emit,
224 )
225 .await;
226 }
227 }
228 input = input_rx.recv() => {
229 match input {
230 None => {
231 if !self
232 .shutdown_active_run(&mut run_task, &mut event_rx, &mut emit)
233 .await
234 {
235 return ExitCode::RuntimeFailure as i32;
236 }
237 drain_events(&mut event_rx, &mut emit);
238 return ExitCode::Success as i32;
239 }
240 Some(input) => match input {
241 RpcInput::ParseError(message) => {
242 let resp = response_error(None, "parse", &message);
243 if !emit(&resp) {
244 return self
245 .runtime_failure_after_emit_failure(
246 &mut run_task,
247 &mut event_rx,
248 &mut emit,
249 )
250 .await;
251 }
252 }
253 RpcInput::Command(command) => {
254 if command.is_quit() {
255 let cmd_id = command.id().map(String::from);
256 let cmd_name = command.command_name();
257 let resp = response_success(cmd_id.as_deref(), cmd_name);
258 if !emit(&resp) {
259 return self
260 .runtime_failure_after_emit_failure(
261 &mut run_task,
262 &mut event_rx,
263 &mut emit,
264 )
265 .await;
266 }
267 if !self
268 .shutdown_active_run(&mut run_task, &mut event_rx, &mut emit)
269 .await
270 {
271 return ExitCode::RuntimeFailure as i32;
272 }
273 drain_events(&mut event_rx, &mut emit);
274 return ExitCode::Success as i32;
275 }
276
277 if !self.handle_command(command, &mut run_task, &mut emit) {
278 let _ = self
279 .shutdown_active_run(&mut run_task, &mut event_rx, &mut emit)
280 .await;
281 return ExitCode::RuntimeFailure as i32;
282 }
283 }
284 },
285 }
286 }
287 joined = async {
288 match run_task.as_mut() {
289 Some(task) => task.await,
290 None => std::future::pending().await,
291 }
292 }, if run_task.is_some() => {
293 let _ = run_task.take();
294 if !self.complete_run_task(joined, &mut emit) {
295 return ExitCode::RuntimeFailure as i32;
296 }
297 drain_events(&mut event_rx, &mut emit);
298 }
299 else => {
300 if !self
301 .shutdown_active_run(&mut run_task, &mut event_rx, &mut emit)
302 .await
303 {
304 return ExitCode::RuntimeFailure as i32;
305 }
306 drain_events(&mut event_rx, &mut emit);
307 return ExitCode::Success as i32;
308 }
309 }
310 }
311 }
312
313 async fn runtime_failure_after_emit_failure(
314 &mut self,
315 run_task: &mut Option<tokio::task::JoinHandle<RunResult>>,
316 event_rx: &mut tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
317 emit: &mut impl FnMut(&serde_json::Value) -> bool,
318 ) -> i32 {
319 let _ = self.shutdown_active_run(run_task, event_rx, emit).await;
320 ExitCode::RuntimeFailure as i32
321 }
322
323 async fn shutdown_active_run(
324 &mut self,
325 run_task: &mut Option<tokio::task::JoinHandle<RunResult>>,
326 event_rx: &mut tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
327 emit: &mut impl FnMut(&serde_json::Value) -> bool,
328 ) -> bool {
329 if self.running {
330 self.control.abort();
331 }
332
333 let Some(mut task) = run_task.take() else {
334 self.running = false;
335 return true;
336 };
337
338 match tokio::time::timeout(ACTIVE_RUN_SHUTDOWN_TIMEOUT, &mut task).await {
339 Ok(joined) => {
340 let ok = self.complete_run_task(joined, emit);
341 drain_events(event_rx, emit);
342 ok
343 }
344 Err(_) => {
345 task.abort();
346 let joined = task.await;
347 let ok = self.complete_run_task(joined, emit);
348 let timeout_event = serde_json::json!({
349 "type": "SessionPersistError",
350 "message": "rpc active run did not stop before shutdown timeout; task aborted",
351 });
352 drain_events(event_rx, emit);
353 ok && emit(&timeout_event)
354 }
355 }
356 }
357
358 fn complete_run_task(
359 &mut self,
360 joined: Result<RunResult, tokio::task::JoinError>,
361 emit: &mut impl FnMut(&serde_json::Value) -> bool,
362 ) -> bool {
363 self.running = false;
364 match joined {
365 Ok((harness, result)) => {
366 self.harness = Some(harness);
367 self.handle_agent_result(result);
368 true
369 }
370 Err(e) => {
371 let event = serde_json::json!({
372 "type": "SessionPersistError",
373 "message": format!("rpc run task failed: {e}"),
374 });
375 let _ = emit(&event);
376 false
377 }
378 }
379 }
380
381 fn handle_command(
382 &mut self,
383 command: SdkCommand,
384 run_task: &mut Option<tokio::task::JoinHandle<RunResult>>,
385 emit: &mut impl FnMut(&serde_json::Value) -> bool,
386 ) -> bool {
387 let cmd_id = command.id().map(String::from);
388 let cmd_name = command.command_name();
389
390 match command {
391 SdkCommand::prompt { message, .. } => self.start_run(
392 ActiveRun::Prompt(message),
393 cmd_id.as_deref(),
394 cmd_name,
395 run_task,
396 emit,
397 ),
398 SdkCommand::continue_ { message, .. } => self.start_run(
399 ActiveRun::Continue(message),
400 cmd_id.as_deref(),
401 cmd_name,
402 run_task,
403 emit,
404 ),
405 SdkCommand::abort { .. } => {
406 if self.running {
407 self.control.abort();
408 }
409 emit(&response_success(cmd_id.as_deref(), cmd_name))
410 }
411 SdkCommand::steer { message, .. } => {
412 if self.running {
413 self.control.steer(message);
414 } else if let Some(harness) = self.harness.as_ref() {
415 harness.steer(message);
416 }
417 emit(&response_success(cmd_id.as_deref(), cmd_name))
418 }
419 SdkCommand::follow_up { message, .. } => {
420 if self.running {
421 self.control.follow_up(message);
422 } else if let Some(harness) = self.harness.as_ref() {
423 harness.follow_up(message);
424 }
425 emit(&response_success(cmd_id.as_deref(), cmd_name))
426 }
427 SdkCommand::set_model { model, .. } => {
428 if self.running {
429 return emit(&response_error(
430 cmd_id.as_deref(),
431 cmd_name,
432 "cannot change model while agent is running",
433 ));
434 }
435 if let Some(harness) = self.harness.as_mut() {
436 match harness.set_model_validated(model) {
437 Ok(model) => {
438 let data = serde_json::json!({ "model": model });
439 emit(&response_success_with_data(
440 cmd_id.as_deref(),
441 cmd_name,
442 data,
443 ))
444 }
445 Err(e) => emit(&response_error(cmd_id.as_deref(), cmd_name, &e)),
446 }
447 } else {
448 emit(&response_error(
449 cmd_id.as_deref(),
450 cmd_name,
451 "agent harness is unavailable",
452 ))
453 }
454 }
455 SdkCommand::set_thinking_level { level, .. } => {
456 if self.running {
457 return emit(&response_error(
458 cmd_id.as_deref(),
459 cmd_name,
460 "cannot change thinking level while agent is running",
461 ));
462 }
463 let Some(harness) = self.harness.as_mut() else {
464 return emit(&response_error(
465 cmd_id.as_deref(),
466 cmd_name,
467 "agent harness is unavailable",
468 ));
469 };
470 match harness.set_thinking_level(&level) {
471 Ok(state) => {
472 let data = serde_json::json!({
473 "level": state.level,
474 "enabled": state.enabled,
475 "budget_tokens": state.budget_tokens,
476 });
477 emit(&response_success_with_data(
478 cmd_id.as_deref(),
479 cmd_name,
480 data,
481 ))
482 }
483 Err(e) => emit(&response_error(cmd_id.as_deref(), cmd_name, &e)),
484 }
485 }
486 SdkCommand::compact { .. } => {
487 if self.running {
488 return emit(&response_error(
489 cmd_id.as_deref(),
490 cmd_name,
491 "cannot compact while agent is running",
492 ));
493 }
494 let Some(harness) = self.harness.as_mut() else {
495 return emit(&response_error(
496 cmd_id.as_deref(),
497 cmd_name,
498 "agent harness is unavailable",
499 ));
500 };
501 match harness.compact(CompactionReason::Manual) {
502 Ok(Some(result)) => {
503 let data = serde_json::json!({
504 "summary": result.summary,
505 "first_kept_entry_id": result.first_kept_entry_id,
506 "tokens_before": result.tokens_before,
507 "tokens_after": result.tokens_after,
508 });
509 emit(&response_success_with_data(
510 cmd_id.as_deref(),
511 cmd_name,
512 data,
513 ))
514 }
515 Ok(None) => emit(&response_error(
516 cmd_id.as_deref(),
517 cmd_name,
518 "compaction produced no output",
519 )),
520 Err(e) => emit(&response_error(cmd_id.as_deref(), cmd_name, &e)),
521 }
522 }
523 SdkCommand::session_info { .. } => {
524 if self.running {
525 return emit(&response_error(
526 cmd_id.as_deref(),
527 cmd_name,
528 "cannot query session info while agent is running",
529 ));
530 }
531 let Some(harness) = self.harness.as_ref() else {
532 return emit(&response_error(
533 cmd_id.as_deref(),
534 cmd_name,
535 "agent harness is unavailable",
536 ));
537 };
538 let mut data = serde_json::json!({
539 "model": harness.model(),
540 "resources": harness.resource_metadata_json(),
541 });
542 if let Some(session) = harness.session() {
543 data["session_id"] = serde_json::Value::String(session.session_id().to_owned());
544 }
545 emit(&response_success_with_data(
546 cmd_id.as_deref(),
547 cmd_name,
548 data,
549 ))
550 }
551 SdkCommand::quit { .. } => true,
552 }
553 }
554
555 fn start_run(
556 &mut self,
557 run: ActiveRun,
558 id: Option<&str>,
559 command: &str,
560 run_task: &mut Option<tokio::task::JoinHandle<RunResult>>,
561 emit: &mut impl FnMut(&serde_json::Value) -> bool,
562 ) -> bool {
563 if self.running {
564 return emit(&response_error(
565 id,
566 command,
567 "agent is already running; use steer or follow_up to queue messages",
568 ));
569 }
570
571 if self.harness.is_none() {
572 return emit(&response_error(id, command, "agent harness is unavailable"));
573 }
574
575 if !emit(&response_success(id, command)) {
576 return false;
577 }
578
579 let mut harness = self.harness.take().expect("harness checked above");
580 harness.reset_cancel_if_cancelled();
581 self.control = harness.control_handle();
582 self.running = true;
583
584 *run_task = Some(tokio::spawn(async move {
585 let result = match run {
586 ActiveRun::Prompt(message) => harness.prompt(&message).await,
587 ActiveRun::Continue(message) => harness.continue_(&message).await,
588 };
589 (harness, result)
590 }));
591 true
592 }
593
594 fn handle_agent_result(&self, result: Result<Vec<AgentMessage>, AgentError>) {
595 match result {
596 Ok(_) | Err(AgentError::Cancelled) => {}
597 Err(_) => {}
598 }
599 }
600}
601
602fn response_success(id: Option<&str>, command: &str) -> serde_json::Value {
603 serde_json::to_value(SdkResponse::success(id, command)).unwrap()
604}
605
606fn response_success_with_data(
607 id: Option<&str>,
608 command: &str,
609 data: serde_json::Value,
610) -> serde_json::Value {
611 serde_json::to_value(SdkResponse::success_with_data(id, command, data)).unwrap()
612}
613
614fn response_error(id: Option<&str>, command: &str, message: &str) -> serde_json::Value {
615 serde_json::to_value(SdkResponse::error(id, command, message)).unwrap()
616}
617
618fn write_jsonl(writer: &mut dyn IoWrite, value: &serde_json::Value) -> io::Result<()> {
620 serde_json::to_writer(&mut *writer, value)?;
621 writer.write_all(b"\n")
622}
623
624fn drain_events(
625 rx: &mut tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
626 emit: &mut impl FnMut(&serde_json::Value) -> bool,
627) {
628 while let Ok(event) = rx.try_recv() {
629 if !emit(&event) {
630 break;
631 }
632 }
633}