1use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8
9use opi_agent::event::AgentEvent;
10use opi_agent::hooks::{
11 AfterToolCallContext, AfterToolCallResult, AgentHooks, BeforeToolCallContext,
12 BeforeToolCallResult, PrepareNextTurnContext, ShouldStopAfterTurnContext,
13};
14use opi_agent::loop_types::AgentError;
15use opi_agent::message::AgentMessage;
16use opi_agent::session_event::{AgentSessionEvent, SessionCostTotals, SessionTokenTotals};
17use opi_ai::message::InputContent;
18use opi_ai::message::Message;
19use opi_ai::provider::Provider;
20use opi_ai::stream::AssistantStreamEvent;
21
22use crate::config::OpiConfig;
23use crate::harness::{CodingHarness, ResumeInfo};
24use crate::policy::{RunMode, ToolPolicyError, ToolRuntimeConfig, ToolSelection, is_mutating_tool};
25
26pub const NDJSON_SCHEMA_VERSION: u32 = 1;
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35#[repr(i32)]
36pub enum ExitCode {
37 Success = 0,
38 RuntimeFailure = 1,
39 ConfigError = 2,
40 AuthFailure = 3,
41 ProviderFailure = 4,
42 ToolFailure = 5,
43 Interrupted = 130,
44}
45
46#[derive(Debug, Clone)]
52pub struct NonInteractiveResult {
53 pub stdout: String,
54 pub stderr: String,
55 pub exit_code: i32,
56}
57
58pub struct NonInteractiveRunner {
64 harness: CodingHarness,
65}
66
67impl NonInteractiveRunner {
68 pub fn new(
70 provider: Box<dyn Provider>,
71 model: String,
72 config: OpiConfig,
73 workspace_root: PathBuf,
74 allow_mutating: bool,
75 user_system_prompt: Option<String>,
76 initial_messages: Vec<AgentMessage>,
77 ) -> Self {
78 Self::new_with_resume(
79 provider,
80 model,
81 config,
82 workspace_root,
83 allow_mutating,
84 user_system_prompt,
85 initial_messages,
86 None,
87 ToolSelection::Default,
88 )
89 .expect("default non-interactive tool policy should be valid")
90 }
91
92 #[allow(clippy::too_many_arguments)]
95 pub fn new_with_resume(
96 provider: Box<dyn Provider>,
97 model: String,
98 config: OpiConfig,
99 workspace_root: PathBuf,
100 allow_mutating: bool,
101 user_system_prompt: Option<String>,
102 initial_messages: Vec<AgentMessage>,
103 resume_info: Option<ResumeInfo>,
104 tool_selection: ToolSelection,
105 ) -> Result<Self, ToolPolicyError> {
106 let tool_config =
107 ToolRuntimeConfig::resolve(RunMode::NonInteractive, allow_mutating, tool_selection)?;
108 let hooks = Box::new(NonInteractiveHooks { allow_mutating });
109 let harness = CodingHarness::new_with_hooks_and_resume_tool_config(
110 provider,
111 model,
112 config,
113 workspace_root,
114 hooks,
115 user_system_prompt,
116 initial_messages,
117 resume_info,
118 tool_config,
119 );
120 Ok(Self { harness })
121 }
122
123 pub async fn run_json(&mut self, prompt: &str) -> NonInteractiveResult {
125 let output: Arc<Mutex<String>> = Arc::new(Mutex::new(String::new()));
126
127 {
129 let header = serde_json::json!({
130 "type": "session_header",
131 "schema_version": NDJSON_SCHEMA_VERSION,
132 });
133 let mut out = output.lock().unwrap();
134 out.push_str(&header.to_string());
135 out.push('\n');
136 }
137
138 let out = output.clone();
139 self.harness.subscribe(Box::new(move |event| {
140 let session_event = match event {
141 AgentEvent::AutoRetryStart {
142 attempt,
143 max_attempts,
144 delay_ms,
145 error_message,
146 } => AgentSessionEvent::AutoRetryStart {
147 attempt: *attempt,
148 max_attempts: *max_attempts,
149 delay_ms: *delay_ms,
150 error_message: error_message.clone(),
151 },
152 AgentEvent::AutoRetryEnd {
153 success,
154 attempt,
155 final_error,
156 } => AgentSessionEvent::AutoRetryEnd {
157 success: *success,
158 attempt: *attempt,
159 final_error: final_error.clone(),
160 },
161 AgentEvent::CompactionStart { reason } => {
162 AgentSessionEvent::CompactionStart { reason: *reason }
163 }
164 AgentEvent::CompactionEnd {
165 reason,
166 result,
167 aborted,
168 error_message,
169 } => AgentSessionEvent::CompactionEnd {
170 reason: *reason,
171 result: result.clone(),
172 aborted: *aborted,
173 will_retry: false,
174 error_message: error_message.clone(),
175 },
176 _ => AgentSessionEvent::Agent {
177 event: event.clone(),
178 },
179 };
180 if let Ok(json) = serde_json::to_string(&session_event)
181 && let Ok(mut guard) = out.lock()
182 {
183 guard.push_str(&json);
184 guard.push('\n');
185 }
186 }));
187
188 let prompt_result = self.harness.prompt(prompt).await;
189
190 if let Some(session) = self.harness.session() {
194 let usage = session.usage();
195 let cost = session.cost_summary().map(|c| SessionCostTotals {
196 input: c.input_cost,
197 output: c.output_cost,
198 cache_read: c.cache_read_cost,
199 cache_write: c.cache_write_cost,
200 total: c.total_cost(),
201 });
202 let summary_event = AgentSessionEvent::SessionSummary {
203 session_id: session.session_id().to_owned(),
204 model: session.model().to_owned(),
205 turns: usage.turn_count(),
206 tokens: SessionTokenTotals {
207 input: usage.total_input_tokens(),
208 output: usage.total_output_tokens(),
209 cache_read: usage.total_cache_read_tokens(),
210 cache_write: usage.total_cache_write_tokens(),
211 },
212 cost_usd: cost,
213 };
214 if let Ok(json) = serde_json::to_string(&summary_event)
215 && let Ok(mut guard) = output.lock()
216 {
217 guard.push_str(&json);
218 guard.push('\n');
219 }
220 }
221
222 match prompt_result {
223 Ok(messages) => {
224 if let Some(error) = find_error_message(&messages) {
225 return NonInteractiveResult {
226 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
227 stderr: error,
228 exit_code: ExitCode::ProviderFailure as i32,
229 };
230 }
231 NonInteractiveResult {
232 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
233 stderr: String::new(),
234 exit_code: ExitCode::Success as i32,
235 }
236 }
237 Err(AgentError::Cancelled) => NonInteractiveResult {
238 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
239 stderr: "cancelled".into(),
240 exit_code: ExitCode::Interrupted as i32,
241 },
242 Err(AgentError::AuthFailed(e)) => NonInteractiveResult {
243 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
244 stderr: format!("authentication error: {e}"),
245 exit_code: ExitCode::AuthFailure as i32,
246 },
247 Err(AgentError::Provider(e)) => NonInteractiveResult {
248 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
249 stderr: format!("provider error: {e}"),
250 exit_code: ExitCode::ProviderFailure as i32,
251 },
252 Err(AgentError::Tool(e)) => NonInteractiveResult {
253 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
254 stderr: format!("tool error: {e}"),
255 exit_code: ExitCode::ToolFailure as i32,
256 },
257 Err(AgentError::Hook(e)) => NonInteractiveResult {
258 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
259 stderr: format!("hook error: {e}"),
260 exit_code: ExitCode::RuntimeFailure as i32,
261 },
262 Err(AgentError::MaxTurnsExceeded(n)) => NonInteractiveResult {
263 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
264 stderr: format!("max turns exceeded ({n})"),
265 exit_code: ExitCode::RuntimeFailure as i32,
266 },
267 }
268 }
269
270 pub async fn run_with_content(&mut self, content: Vec<InputContent>) -> NonInteractiveResult {
272 let text_parts: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
273 let persist_errors: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
274 let tp = text_parts.clone();
275 let pe = persist_errors.clone();
276 self.harness.subscribe(Box::new(move |event| match event {
277 AgentEvent::MessageUpdate {
278 assistant_event, ..
279 } => {
280 if let AssistantStreamEvent::TextDelta { delta, .. } = assistant_event.as_ref()
281 && let Ok(mut guard) = tp.lock()
282 {
283 guard.push(delta.clone());
284 }
285 }
286 AgentEvent::SessionPersistError { message } => {
287 if let Ok(mut guard) = pe.lock() {
288 guard.push(message.clone());
289 }
290 }
291 _ => {}
292 }));
293
294 let prompt_result = self.harness.prompt_with_content(content).await;
295 let persist_stderr = format_persist_errors(&persist_errors);
296
297 match prompt_result {
298 Ok(messages) => {
299 if let Some(error) = find_error_message(&messages) {
300 let mut stderr = error;
301 stderr.push_str(&persist_stderr);
302 return NonInteractiveResult {
303 stdout: String::new(),
304 stderr,
305 exit_code: ExitCode::ProviderFailure as i32,
306 };
307 }
308
309 let stdout = text_parts.lock().map(|g| g.join("")).unwrap_or_default();
310 NonInteractiveResult {
311 stdout,
312 stderr: persist_stderr,
313 exit_code: ExitCode::Success as i32,
314 }
315 }
316 Err(AgentError::Cancelled) => NonInteractiveResult {
317 stdout: String::new(),
318 stderr: format!("cancelled{persist_stderr}"),
319 exit_code: ExitCode::Interrupted as i32,
320 },
321 Err(AgentError::AuthFailed(e)) => NonInteractiveResult {
322 stdout: String::new(),
323 stderr: format!("authentication error: {e}{persist_stderr}"),
324 exit_code: ExitCode::AuthFailure as i32,
325 },
326 Err(AgentError::Provider(e)) => NonInteractiveResult {
327 stdout: String::new(),
328 stderr: format!("provider error: {e}{persist_stderr}"),
329 exit_code: ExitCode::ProviderFailure as i32,
330 },
331 Err(AgentError::Tool(e)) => NonInteractiveResult {
332 stdout: String::new(),
333 stderr: format!("tool error: {e}{persist_stderr}"),
334 exit_code: ExitCode::ToolFailure as i32,
335 },
336 Err(AgentError::Hook(e)) => NonInteractiveResult {
337 stdout: String::new(),
338 stderr: format!("hook error: {e}{persist_stderr}"),
339 exit_code: ExitCode::RuntimeFailure as i32,
340 },
341 Err(AgentError::MaxTurnsExceeded(n)) => NonInteractiveResult {
342 stdout: String::new(),
343 stderr: format!("max turns exceeded ({n}){persist_stderr}"),
344 exit_code: ExitCode::RuntimeFailure as i32,
345 },
346 }
347 }
348
349 pub async fn run_json_with_content(
351 &mut self,
352 content: Vec<InputContent>,
353 ) -> NonInteractiveResult {
354 let output: Arc<Mutex<String>> = Arc::new(Mutex::new(String::new()));
355
356 {
357 let header = serde_json::json!({
358 "type": "session_header",
359 "schema_version": NDJSON_SCHEMA_VERSION,
360 });
361 let mut out = output.lock().unwrap();
362 out.push_str(&header.to_string());
363 out.push('\n');
364 }
365
366 let out = output.clone();
367 self.harness.subscribe(Box::new(move |event| {
368 let session_event = match event {
369 AgentEvent::AutoRetryStart {
370 attempt,
371 max_attempts,
372 delay_ms,
373 error_message,
374 } => AgentSessionEvent::AutoRetryStart {
375 attempt: *attempt,
376 max_attempts: *max_attempts,
377 delay_ms: *delay_ms,
378 error_message: error_message.clone(),
379 },
380 AgentEvent::AutoRetryEnd {
381 success,
382 attempt,
383 final_error,
384 } => AgentSessionEvent::AutoRetryEnd {
385 success: *success,
386 attempt: *attempt,
387 final_error: final_error.clone(),
388 },
389 AgentEvent::CompactionStart { reason } => {
390 AgentSessionEvent::CompactionStart { reason: *reason }
391 }
392 AgentEvent::CompactionEnd {
393 reason,
394 result,
395 aborted,
396 error_message,
397 } => AgentSessionEvent::CompactionEnd {
398 reason: *reason,
399 result: result.clone(),
400 aborted: *aborted,
401 will_retry: false,
402 error_message: error_message.clone(),
403 },
404 _ => AgentSessionEvent::Agent {
405 event: event.clone(),
406 },
407 };
408 if let Ok(json) = serde_json::to_string(&session_event)
409 && let Ok(mut guard) = out.lock()
410 {
411 guard.push_str(&json);
412 guard.push('\n');
413 }
414 }));
415
416 let prompt_result = self.harness.prompt_with_content(content).await;
417
418 if let Some(session) = self.harness.session() {
419 let usage = session.usage();
420 let cost = session.cost_summary().map(|c| SessionCostTotals {
421 input: c.input_cost,
422 output: c.output_cost,
423 cache_read: c.cache_read_cost,
424 cache_write: c.cache_write_cost,
425 total: c.total_cost(),
426 });
427 let summary_event = AgentSessionEvent::SessionSummary {
428 session_id: session.session_id().to_owned(),
429 model: session.model().to_owned(),
430 turns: usage.turn_count(),
431 tokens: SessionTokenTotals {
432 input: usage.total_input_tokens(),
433 output: usage.total_output_tokens(),
434 cache_read: usage.total_cache_read_tokens(),
435 cache_write: usage.total_cache_write_tokens(),
436 },
437 cost_usd: cost,
438 };
439 if let Ok(json) = serde_json::to_string(&summary_event)
440 && let Ok(mut guard) = output.lock()
441 {
442 guard.push_str(&json);
443 guard.push('\n');
444 }
445 }
446
447 match prompt_result {
448 Ok(messages) => {
449 if let Some(error) = find_error_message(&messages) {
450 return NonInteractiveResult {
451 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
452 stderr: error,
453 exit_code: ExitCode::ProviderFailure as i32,
454 };
455 }
456 NonInteractiveResult {
457 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
458 stderr: String::new(),
459 exit_code: ExitCode::Success as i32,
460 }
461 }
462 Err(AgentError::Cancelled) => NonInteractiveResult {
463 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
464 stderr: "cancelled".into(),
465 exit_code: ExitCode::Interrupted as i32,
466 },
467 Err(AgentError::AuthFailed(e)) => NonInteractiveResult {
468 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
469 stderr: format!("authentication error: {e}"),
470 exit_code: ExitCode::AuthFailure as i32,
471 },
472 Err(AgentError::Provider(e)) => NonInteractiveResult {
473 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
474 stderr: format!("provider error: {e}"),
475 exit_code: ExitCode::ProviderFailure as i32,
476 },
477 Err(AgentError::Tool(e)) => NonInteractiveResult {
478 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
479 stderr: format!("tool error: {e}"),
480 exit_code: ExitCode::ToolFailure as i32,
481 },
482 Err(AgentError::Hook(e)) => NonInteractiveResult {
483 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
484 stderr: format!("hook error: {e}"),
485 exit_code: ExitCode::RuntimeFailure as i32,
486 },
487 Err(AgentError::MaxTurnsExceeded(n)) => NonInteractiveResult {
488 stdout: output.lock().map(|g| g.clone()).unwrap_or_default(),
489 stderr: format!("max turns exceeded ({n})"),
490 exit_code: ExitCode::RuntimeFailure as i32,
491 },
492 }
493 }
494
495 pub fn cancel(&self) {
497 self.harness.cancel();
498 }
499
500 pub fn session(&self) -> Option<&crate::session_coordinator::SessionCoordinator> {
502 self.harness.session()
503 }
504
505 pub async fn run(&mut self, prompt: &str) -> NonInteractiveResult {
507 let text_parts: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
509 let persist_errors: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
510 let tp = text_parts.clone();
511 let pe = persist_errors.clone();
512 self.harness.subscribe(Box::new(move |event| match event {
513 AgentEvent::MessageUpdate {
514 assistant_event, ..
515 } => {
516 if let AssistantStreamEvent::TextDelta { delta, .. } = assistant_event.as_ref()
517 && let Ok(mut guard) = tp.lock()
518 {
519 guard.push(delta.clone());
520 }
521 }
522 AgentEvent::SessionPersistError { message } => {
523 if let Ok(mut guard) = pe.lock() {
524 guard.push(message.clone());
525 }
526 }
527 _ => {}
528 }));
529
530 let prompt_result = self.harness.prompt(prompt).await;
531
532 let persist_stderr = format_persist_errors(&persist_errors);
535
536 match prompt_result {
537 Ok(messages) => {
538 if let Some(error) = find_error_message(&messages) {
540 let mut stderr = error;
541 stderr.push_str(&persist_stderr);
542 return NonInteractiveResult {
543 stdout: String::new(),
544 stderr,
545 exit_code: ExitCode::ProviderFailure as i32,
546 };
547 }
548
549 let stdout = text_parts.lock().map(|g| g.join("")).unwrap_or_default();
550 NonInteractiveResult {
551 stdout,
552 stderr: persist_stderr,
553 exit_code: ExitCode::Success as i32,
554 }
555 }
556 Err(AgentError::Cancelled) => NonInteractiveResult {
557 stdout: String::new(),
558 stderr: format!("cancelled{persist_stderr}"),
559 exit_code: ExitCode::Interrupted as i32,
560 },
561 Err(AgentError::AuthFailed(e)) => NonInteractiveResult {
562 stdout: String::new(),
563 stderr: format!("authentication error: {e}{persist_stderr}"),
564 exit_code: ExitCode::AuthFailure as i32,
565 },
566 Err(AgentError::Provider(e)) => NonInteractiveResult {
567 stdout: String::new(),
568 stderr: format!("provider error: {e}{persist_stderr}"),
569 exit_code: ExitCode::ProviderFailure as i32,
570 },
571 Err(AgentError::Tool(e)) => NonInteractiveResult {
572 stdout: String::new(),
573 stderr: format!("tool error: {e}{persist_stderr}"),
574 exit_code: ExitCode::ToolFailure as i32,
575 },
576 Err(AgentError::Hook(e)) => NonInteractiveResult {
577 stdout: String::new(),
578 stderr: format!("hook error: {e}{persist_stderr}"),
579 exit_code: ExitCode::RuntimeFailure as i32,
580 },
581 Err(AgentError::MaxTurnsExceeded(n)) => NonInteractiveResult {
582 stdout: String::new(),
583 stderr: format!("max turns exceeded ({n}){persist_stderr}"),
584 exit_code: ExitCode::RuntimeFailure as i32,
585 },
586 }
587 }
588}
589
590fn find_error_message(messages: &[AgentMessage]) -> Option<String> {
596 for msg in messages {
597 if let AgentMessage::Llm(Message::Assistant(asst)) = msg
598 && let Some(err) = &asst.error_message
599 {
600 return Some(err.clone());
601 }
602 }
603 None
604}
605
606pub fn format_persist_errors(errors: &Arc<Mutex<Vec<String>>>) -> String {
608 let guard = errors.lock().unwrap();
609 if guard.is_empty() {
610 return String::new();
611 }
612 let mut out = String::new();
613 for e in guard.iter() {
614 out.push_str("\nsession persist error: ");
615 out.push_str(e);
616 }
617 out
618}
619
620pub struct NonInteractiveHooks {
626 allow_mutating: bool,
627}
628
629impl NonInteractiveHooks {
630 pub fn new(allow_mutating: bool) -> Self {
632 Self { allow_mutating }
633 }
634}
635
636impl AgentHooks for NonInteractiveHooks {
637 fn convert_to_llm(&self, messages: &[AgentMessage]) -> Result<Vec<Message>, AgentError> {
638 Ok(crate::harness::agent_messages_to_llm(messages))
639 }
640
641 fn before_tool_call(
642 &self,
643 ctx: BeforeToolCallContext,
644 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = BeforeToolCallResult> + Send>> {
645 let allowed = self.allow_mutating;
646 let tool_name = ctx.tool_name.clone();
647 Box::pin(async move {
648 if !allowed && is_mutating_tool(&tool_name) {
649 return BeforeToolCallResult::Deny {
650 reason: format!(
651 "tool '{}' is not allowed in non-interactive mode without --allow-mutating",
652 tool_name
653 ),
654 };
655 }
656 BeforeToolCallResult::Allow
657 })
658 }
659
660 fn after_tool_call(
661 &self,
662 _ctx: AfterToolCallContext,
663 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = AfterToolCallResult> + Send>> {
664 Box::pin(async { AfterToolCallResult::Keep })
665 }
666
667 fn should_stop_after_turn(
668 &self,
669 _ctx: ShouldStopAfterTurnContext,
670 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send>> {
671 Box::pin(async { false })
672 }
673
674 fn prepare_next_turn(
675 &self,
676 _ctx: PrepareNextTurnContext,
677 ) -> std::pin::Pin<
678 Box<
679 dyn std::future::Future<Output = Option<opi_agent::loop_types::AgentLoopTurnUpdate>>
680 + Send,
681 >,
682 > {
683 Box::pin(async { None })
684 }
685}