1use crate::config;
4use crate::logging;
5use crate::server::OrchestratorServer;
6use crate::token_tracker::TokenTracker;
7use crate::types::CommandInfo;
8use crate::types::ListCommandsInput;
9use crate::types::ListCommandsOutput;
10use crate::types::ListSessionsInput;
11use crate::types::ListSessionsOutput;
12use crate::types::OrchestratorRunInput;
13use crate::types::OrchestratorRunOutput;
14use crate::types::PermissionReply;
15use crate::types::QuestionAction;
16use crate::types::QuestionInfoView;
17use crate::types::QuestionOptionView;
18use crate::types::RespondPermissionInput;
19use crate::types::RespondPermissionOutput;
20use crate::types::RespondQuestionInput;
21use crate::types::RespondQuestionOutput;
22use crate::types::RunStatus;
23use crate::types::SessionSummary;
24use agentic_logging::CallTimer;
25use agentic_logging::ToolCallRecord;
26use agentic_tools_core::Tool;
27use agentic_tools_core::ToolContext;
28use agentic_tools_core::ToolError;
29use agentic_tools_core::ToolRegistry;
30use agentic_tools_core::fmt::TextFormat;
31use agentic_tools_core::fmt::TextOptions;
32use futures::future::BoxFuture;
33use opencode_rs::types::event::Event;
34use opencode_rs::types::message::CommandRequest;
35use opencode_rs::types::message::PromptPart;
36use opencode_rs::types::message::PromptRequest;
37use opencode_rs::types::permission::PermissionReply as ApiPermissionReply;
38use opencode_rs::types::permission::PermissionReplyRequest;
39use opencode_rs::types::question::QuestionReply;
40use opencode_rs::types::question::QuestionRequest;
41use opencode_rs::types::session::CreateSessionRequest;
42use opencode_rs::types::session::SessionStatusInfo;
43use opencode_rs::types::session::SummarizeRequest;
44use serde::Serialize;
45use std::sync::Arc;
46use std::time::Duration;
47use tokio::sync::OnceCell;
48use tokio::task::JoinHandle;
49
50const SERVER_NAME: &str = "opencode-orchestrator-mcp";
51
/// Extra metadata attached to a successful tool-call log record.
#[derive(Debug, Clone, Default)]
struct ToolLogMeta {
    /// Token usage copied into the log record's `token_usage` field, if any.
    token_usage: Option<agentic_logging::TokenUsage>,
    /// When `true`, the success record's `summary` is set to
    /// `{"token_usage_saturated": true}`; otherwise the summary is `None`.
    token_usage_saturated: bool,
}
57
/// Result of a `run` invocation: the output handed back to the caller plus
/// the metadata that is written to the tool-call log.
struct RunOutcome {
    /// Output returned to the tool caller.
    output: OrchestratorRunOutput,
    /// Token metadata forwarded to `log_tool_success`.
    log_meta: ToolLogMeta,
}
62
63impl RunOutcome {
64 fn without_tokens(output: OrchestratorRunOutput) -> Self {
65 Self {
66 output,
67 log_meta: ToolLogMeta::default(),
68 }
69 }
70
71 fn with_tracker(output: OrchestratorRunOutput, token_tracker: &TokenTracker) -> Self {
72 let (token_usage, token_usage_saturated) = token_tracker.to_log_token_usage();
73 Self {
74 output,
75 log_meta: ToolLogMeta {
76 token_usage,
77 token_usage_saturated,
78 },
79 }
80 }
81}
82
83fn request_json<T: Serialize>(request: &T) -> serde_json::Value {
84 serde_json::to_value(request)
85 .unwrap_or_else(|error| serde_json::json!({"serialization_error": error.to_string()}))
86}
87
88fn log_tool_success<TReq: Serialize, TOut: TextFormat>(
89 timer: &CallTimer,
90 tool: &str,
91 request: &TReq,
92 output: &TOut,
93 log_meta: ToolLogMeta,
94 write_markdown: bool,
95) {
96 let (completed_at, duration_ms) = timer.finish();
97 let rendered = output.fmt_text(&TextOptions::default());
98 let response_file = write_markdown
99 .then(|| logging::write_markdown_best_effort(completed_at, &timer.call_id, &rendered))
100 .flatten();
101
102 let record = ToolCallRecord {
103 call_id: timer.call_id.clone(),
104 server: SERVER_NAME.into(),
105 tool: tool.into(),
106 started_at: timer.started_at,
107 completed_at,
108 duration_ms,
109 request: request_json(request),
110 response_file,
111 success: true,
112 error: None,
113 model: None,
114 token_usage: log_meta.token_usage,
115 summary: log_meta
116 .token_usage_saturated
117 .then(|| serde_json::json!({"token_usage_saturated": true})),
118 };
119
120 logging::append_record_best_effort(&record);
121}
122
123fn log_tool_error<TReq: Serialize>(
124 timer: &CallTimer,
125 tool: &str,
126 request: &TReq,
127 error: &ToolError,
128) {
129 let (completed_at, duration_ms) = timer.finish();
130 let record = ToolCallRecord {
131 call_id: timer.call_id.clone(),
132 server: SERVER_NAME.into(),
133 tool: tool.into(),
134 started_at: timer.started_at,
135 completed_at,
136 duration_ms,
137 request: request_json(request),
138 response_file: None,
139 success: false,
140 error: Some(error.to_string()),
141 model: None,
142 token_usage: None,
143 summary: None,
144 };
145
146 logging::append_record_best_effort(&record);
147}
148
/// Tool implementing `run`: starts or resumes an OpenCode session and waits
/// until it completes or needs a permission/question response.
#[derive(Clone)]
pub struct OrchestratorRunTool {
    /// Shared cell holding the orchestrator server; it is initialized on first
    /// use through `OrchestratorServer::start_lazy`.
    server: Arc<OnceCell<OrchestratorServer>>,
}
162
163impl OrchestratorRunTool {
    /// Creates the tool around a shared, possibly not-yet-initialized server cell.
    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
        Self { server }
    }
168
169 async fn finalize_completed(
178 client: &opencode_rs::Client,
179 session_id: String,
180 token_tracker: &TokenTracker,
181 mut warnings: Vec<String>,
182 ) -> Result<OrchestratorRunOutput, ToolError> {
183 const BACKOFFS_MS: &[u64] = &[0, 50, 100, 200, 400];
185
186 let mut response: Option<String> = None;
187
188 for (attempt, &delay_ms) in BACKOFFS_MS.iter().enumerate() {
189 if delay_ms > 0 {
190 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
191 }
192
193 let messages = client
194 .messages()
195 .list(&session_id)
196 .await
197 .map_err(|e| ToolError::Internal(format!("Failed to list messages: {e}")))?;
198
199 response = OrchestratorServer::extract_assistant_text(&messages);
200
201 if response.is_some() {
202 if attempt > 0 {
203 tracing::debug!(
204 session_id = %session_id,
205 attempt,
206 "assistant response became available after retry"
207 );
208 }
209 break;
210 }
211 }
212
213 if response.is_none() {
214 tracing::debug!(
215 session_id = %session_id,
216 "no assistant response found after bounded retry"
217 );
218 }
219
220 if token_tracker.compaction_needed
222 && let (Some(pid), Some(mid)) = (&token_tracker.provider_id, &token_tracker.model_id)
223 {
224 let summarize_req = SummarizeRequest {
225 provider_id: pid.clone(),
226 model_id: mid.clone(),
227 auto: None,
228 };
229
230 match client
231 .sessions()
232 .summarize(&session_id, &summarize_req)
233 .await
234 {
235 Ok(_) => {
236 tracing::info!(session_id = %session_id, "context summarization triggered");
237 warnings.push("Context limit reached; summarization triggered".into());
238 }
239 Err(e) => {
240 tracing::warn!(session_id = %session_id, error = %e, "summarization failed");
241 warnings.push(format!("Summarization failed: {e}"));
242 }
243 }
244 }
245
246 Ok(OrchestratorRunOutput {
247 session_id,
248 status: RunStatus::Completed,
249 response,
250 partial_response: None,
251 permission_request_id: None,
252 permission_type: None,
253 permission_patterns: vec![],
254 question_request_id: None,
255 questions: vec![],
256 warnings,
257 })
258 }
259
260 fn map_questions(req: &QuestionRequest) -> Vec<QuestionInfoView> {
261 req.questions
262 .iter()
263 .map(|question| QuestionInfoView {
264 question: question.question.clone(),
265 header: question.header.clone(),
266 options: question
267 .options
268 .iter()
269 .map(|option| QuestionOptionView {
270 label: option.label.clone(),
271 description: option.description.clone(),
272 })
273 .collect(),
274 multiple: question.multiple,
275 custom: question.custom,
276 })
277 .collect()
278 }
279
280 fn question_required_output(
281 session_id: String,
282 partial_response: Option<String>,
283 request: &QuestionRequest,
284 warnings: Vec<String>,
285 ) -> OrchestratorRunOutput {
286 OrchestratorRunOutput {
287 session_id,
288 status: RunStatus::QuestionRequired,
289 response: None,
290 partial_response,
291 permission_request_id: None,
292 permission_type: None,
293 permission_patterns: vec![],
294 question_request_id: Some(request.id.clone()),
295 questions: Self::map_questions(request),
296 warnings,
297 }
298 }
299
    /// Core implementation of the `run` tool.
    ///
    /// Steps, in order:
    /// 1. Validate the input (something to do, command has a message, message
    ///    is not blank after trimming).
    /// 2. Lazily initialize the server and resolve the session: verify the given
    ///    `session_id` exists, or create a new session.
    /// 3. Return immediately if a permission or question is already pending for
    ///    the session.
    /// 4. If this is a pure status check on an idle session, finalize right away.
    /// 5. Otherwise subscribe to session events, dispatch the command (in a
    ///    spawned task) or the prompt, and loop over a `tokio::select!` of
    ///    SSE events, a 1-second poll, an idle-grace timer and the command task
    ///    until the session completes, needs input, errors, or times out.
    ///
    /// # Errors
    ///
    /// * `ToolError::InvalidInput` for invalid arguments or an unknown session id.
    /// * `ToolError::Internal` for server start or API failures, session error
    ///   events, command dispatch failures, and inactivity/deadline timeouts.
    async fn run_impl_outcome(&self, input: OrchestratorRunInput) -> Result<RunOutcome, ToolError> {
        // There must be either a session to look at or new work to start.
        if input.session_id.is_none() && input.message.is_none() && input.command.is_none() {
            return Err(ToolError::InvalidInput(
                "Either session_id (to resume/check status) or message/command (to start work) is required"
                    .into(),
            ));
        }

        // A command always needs a message to use as its arguments.
        if input.command.is_some() && input.message.is_none() {
            return Err(ToolError::InvalidInput(
                "message is required when command is specified (becomes $ARGUMENTS for template expansion)"
                    .into(),
            ));
        }

        // Messages are trimmed before use; a blank message is rejected.
        let message = input.message.map(|m| m.trim().to_string());
        if let Some(ref m) = message
            && m.is_empty()
        {
            return Err(ToolError::InvalidInput(
                "message cannot be empty or whitespace-only".into(),
            ));
        }

        let wait_for_activity = input.wait_for_activity.unwrap_or(false);

        // Start the server on first use; later calls reuse the initialized cell.
        let server = self
            .server
            .get_or_try_init(OrchestratorServer::start_lazy)
            .await
            .map_err(|e| ToolError::Internal(e.to_string()))?;

        let client = server.client();

        tracing::debug!(
            command = ?input.command,
            has_message = message.is_some(),
            message_len = message.as_ref().map(String::len),
            session_id = ?input.session_id,
            "run: starting"
        );

        // Resolve the session: validate the supplied id or create a fresh session.
        let session_id = if let Some(sid) = input.session_id {
            client.sessions().get(&sid).await.map_err(|e| {
                // An unknown id is a caller mistake; anything else is internal.
                if e.is_not_found() {
                    ToolError::InvalidInput(format!(
                        "Session '{sid}' not found. Use list_sessions to discover sessions, \
                         or omit session_id to create a new session."
                    ))
                } else {
                    ToolError::Internal(format!("Failed to get session: {e}"))
                }
            })?;
            sid
        } else {
            let session = client
                .sessions()
                .create(&CreateSessionRequest::default())
                .await
                .map_err(|e| ToolError::Internal(format!("Failed to create session: {e}")))?;
            session.id
        };

        tracing::info!(session_id = %session_id, "run: session resolved");

        let status = client
            .sessions()
            .status_for(&session_id)
            .await
            .map_err(|e| ToolError::Internal(format!("Failed to get session status: {e}")))?;

        let is_idle = matches!(status, SessionStatusInfo::Idle);

        // The permission list is not session-scoped, so filter it by our session id.
        let pending_permissions = client
            .permissions()
            .list()
            .await
            .map_err(|e| ToolError::Internal(format!("Failed to list permissions: {e}")))?;

        let my_permission = pending_permissions
            .into_iter()
            .find(|p| p.session_id == session_id);

        // A permission that is already pending is reported before any new work
        // is dispatched.
        if let Some(perm) = my_permission {
            tracing::info!(
                session_id = %session_id,
                permission_type = %perm.permission,
                "run: pending permission found"
            );
            return Ok(RunOutcome::without_tokens(OrchestratorRunOutput {
                session_id,
                status: RunStatus::PermissionRequired,
                response: None,
                partial_response: None,
                permission_request_id: Some(perm.id),
                permission_type: Some(perm.permission),
                permission_patterns: perm.patterns,
                question_request_id: None,
                questions: vec![],
                warnings: vec![],
            }));
        }

        // Same early exit for an already pending question.
        let pending_questions = client
            .question()
            .list()
            .await
            .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;

        if let Some(question) = pending_questions
            .into_iter()
            .find(|question| question.session_id == session_id)
        {
            tracing::info!(session_id = %session_id, question_id = %question.id, "run: pending question found");
            return Ok(RunOutcome::without_tokens(Self::question_required_output(
                session_id,
                None,
                &question,
                vec![],
            )));
        }

        // Pure status check on an idle session: nothing to wait for, finalize now.
        // The tracker is freshly created here, so it has observed no events.
        if message.is_none() && input.command.is_none() && is_idle && !wait_for_activity {
            let token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
            let output =
                Self::finalize_completed(client, session_id, &token_tracker, vec![]).await?;
            return Ok(RunOutcome::with_tracker(output, &token_tracker));
        }

        // Subscribe before dispatching any work so the subscription already
        // exists when the session starts producing events.
        let mut subscription = client
            .subscribe_session(&session_id)
            .map_err(|e| ToolError::Internal(format!("Failed to subscribe to session: {e}")))?;

        // `wait_for_activity` is treated like dispatched work: an idle status is
        // not accepted immediately (see the polling branch below).
        let dispatched_new_work = input.command.is_some() || message.is_some() || wait_for_activity;
        let idle_grace = config::idle_grace();
        // Instant after which an idle status is accepted even though the session
        // was never observed busy. `None` until the dispatch is known to be done.
        let mut idle_grace_deadline: Option<tokio::time::Instant> = None;
        // Enables the idle-grace timer branch of the select loop.
        let mut awaiting_idle_grace_check = false;

        // Waiting for activity without dispatching anything: the grace window
        // starts right away.
        if wait_for_activity && input.command.is_none() && message.is_none() {
            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
        }

        // Handle of the spawned command dispatch, polled inside the select loop.
        let mut command_task: Option<JoinHandle<Result<(), String>>> = None;
        let mut command_name_for_logging: Option<String> = None;

        if let Some(command) = &input.command {
            command_name_for_logging = Some(command.clone());

            let cmd_client = client.clone();
            let cmd_session_id = session_id.clone();
            let cmd_name = command.clone();
            // Validation above guarantees a message whenever a command is given.
            let cmd_arguments = message.clone().unwrap_or_default();

            // The command request is awaited in a spawned task so the event loop
            // below can keep processing events while the request is in flight.
            command_task = Some(tokio::spawn(async move {
                let req = CommandRequest {
                    command: cmd_name,
                    arguments: cmd_arguments,
                    message_id: None,
                };

                cmd_client
                    .messages()
                    .command(&cmd_session_id, &req)
                    .await
                    .map(|_| ())
                    .map_err(|e| e.to_string())
            }));
        } else if let Some(msg) = &message {
            let req = PromptRequest {
                parts: vec![PromptPart::Text {
                    text: msg.clone(),
                    synthetic: None,
                    ignored: None,
                    metadata: None,
                }],
                message_id: None,
                model: None,
                agent: None,
                no_reply: None,
                system: None,
                variant: None,
            };

            client
                .messages()
                .prompt_async(&session_id, &req)
                .await
                .map_err(|e| ToolError::Internal(format!("Failed to send prompt: {e}")))?;

            // The prompt has been accepted: start the idle-grace window.
            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
        }

        // Hard upper bound for the whole wait, plus a rolling inactivity timeout.
        let deadline = tokio::time::Instant::now() + server.session_deadline();
        let inactivity_timeout = server.inactivity_timeout();
        let mut last_activity_time = tokio::time::Instant::now();

        tracing::debug!(session_id = %session_id, "run: entering event loop");
        let mut token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
        // Text deltas accumulated from `MessagePartUpdated` events.
        let mut partial_response = String::new();
        let warnings = Vec::new();

        // Polling runs alongside the event stream; skipped ticks are not replayed.
        let mut poll_interval = tokio::time::interval(Duration::from_secs(1));
        poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        // Set once the session has shown activity (busy/retry status or message events).
        let mut observed_busy = false;

        // Cleared when the event stream closes; the loop then relies on polling only.
        let mut sse_active = true;

        // When nothing was dispatched, re-check the status once after subscribing:
        // if the session is idle now, finalize instead of waiting for events.
        // A failed status call here is ignored and the loop takes over.
        if !dispatched_new_work
            && let Ok(status) = client.sessions().status_for(&session_id).await
            && matches!(status, SessionStatusInfo::Idle)
        {
            tracing::debug!(
                session_id = %session_id,
                "session already idle on post-subscribe check"
            );
            let output =
                Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
            return Ok(RunOutcome::with_tracker(output, &token_tracker));
        }
        loop {
            let now = tokio::time::Instant::now();

            // NOTE(review): the message hardcodes "5 minutes" while the threshold
            // comes from `server.inactivity_timeout()` — confirm they stay in sync.
            if now.duration_since(last_activity_time) >= inactivity_timeout {
                return Err(ToolError::Internal(format!(
                    "Session idle timeout: no activity for 5 minutes (session_id={session_id}). \
                     The session may still be running; use run(session_id=...) to check status."
                )));
            }

            // NOTE(review): likewise "1 hour" is hardcoded while the deadline comes
            // from `server.session_deadline()` — confirm.
            if now >= deadline {
                return Err(ToolError::Internal(
                    "Session execution timed out after 1 hour. \
                     The session may still be running; use run with the session_id to check status."
                        .into(),
                ));
            }

            let command_task_active = command_task.is_some();

            tokio::select! {
                // Branch 1: events from the session subscription.
                // NOTE(review): events are not filtered by session id here;
                // presumably `subscribe_session` already scopes them — confirm.
                maybe_event = subscription.recv(), if sse_active => {
                    let Some(event) = maybe_event else {
                        tracing::warn!(
                            session_id = %session_id,
                            "SSE stream closed unexpectedly; falling back to polling-only mode"
                        );
                        sse_active = false;
                        continue;
                    };

                    // Let the tracker see every event before it is dispatched.
                    token_tracker.observe_event(&event, |pid, mid| {
                        server.context_limit(pid, mid)
                    });

                    match event {
                        Event::PermissionAsked { properties } => {
                            tracing::info!(
                                session_id = %session_id,
                                permission_type = %properties.request.permission,
                                "run: permission requested"
                            );
                            return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
                                session_id,
                                status: RunStatus::PermissionRequired,
                                response: None,
                                partial_response: if partial_response.is_empty() {
                                    None
                                } else {
                                    Some(partial_response)
                                },
                                permission_request_id: Some(properties.request.id),
                                permission_type: Some(properties.request.permission),
                                permission_patterns: properties.request.patterns,
                                question_request_id: None,
                                questions: vec![],
                                warnings,
                            }, &token_tracker));
                        }

                        Event::QuestionAsked { properties } => {
                            return Ok(RunOutcome::with_tracker(Self::question_required_output(
                                session_id,
                                if partial_response.is_empty() {
                                    None
                                } else {
                                    Some(partial_response)
                                },
                                &properties.request,
                                warnings,
                            ), &token_tracker));
                        }

                        // Message activity counts as "busy" and cancels a pending
                        // idle-grace check.
                        Event::MessagePartUpdated { properties } => {
                            last_activity_time = tokio::time::Instant::now();
                            observed_busy = true;
                            awaiting_idle_grace_check = false;
                            if let Some(delta) = &properties.delta {
                                partial_response.push_str(delta);
                            }
                        }

                        Event::MessageUpdated { .. } => {
                            last_activity_time = tokio::time::Instant::now();
                            observed_busy = true;
                            awaiting_idle_grace_check = false;
                        }

                        Event::SessionError { properties } => {
                            let error_msg = properties
                                .error
                                .map_or_else(|| "Unknown error".to_string(), |e| format!("{e:?}"));
                            tracing::error!(
                                session_id = %session_id,
                                error = %error_msg,
                                "run: session error"
                            );
                            return Err(ToolError::Internal(format!("Session error: {error_msg}")));
                        }

                        // An idle event finalizes unconditionally; no busy
                        // observation is required on this path.
                        Event::SessionIdle { .. } => {
                            tracing::debug!(session_id = %session_id, "received SessionIdle event");
                            let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
                            return Ok(RunOutcome::with_tracker(output, &token_tracker));
                        }

                        // All other events are ignored.
                        _ => {
                        }
                    }
                }

                // Branch 2: periodic poll for permissions, questions and session
                // status; it runs whether or not the event stream is active.
                _ = poll_interval.tick() => {
                    // A failed listing is logged and treated as "nothing pending".
                    let pending = match client.permissions().list().await {
                        Ok(p) => p,
                        Err(e) => {
                            tracing::warn!(
                                session_id = %session_id,
                                error = %e,
                                "failed to list permissions during poll fallback"
                            );
                            vec![]
                        }
                    };

                    if let Some(perm) = pending.into_iter().find(|p| p.session_id == session_id) {
                        tracing::debug!(
                            session_id = %session_id,
                            permission_id = %perm.id,
                            "detected pending permission via polling fallback"
                        );
                        return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
                            session_id,
                            status: RunStatus::PermissionRequired,
                            response: None,
                            partial_response: if partial_response.is_empty() {
                                None
                            } else {
                                Some(partial_response)
                            },
                            permission_request_id: Some(perm.id),
                            permission_type: Some(perm.permission),
                            permission_patterns: perm.patterns,
                            question_request_id: None,
                            questions: vec![],
                            warnings,
                        }, &token_tracker));
                    }

                    let pending_questions = match client.question().list().await {
                        Ok(questions) => questions,
                        Err(e) => {
                            tracing::warn!(
                                session_id = %session_id,
                                error = %e,
                                "failed to list questions during poll fallback"
                            );
                            vec![]
                        }
                    };

                    if let Some(question) = pending_questions
                        .into_iter()
                        .find(|question| question.session_id == session_id)
                    {
                        tracing::debug!(
                            session_id = %session_id,
                            question_id = %question.id,
                            "detected pending question via polling fallback"
                        );
                        return Ok(RunOutcome::with_tracker(Self::question_required_output(
                            session_id,
                            if partial_response.is_empty() {
                                None
                            } else {
                                Some(partial_response)
                            },
                            &question,
                            warnings,
                        ), &token_tracker));
                    }

                    match client.sessions().status_for(&session_id).await {
                        Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
                            last_activity_time = tokio::time::Instant::now();
                            observed_busy = true;
                            awaiting_idle_grace_check = false;
                            tracing::trace!(
                                session_id = %session_id,
                                "our session is busy/retry, waiting"
                            );
                        }
                        Ok(SessionStatusInfo::Idle) => {
                            // Idle is conclusive if nothing was dispatched, or if the
                            // session was seen busy in the meantime.
                            if !dispatched_new_work || observed_busy {
                                tracing::debug!(
                                    session_id = %session_id,
                                    dispatched_new_work = dispatched_new_work,
                                    observed_busy = observed_busy,
                                    "detected session idle via polling fallback"
                                );
                                let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
                                return Ok(RunOutcome::with_tracker(output, &token_tracker));
                            }

                            // No grace deadline yet (it is only set once the prompt
                            // was sent or the command task finished): keep waiting.
                            // Note that `deadline` shadows the outer session deadline
                            // for the rest of this arm.
                            let Some(deadline) = idle_grace_deadline else {
                                tracing::trace!(
                                    session_id = %session_id,
                                    command_task_active = command_task_active,
                                    "idle seen before dispatch confirmed; waiting"
                                );
                                continue;
                            };

                            // Grace window elapsed without any busy observation:
                            // accept the idle status as completion.
                            let now = tokio::time::Instant::now();
                            if now >= deadline {
                                tracing::debug!(
                                    session_id = %session_id,
                                    idle_grace_ms = idle_grace.as_millis(),
                                    "accepting idle via bounded idle grace (no busy observed)"
                                );
                                let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
                                return Ok(RunOutcome::with_tracker(output, &token_tracker));
                            }

                            // Still inside the grace window: arm the timer branch so
                            // the status is re-checked exactly at the deadline.
                            awaiting_idle_grace_check = true;
                            tracing::trace!(
                                session_id = %session_id,
                                remaining_ms = (deadline - now).as_millis(),
                                "idle detected before busy; waiting for idle-grace deadline"
                            );
                        }
                        Err(e) => {
                            // Status failures are logged; the next tick retries.
                            tracing::warn!(
                                session_id = %session_id,
                                error = %e,
                                "failed to get session status during poll fallback"
                            );
                        }
                    }
                }

                // Branch 3: idle-grace timer, enabled only while a grace check is
                // pending. With no deadline set the future never resolves.
                () = async {
                    match idle_grace_deadline {
                        Some(deadline) => tokio::time::sleep_until(deadline).await,
                        None => std::future::pending::<()>().await,
                    }
                }, if awaiting_idle_grace_check => {
                    awaiting_idle_grace_check = false;

                    match client.sessions().status_for(&session_id).await {
                        Ok(SessionStatusInfo::Idle) => {
                            tracing::debug!(session_id = %session_id, "idle-grace deadline reached; finalizing");
                            let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
                            return Ok(RunOutcome::with_tracker(output, &token_tracker));
                        }
                        Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
                            last_activity_time = tokio::time::Instant::now();
                            observed_busy = true;
                        }
                        Err(e) => {
                            tracing::warn!(
                                session_id = %session_id,
                                error = %e,
                                "status check failed at idle-grace deadline"
                            );
                        }
                    }
                }

                // Branch 4: completion of the spawned command dispatch, enabled
                // only while a command task exists.
                cmd_result = async {
                    match command_task.as_mut() {
                        Some(handle) => Some(handle.await),
                        None => {
                            std::future::pending::<
                                Option<Result<Result<(), String>, tokio::task::JoinError>>,
                            >()
                            .await
                        }
                    }
                }, if command_task_active => {
                    match cmd_result {
                        Some(Ok(Ok(()))) => {
                            // Dispatch finished: start the idle-grace window and drop
                            // the handle so this branch is disabled from now on.
                            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
                            tracing::debug!(
                                session_id = %session_id,
                                command = ?command_name_for_logging,
                                "run: command dispatch completed successfully"
                            );
                            command_task = None;
                        }
                        Some(Ok(Err(e))) => {
                            tracing::error!(
                                session_id = %session_id,
                                command = ?command_name_for_logging,
                                error = %e,
                                "run: command dispatch failed"
                            );
                            return Err(ToolError::Internal(format!(
                                "Failed to execute command '{}': {e}",
                                command_name_for_logging.as_deref().unwrap_or("unknown")
                            )));
                        }
                        Some(Err(join_err)) => {
                            tracing::error!(
                                session_id = %session_id,
                                command = ?command_name_for_logging,
                                error = %join_err,
                                "run: command task panicked"
                            );
                            return Err(ToolError::Internal(format!("Command task panicked: {join_err}")));
                        }
                        None => {
                            // The async block only yields `Some`; the `None` arm of
                            // its inner match never resolves.
                            unreachable!("command_task_active guard should prevent None");
                        }
                    }
                }
            }
        }
    }
885}
886
887impl Tool for OrchestratorRunTool {
888 type Input = OrchestratorRunInput;
889 type Output = OrchestratorRunOutput;
890 const NAME: &'static str = "run";
891 const DESCRIPTION: &'static str = r#"Start or resume an OpenCode session. Optionally run a named command or send a raw prompt.
892
893Returns when:
894- status=completed: Session finished executing. Response contains final assistant output.
895- status=permission_required: Session needs permission approval. Call respond_permission to continue.
896- status=question_required: Session needs question answers. Call respond_question to continue.
897
898Parameters:
899- session_id: Existing session to resume (omit to create new)
900- command: OpenCode command name (e.g., "research", "implement_plan")
901- message: Prompt text or $ARGUMENTS for command template
902
903Examples:
904- New session with prompt: run(message="explain this code")
905- New session with command: run(command="research", message="caching strategies")
906- Resume session: run(session_id="...", message="continue")
907- Check status: run(session_id="...")"#;
908
909 fn call(
910 &self,
911 input: Self::Input,
912 _ctx: &ToolContext,
913 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
914 let this = self.clone();
915 Box::pin(async move {
916 let timer = CallTimer::start();
917 match this.run_impl_outcome(input.clone()).await {
918 Ok(outcome) => {
919 log_tool_success(
920 &timer,
921 Self::NAME,
922 &input,
923 &outcome.output,
924 outcome.log_meta,
925 true,
926 );
927 Ok(outcome.output)
928 }
929 Err(error) => {
930 log_tool_error(&timer, Self::NAME, &input, &error);
931 Err(error)
932 }
933 }
934 })
935 }
936}
937
/// Tool implementing `list_sessions`.
#[derive(Clone)]
pub struct ListSessionsTool {
    /// Shared cell holding the orchestrator server; initialized on first use.
    server: Arc<OnceCell<OrchestratorServer>>,
}

impl ListSessionsTool {
    /// Creates the tool around a shared, possibly not-yet-initialized server cell.
    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
        Self { server }
    }
}
954
955impl Tool for ListSessionsTool {
956 type Input = ListSessionsInput;
957 type Output = ListSessionsOutput;
958 const NAME: &'static str = "list_sessions";
959 const DESCRIPTION: &'static str =
960 "List available OpenCode sessions in the current directory context.";
961
962 fn call(
963 &self,
964 input: Self::Input,
965 _ctx: &ToolContext,
966 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
967 let server_cell = Arc::clone(&self.server);
968 Box::pin(async move {
969 let timer = CallTimer::start();
970 let result: Result<ListSessionsOutput, ToolError> = async {
971 let server = server_cell
972 .get_or_try_init(OrchestratorServer::start_lazy)
973 .await
974 .map_err(|e| ToolError::Internal(e.to_string()))?;
975
976 let sessions =
977 server.client().sessions().list().await.map_err(|e| {
978 ToolError::Internal(format!("Failed to list sessions: {e}"))
979 })?;
980
981 let limit = input.limit.unwrap_or(20);
982 let summaries: Vec<SessionSummary> = sessions
983 .into_iter()
984 .take(limit)
985 .map(|s| SessionSummary {
986 id: s.id,
987 title: s.title,
988 updated: s.time.as_ref().map(|t| t.updated),
989 })
990 .collect();
991
992 Ok(ListSessionsOutput {
993 sessions: summaries,
994 })
995 }
996 .await;
997
998 match result {
999 Ok(output) => {
1000 log_tool_success(
1001 &timer,
1002 Self::NAME,
1003 &input,
1004 &output,
1005 ToolLogMeta::default(),
1006 false,
1007 );
1008 Ok(output)
1009 }
1010 Err(error) => {
1011 log_tool_error(&timer, Self::NAME, &input, &error);
1012 Err(error)
1013 }
1014 }
1015 })
1016 }
1017}
1018
/// Tool implementing `list_commands`.
#[derive(Clone)]
pub struct ListCommandsTool {
    /// Shared cell holding the orchestrator server; initialized on first use.
    server: Arc<OnceCell<OrchestratorServer>>,
}

impl ListCommandsTool {
    /// Creates the tool around a shared, possibly not-yet-initialized server cell.
    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
        Self { server }
    }
}
1035
1036impl Tool for ListCommandsTool {
1037 type Input = ListCommandsInput;
1038 type Output = ListCommandsOutput;
1039 const NAME: &'static str = "list_commands";
1040 const DESCRIPTION: &'static str = "List available OpenCode commands that can be used with run.";
1041
1042 fn call(
1043 &self,
1044 input: Self::Input,
1045 _ctx: &ToolContext,
1046 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1047 let server_cell = Arc::clone(&self.server);
1048 Box::pin(async move {
1049 let timer = CallTimer::start();
1050 let result: Result<ListCommandsOutput, ToolError> = async {
1051 let server = server_cell
1052 .get_or_try_init(OrchestratorServer::start_lazy)
1053 .await
1054 .map_err(|e| ToolError::Internal(e.to_string()))?;
1055
1056 let commands =
1057 server.client().tools().commands().await.map_err(|e| {
1058 ToolError::Internal(format!("Failed to list commands: {e}"))
1059 })?;
1060
1061 let command_infos: Vec<CommandInfo> = commands
1062 .into_iter()
1063 .map(|c| CommandInfo {
1064 name: c.name,
1065 description: c.description,
1066 })
1067 .collect();
1068
1069 Ok(ListCommandsOutput {
1070 commands: command_infos,
1071 })
1072 }
1073 .await;
1074
1075 match result {
1076 Ok(output) => {
1077 log_tool_success(
1078 &timer,
1079 Self::NAME,
1080 &input,
1081 &output,
1082 ToolLogMeta::default(),
1083 false,
1084 );
1085 Ok(output)
1086 }
1087 Err(error) => {
1088 log_tool_error(&timer, Self::NAME, &input, &error);
1089 Err(error)
1090 }
1091 }
1092 })
1093 }
1094}
1095
/// Tool implementing `respond_permission`.
#[derive(Clone)]
pub struct RespondPermissionTool {
    /// Shared cell holding the orchestrator server; initialized on first use.
    server: Arc<OnceCell<OrchestratorServer>>,
}

impl RespondPermissionTool {
    /// Creates the tool around a shared, possibly not-yet-initialized server cell.
    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
        Self { server }
    }
}
1115
1116impl Tool for RespondPermissionTool {
1117 type Input = RespondPermissionInput;
1118 type Output = RespondPermissionOutput;
1119 const NAME: &'static str = "respond_permission";
1120 const DESCRIPTION: &'static str = r#"Respond to a permission request from an OpenCode session.
1121
1122After responding, continues monitoring the session and returns when complete or when another permission is required.
1123
1124Parameters:
1125- session_id: Session with pending permission
1126- reply: "once" (allow this request), "always" (allow for matching patterns), or "reject" (deny)
1127- message: Optional message to include with reply"#;
1128
1129 fn call(
1130 &self,
1131 input: Self::Input,
1132 _ctx: &ToolContext,
1133 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1134 let server_cell = Arc::clone(&self.server);
1135 Box::pin(async move {
1136 let timer = CallTimer::start();
1137 let request = input.clone();
1138 let result: Result<(RespondPermissionOutput, ToolLogMeta), ToolError> = async {
1139 let server = server_cell
1140 .get_or_try_init(OrchestratorServer::start_lazy)
1141 .await
1142 .map_err(|e| ToolError::Internal(e.to_string()))?;
1143
1144 let client = server.client();
1145
1146 let mut pending =
1148 client.permissions().list().await.map_err(|e| {
1149 ToolError::Internal(format!("Failed to list permissions: {e}"))
1150 })?;
1151
1152 let perm = if let Some(req_id) = input.permission_request_id.as_deref() {
1153 let idx = pending.iter().position(|p| p.id == req_id).ok_or_else(|| {
1154 ToolError::InvalidInput(format!(
1155 "No pending permission found with id '{req_id}'. \
1156 (session_id='{}')",
1157 input.session_id
1158 ))
1159 })?;
1160
1161 let perm = pending.remove(idx);
1162
1163 if perm.session_id != input.session_id {
1164 return Err(ToolError::InvalidInput(format!(
1165 "Permission request '{req_id}' belongs to session '{}', not '{}'.",
1166 perm.session_id, input.session_id
1167 )));
1168 }
1169
1170 perm
1171 } else {
1172 let mut perms: Vec<_> = pending
1173 .into_iter()
1174 .filter(|p| p.session_id == input.session_id)
1175 .collect();
1176
1177 match perms.as_slice() {
1178 [] => {
1179 return Err(ToolError::InvalidInput(format!(
1180 "No pending permission found for session '{}'. \
1181 The permission may have already been responded to.",
1182 input.session_id
1183 )));
1184 }
1185 [_single] => perms.swap_remove(0),
1186 multiple => {
1187 let ids = multiple
1188 .iter()
1189 .map(|p| p.id.as_str())
1190 .collect::<Vec<_>>()
1191 .join(", ");
1192 return Err(ToolError::InvalidInput(format!(
1193 "Multiple pending permissions found for session '{}': {ids}. \
1194 Please retry with permission_request_id (returned by run).",
1195 input.session_id
1196 )));
1197 }
1198 }
1199 };
1200
1201 let is_reject = matches!(input.reply, PermissionReply::Reject);
1203
1204 let permission_type = perm.permission.clone();
1206 let permission_patterns = perm.patterns.clone();
1207
1208 let mut pre_warnings: Vec<String> = Vec::new();
1211 let baseline = if is_reject {
1212 match client.messages().list(&input.session_id).await {
1213 Ok(msgs) => OrchestratorServer::extract_assistant_text(&msgs),
1214 Err(e) => {
1215 pre_warnings.push(format!("Failed to fetch baseline messages: {e}"));
1216 None
1217 }
1218 }
1219 } else {
1220 None
1221 };
1222
1223 let api_reply = match input.reply {
1225 PermissionReply::Once => ApiPermissionReply::Once,
1226 PermissionReply::Always => ApiPermissionReply::Always,
1227 PermissionReply::Reject => ApiPermissionReply::Reject,
1228 };
1229
1230 client
1232 .permissions()
1233 .reply(
1234 &perm.id,
1235 &PermissionReplyRequest {
1236 reply: api_reply,
1237 message: input.message,
1238 },
1239 )
1240 .await
1241 .map_err(|e| {
1242 ToolError::Internal(format!("Failed to reply to permission: {e}"))
1243 })?;
1244
1245 let run_tool = OrchestratorRunTool::new(Arc::clone(&server_cell));
1247 let wait_for_activity = (!is_reject).then_some(true);
1248 let outcome = run_tool
1249 .run_impl_outcome(OrchestratorRunInput {
1250 session_id: Some(input.session_id),
1251 command: None,
1252 message: None,
1253 wait_for_activity,
1254 })
1255 .await?;
1256 let mut out = outcome.output;
1257
1258 out.warnings.extend(pre_warnings);
1260
1261 if is_reject && matches!(out.status, RunStatus::Completed) {
1263 let final_resp = out.response.as_deref();
1264 let baseline_resp = baseline.as_deref();
1265
1266 if final_resp.is_none() || final_resp == baseline_resp {
1268 out.response = None;
1269 let patterns_str = if permission_patterns.is_empty() {
1270 "(none)".to_string()
1271 } else {
1272 permission_patterns.join(", ")
1273 };
1274 out.warnings.push(format!(
1275 "Permission rejected for '{permission_type}'. Patterns: {patterns_str}. \
1276 Session stopped without generating a new assistant response."
1277 ));
1278 tracing::debug!(
1279 permission_type = %permission_type,
1280 "rejection override applied: response set to None"
1281 );
1282 }
1283 }
1284
1285 Ok((out, outcome.log_meta))
1286 }
1287 .await;
1288
1289 match result {
1290 Ok((output, log_meta)) => {
1291 log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1292 Ok(output)
1293 }
1294 Err(error) => {
1295 log_tool_error(&timer, Self::NAME, &request, &error);
1296 Err(error)
1297 }
1298 }
1299 })
1300 }
1301}
1302
/// Tool that answers or rejects a pending question request raised by an
/// OpenCode session.
///
/// Cloning is cheap: the only state is a reference-counted handle to the
/// server cell.
#[derive(Clone)]
pub struct RespondQuestionTool {
    /// Shared cell holding the orchestrator server; it is initialized on
    /// first use via `OrchestratorServer::start_lazy` when the tool is called.
    server: Arc<OnceCell<OrchestratorServer>>,
}
1311
1312impl RespondQuestionTool {
1313 pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
1314 Self { server }
1315 }
1316}
1317
1318impl Tool for RespondQuestionTool {
1319 type Input = RespondQuestionInput;
1320 type Output = RespondQuestionOutput;
1321 const NAME: &'static str = "respond_question";
1322 const DESCRIPTION: &'static str = r#"Respond to a question request from an OpenCode session.
1323
1324After replying, continues monitoring the session and returns when complete or when another interruption is required.
1325
1326Parameters:
1327- session_id: Session with pending question
1328- action: "reply" or "reject"
1329- answers: Required when action=reply; one list per question"#;
1330
1331 fn call(
1332 &self,
1333 input: Self::Input,
1334 _ctx: &ToolContext,
1335 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1336 let server_cell = Arc::clone(&self.server);
1337 Box::pin(async move {
1338 let timer = CallTimer::start();
1339 let request = input.clone();
1340 let result: Result<(RespondQuestionOutput, ToolLogMeta), ToolError> = async {
1341 let server = server_cell
1342 .get_or_try_init(OrchestratorServer::start_lazy)
1343 .await
1344 .map_err(|e| ToolError::Internal(e.to_string()))?;
1345
1346 let client = server.client();
1347 let mut pending = client
1348 .question()
1349 .list()
1350 .await
1351 .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;
1352
1353 let question = if let Some(req_id) = input.question_request_id.as_deref() {
1354 let idx = pending
1355 .iter()
1356 .position(|question| question.id == req_id)
1357 .ok_or_else(|| {
1358 ToolError::InvalidInput(format!(
1359 "No pending question found with id '{req_id}'. (session_id='{}')",
1360 input.session_id
1361 ))
1362 })?;
1363
1364 let question = pending.remove(idx);
1365 if question.session_id != input.session_id {
1366 return Err(ToolError::InvalidInput(format!(
1367 "Question request '{req_id}' belongs to session '{}', not '{}'.",
1368 question.session_id, input.session_id
1369 )));
1370 }
1371
1372 question
1373 } else {
1374 let mut questions: Vec<_> = pending
1375 .into_iter()
1376 .filter(|question| question.session_id == input.session_id)
1377 .collect();
1378
1379 match questions.as_slice() {
1380 [] => {
1381 return Err(ToolError::InvalidInput(format!(
1382 "No pending question found for session '{}'. The question may have already been responded to.",
1383 input.session_id
1384 )));
1385 }
1386 [_single] => questions.swap_remove(0),
1387 multiple => {
1388 let ids = multiple
1389 .iter()
1390 .map(|question| question.id.as_str())
1391 .collect::<Vec<_>>()
1392 .join(", ");
1393 return Err(ToolError::InvalidInput(format!(
1394 "Multiple pending questions found for session '{}': {ids}. Please retry with question_request_id (returned by run).",
1395 input.session_id
1396 )));
1397 }
1398 }
1399 };
1400
1401 match input.action {
1402 QuestionAction::Reply => {
1403 if input.answers.is_empty() {
1404 return Err(ToolError::InvalidInput(
1405 "answers is required when action=reply".into(),
1406 ));
1407 }
1408
1409 client
1410 .question()
1411 .reply(
1412 &question.id,
1413 &QuestionReply {
1414 answers: input.answers,
1415 },
1416 )
1417 .await
1418 .map_err(|e| {
1419 ToolError::Internal(format!("Failed to reply to question: {e}"))
1420 })?;
1421
1422 let outcome = OrchestratorRunTool::new(Arc::clone(&server_cell))
1423 .run_impl_outcome(OrchestratorRunInput {
1424 session_id: Some(input.session_id),
1425 command: None,
1426 message: None,
1427 wait_for_activity: Some(true),
1428 })
1429 .await?;
1430 Ok((outcome.output, outcome.log_meta))
1431 }
1432 QuestionAction::Reject => {
1433 client.question().reject(&question.id).await.map_err(|e| {
1434 ToolError::Internal(format!("Failed to reject question: {e}"))
1435 })?;
1436
1437 let outcome = OrchestratorRunTool::new(Arc::clone(&server_cell))
1438 .run_impl_outcome(OrchestratorRunInput {
1439 session_id: Some(input.session_id),
1440 command: None,
1441 message: None,
1442 wait_for_activity: None,
1443 })
1444 .await?;
1445 Ok((outcome.output, outcome.log_meta))
1446 }
1447 }
1448 }
1449 .await;
1450
1451 match result {
1452 Ok((output, log_meta)) => {
1453 log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1454 Ok(output)
1455 }
1456 Err(error) => {
1457 log_tool_error(&timer, Self::NAME, &request, &error);
1458 Err(error)
1459 }
1460 }
1461 })
1462 }
1463}
1464
1465pub fn build_registry(server: &Arc<OnceCell<OrchestratorServer>>) -> ToolRegistry {
1473 ToolRegistry::builder()
1474 .register::<OrchestratorRunTool, ()>(OrchestratorRunTool::new(Arc::clone(server)))
1475 .register::<ListSessionsTool, ()>(ListSessionsTool::new(Arc::clone(server)))
1476 .register::<ListCommandsTool, ()>(ListCommandsTool::new(Arc::clone(server)))
1477 .register::<RespondPermissionTool, ()>(RespondPermissionTool::new(Arc::clone(server)))
1478 .register::<RespondQuestionTool, ()>(RespondQuestionTool::new(Arc::clone(server)))
1479 .finish()
1480}
1481
#[cfg(test)]
mod tests {
    use super::*;
    use agentic_tools_core::Tool;

    /// Pins the public tool names: each tool must expose its short,
    /// unprefixed name.
    #[test]
    fn tool_names_are_short() {
        let cases = [
            (<OrchestratorRunTool as Tool>::NAME, "run"),
            (<ListSessionsTool as Tool>::NAME, "list_sessions"),
            (<ListCommandsTool as Tool>::NAME, "list_commands"),
            (<RespondPermissionTool as Tool>::NAME, "respond_permission"),
            (<RespondQuestionTool as Tool>::NAME, "respond_question"),
        ];

        for (actual, expected) in cases {
            assert_eq!(actual, expected);
        }
    }
}