1use crate::config;
4use crate::logging;
5use crate::server::CommandPolicyDecision;
6use crate::server::OrchestratorServer;
7use crate::server::OrchestratorServerHandle;
8use crate::token_tracker::TokenTracker;
9use crate::types::ChangeStats;
10use crate::types::CommandInfo;
11use crate::types::GetSessionStateInput;
12use crate::types::GetSessionStateOutput;
13use crate::types::ListCommandsInput;
14use crate::types::ListCommandsOutput;
15use crate::types::ListSessionsInput;
16use crate::types::ListSessionsOutput;
17use crate::types::OrchestratorRunInput;
18use crate::types::OrchestratorRunOutput;
19use crate::types::PermissionReply;
20use crate::types::QuestionAction;
21use crate::types::QuestionInfoView;
22use crate::types::QuestionOptionView;
23use crate::types::RespondPermissionInput;
24use crate::types::RespondPermissionOutput;
25use crate::types::RespondQuestionInput;
26use crate::types::RespondQuestionOutput;
27use crate::types::RunStatus;
28use crate::types::SessionStatusSummary;
29use crate::types::SessionSummary;
30use crate::types::ToolCallSummary;
31use crate::types::ToolStateSummary;
32use agentic_logging::CallTimer;
33use agentic_logging::ToolCallRecord;
34use agentic_tools_core::Tool;
35use agentic_tools_core::ToolContext;
36use agentic_tools_core::ToolError;
37use agentic_tools_core::ToolRegistry;
38use agentic_tools_core::fmt::TextFormat;
39use agentic_tools_core::fmt::TextOptions;
40use futures::future::BoxFuture;
41use opencode_rs::types::event::Event;
42use opencode_rs::types::message::CommandRequest;
43use opencode_rs::types::message::Message;
44use opencode_rs::types::message::Part;
45use opencode_rs::types::message::PromptPart;
46use opencode_rs::types::message::PromptRequest;
47use opencode_rs::types::message::ToolState;
48use opencode_rs::types::permission::PermissionReply as ApiPermissionReply;
49use opencode_rs::types::permission::PermissionReplyRequest;
50use opencode_rs::types::question::QuestionReply;
51use opencode_rs::types::question::QuestionRequest;
52use opencode_rs::types::session::CreateSessionRequest;
53use opencode_rs::types::session::SessionStatusInfo;
54use opencode_rs::types::session::SummarizeRequest;
55use serde::Serialize;
56use std::sync::Arc;
57use std::time::Duration;
58use tokio::task::JoinHandle;
59
/// Server name written to the `server` field of every tool-call log record.
const SERVER_NAME: &str = "opencode-orchestrator-mcp";
61
/// Extra metadata attached to the log record of a successful tool call.
#[derive(Debug, Clone, Default)]
struct ToolLogMeta {
    /// Token usage copied into the record's `token_usage` field, if any.
    token_usage: Option<agentic_logging::TokenUsage>,
    /// When `true`, the record's summary is set to `{"token_usage_saturated": true}`.
    token_usage_saturated: bool,
}
67
/// Result of one `run` invocation: the caller-visible output plus logging metadata.
struct RunOutcome {
    /// Output handed back to the tool caller.
    output: OrchestratorRunOutput,
    /// Token metadata forwarded to the success log record.
    log_meta: ToolLogMeta,
}
72
73fn blocked_command_error(command: &str, decision: CommandPolicyDecision) -> ToolError {
74 let reason = match decision {
75 CommandPolicyDecision::Allowed => {
76 return ToolError::Internal("command unexpectedly allowed".into());
77 }
78 CommandPolicyDecision::DeniedByAllowlist => {
79 "it is not present in orchestrator.commands.allow"
80 }
81 CommandPolicyDecision::DeniedByDenylist => "it is blocked by orchestrator.commands.deny",
82 };
83
84 ToolError::InvalidInput(format!(
85 "Command '{command}' cannot be run because {reason}."
86 ))
87}
88
89impl RunOutcome {
90 fn without_tokens(output: OrchestratorRunOutput) -> Self {
91 Self {
92 output,
93 log_meta: ToolLogMeta::default(),
94 }
95 }
96
97 fn with_tracker(output: OrchestratorRunOutput, token_tracker: &TokenTracker) -> Self {
98 let (token_usage, token_usage_saturated) = token_tracker.to_log_token_usage();
99 Self {
100 output,
101 log_meta: ToolLogMeta {
102 token_usage,
103 token_usage_saturated,
104 },
105 }
106 }
107}
108
109async fn abort_command_task(task: &mut Option<JoinHandle<Result<(), String>>>) {
110 if let Some(handle) = task.take() {
111 handle.abort();
112 let _ = handle.await;
113 }
114}
115
116fn request_json<T: Serialize>(request: &T) -> serde_json::Value {
117 serde_json::to_value(request)
118 .unwrap_or_else(|error| serde_json::json!({"serialization_error": error.to_string()}))
119}
120
121fn log_tool_success<TReq: Serialize, TOut: TextFormat>(
122 timer: &CallTimer,
123 tool: &str,
124 request: &TReq,
125 output: &TOut,
126 log_meta: ToolLogMeta,
127 write_markdown: bool,
128) {
129 let (completed_at, duration_ms) = timer.finish();
130 let rendered = output.fmt_text(&TextOptions::default());
131 let response_file = write_markdown
132 .then(|| logging::write_markdown_best_effort(completed_at, &timer.call_id, &rendered))
133 .flatten();
134
135 let record = ToolCallRecord {
136 call_id: timer.call_id.clone(),
137 server: SERVER_NAME.into(),
138 tool: tool.into(),
139 started_at: timer.started_at,
140 completed_at,
141 duration_ms,
142 request: request_json(request),
143 response_file,
144 success: true,
145 error: None,
146 failure_kind: None,
147 model: None,
148 token_usage: log_meta.token_usage,
149 summary: log_meta
150 .token_usage_saturated
151 .then(|| serde_json::json!({"token_usage_saturated": true})),
152 };
153
154 logging::append_record_best_effort(&record);
155}
156
157fn log_tool_error<TReq: Serialize>(
158 timer: &CallTimer,
159 tool: &str,
160 request: &TReq,
161 error: &ToolError,
162) {
163 let (completed_at, duration_ms) = timer.finish();
164 let error = error.to_string();
165 let record = ToolCallRecord {
166 call_id: timer.call_id.clone(),
167 server: SERVER_NAME.into(),
168 tool: tool.into(),
169 started_at: timer.started_at,
170 completed_at,
171 duration_ms,
172 request: request_json(request),
173 response_file: None,
174 success: false,
175 error: Some(error.clone()),
176 failure_kind: agentic_logging::classify_failure_kind(false, Some(&error)),
177 model: None,
178 token_usage: None,
179 summary: None,
180 };
181
182 logging::append_record_best_effort(&record);
183}
184
/// Tool that starts or resumes an OpenCode session (exposed under the tool name `run`).
#[derive(Clone)]
pub struct OrchestratorRunTool {
    /// Shared handle from which the orchestrator server is acquired on each call.
    server: Arc<OrchestratorServerHandle>,
}
198
199impl OrchestratorRunTool {
200 pub fn new(server: Arc<OrchestratorServerHandle>) -> Self {
202 Self { server }
203 }
204
205 async fn finalize_completed(
214 client: &opencode_rs::Client,
215 session_id: String,
216 token_tracker: &TokenTracker,
217 mut warnings: Vec<String>,
218 ) -> Result<OrchestratorRunOutput, ToolError> {
219 const BACKOFFS_MS: &[u64] = &[0, 50, 100, 200, 400];
221
222 let mut response: Option<String> = None;
223
224 for (attempt, &delay_ms) in BACKOFFS_MS.iter().enumerate() {
225 if delay_ms > 0 {
226 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
227 }
228
229 let messages = client
230 .messages()
231 .list(&session_id)
232 .await
233 .map_err(|e| ToolError::Internal(format!("Failed to list messages: {e}")))?;
234
235 response = OrchestratorServer::extract_assistant_text(&messages);
236
237 if response.is_some() {
238 if attempt > 0 {
239 tracing::debug!(
240 session_id = %session_id,
241 attempt,
242 "assistant response became available after retry"
243 );
244 }
245 break;
246 }
247 }
248
249 if response.is_none() {
250 tracing::debug!(
251 session_id = %session_id,
252 "no assistant response found after bounded retry"
253 );
254 }
255
256 if token_tracker.compaction_needed
258 && let (Some(pid), Some(mid)) = (&token_tracker.provider_id, &token_tracker.model_id)
259 {
260 let summarize_req = SummarizeRequest {
261 provider_id: pid.clone(),
262 model_id: mid.clone(),
263 auto: None,
264 };
265
266 match client
267 .sessions()
268 .summarize(&session_id, &summarize_req)
269 .await
270 {
271 Ok(_) => {
272 tracing::info!(session_id = %session_id, "context summarization triggered");
273 warnings.push("Context limit reached; summarization triggered".into());
274 }
275 Err(e) => {
276 tracing::warn!(session_id = %session_id, error = %e, "summarization failed");
277 warnings.push(format!("Summarization failed: {e}"));
278 }
279 }
280 }
281
282 Ok(OrchestratorRunOutput {
283 session_id,
284 status: RunStatus::Completed,
285 response,
286 partial_response: None,
287 permission_request_id: None,
288 permission_type: None,
289 permission_patterns: vec![],
290 question_request_id: None,
291 questions: vec![],
292 warnings,
293 })
294 }
295
296 fn map_questions(req: &QuestionRequest) -> Vec<QuestionInfoView> {
297 req.questions
298 .iter()
299 .map(|question| QuestionInfoView {
300 question: question.question.clone(),
301 header: question.header.clone(),
302 options: question
303 .options
304 .iter()
305 .map(|option| QuestionOptionView {
306 label: option.label.clone(),
307 description: option.description.clone(),
308 })
309 .collect(),
310 multiple: question.multiple,
311 custom: question.custom,
312 })
313 .collect()
314 }
315
316 fn question_required_output(
317 session_id: String,
318 partial_response: Option<String>,
319 request: &QuestionRequest,
320 warnings: Vec<String>,
321 ) -> OrchestratorRunOutput {
322 OrchestratorRunOutput {
323 session_id,
324 status: RunStatus::QuestionRequired,
325 response: None,
326 partial_response,
327 permission_request_id: None,
328 permission_type: None,
329 permission_patterns: vec![],
330 question_request_id: Some(request.id.clone()),
331 questions: Self::map_questions(request),
332 warnings,
333 }
334 }
335
336 async fn run_impl_outcome(
337 &self,
338 input: OrchestratorRunInput,
339 ctx: &ToolContext,
340 ) -> Result<RunOutcome, ToolError> {
341 if input.session_id.is_none() && input.message.is_none() && input.command.is_none() {
343 return Err(ToolError::InvalidInput(
344 "Either session_id (to resume/check status) or message/command (to start work) is required"
345 .into(),
346 ));
347 }
348
349 if input.command.is_some() && input.message.is_none() {
350 return Err(ToolError::InvalidInput(
351 "message is required when command is specified (becomes $ARGUMENTS for template expansion)"
352 .into(),
353 ));
354 }
355
356 let message = input.message.map(|m| m.trim().to_string());
358 if let Some(ref m) = message
359 && m.is_empty()
360 {
361 return Err(ToolError::InvalidInput(
362 "message cannot be empty or whitespace-only".into(),
363 ));
364 }
365
366 let wait_for_activity = input.wait_for_activity.unwrap_or(false);
367
368 let server = self
370 .server
371 .acquire()
372 .await
373 .map_err(|e| ToolError::Internal(e.to_string()))?;
374
375 if let Some(command) = input.command.as_deref() {
376 let decision = server.command_policy_decision(command);
377 if !decision.is_allowed() {
378 return Err(blocked_command_error(command, decision));
379 }
380 }
381
382 let client = server.client();
383
384 tracing::debug!(
385 command = ?input.command,
386 has_message = message.is_some(),
387 message_len = message.as_ref().map(String::len),
388 session_id = ?input.session_id,
389 "run: starting"
390 );
391
392 let session_id = if let Some(sid) = input.session_id {
394 client.sessions().get(&sid).await.map_err(|e| {
396 if e.is_not_found() {
397 ToolError::InvalidInput(format!(
398 "Session '{sid}' not found. Use list_sessions to discover sessions, \
399 or omit session_id to create a new session."
400 ))
401 } else {
402 ToolError::Internal(format!("Failed to get session: {e}"))
403 }
404 })?;
405 sid
406 } else {
407 let session = client
409 .sessions()
410 .create(&CreateSessionRequest::default())
411 .await
412 .map_err(|e| ToolError::Internal(format!("Failed to create session: {e}")))?;
413
414 {
415 let mut spawned = server.spawned_sessions().write().await;
416 spawned.insert(session.id.clone());
417 }
418
419 session.id
420 };
421
422 tracing::info!(session_id = %session_id, "run: session resolved");
423
424 let status = client
426 .sessions()
427 .status_for(&session_id)
428 .await
429 .map_err(|e| ToolError::Internal(format!("Failed to get session status: {e}")))?;
430
431 let is_idle = matches!(status, SessionStatusInfo::Idle);
432
433 let pending_permissions = client
435 .permissions()
436 .list()
437 .await
438 .map_err(|e| ToolError::Internal(format!("Failed to list permissions: {e}")))?;
439
440 let my_permission = pending_permissions
441 .into_iter()
442 .find(|p| p.session_id == session_id);
443
444 if let Some(perm) = my_permission {
445 tracing::info!(
446 session_id = %session_id,
447 permission_type = %perm.permission,
448 "run: pending permission found"
449 );
450 return Ok(RunOutcome::without_tokens(OrchestratorRunOutput {
451 session_id,
452 status: RunStatus::PermissionRequired,
453 response: None,
454 partial_response: None,
455 permission_request_id: Some(perm.id),
456 permission_type: Some(perm.permission),
457 permission_patterns: perm.patterns,
458 question_request_id: None,
459 questions: vec![],
460 warnings: vec![],
461 }));
462 }
463
464 let pending_questions = client
465 .question()
466 .list()
467 .await
468 .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;
469
470 if let Some(question) = pending_questions
471 .into_iter()
472 .find(|question| question.session_id == session_id)
473 {
474 tracing::info!(session_id = %session_id, question_id = %question.id, "run: pending question found");
475 return Ok(RunOutcome::without_tokens(Self::question_required_output(
476 session_id,
477 None,
478 &question,
479 vec![],
480 )));
481 }
482
483 if message.is_none() && input.command.is_none() && is_idle && !wait_for_activity {
486 let token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
487 let output =
488 Self::finalize_completed(client, session_id, &token_tracker, vec![]).await?;
489 return Ok(RunOutcome::with_tracker(output, &token_tracker));
490 }
491
492 let mut subscription = client
494 .subscribe_session(&session_id)
495 .map_err(|e| ToolError::Internal(format!("Failed to subscribe to session: {e}")))?;
496
497 let dispatched_new_work = input.command.is_some() || message.is_some() || wait_for_activity;
500 let idle_grace = config::idle_grace();
501 let mut idle_grace_deadline: Option<tokio::time::Instant> = None;
502 let mut awaiting_idle_grace_check = false;
503
504 if wait_for_activity && input.command.is_none() && message.is_none() {
505 idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
506 }
507
508 let mut command_task: Option<JoinHandle<Result<(), String>>> = None;
510 let mut command_name_for_logging: Option<String> = None;
511
512 if let Some(command) = &input.command {
513 command_name_for_logging = Some(command.clone());
514
515 let cmd_client = client.clone();
516 let cmd_session_id = session_id.clone();
517 let cmd_name = command.clone();
518 let cmd_arguments = message.clone().unwrap_or_default();
519
520 command_task = Some(tokio::spawn(async move {
521 let req = CommandRequest {
522 command: cmd_name,
523 arguments: cmd_arguments,
524 message_id: None,
525 };
526
527 cmd_client
528 .messages()
529 .command(&cmd_session_id, &req)
530 .await
531 .map(|_| ())
532 .map_err(|e| e.to_string())
533 }));
534 } else if let Some(msg) = &message {
535 let req = PromptRequest {
537 parts: vec![PromptPart::Text {
538 text: msg.clone(),
539 synthetic: None,
540 ignored: None,
541 metadata: None,
542 }],
543 message_id: None,
544 model: None,
545 agent: None,
546 no_reply: None,
547 system: None,
548 variant: None,
549 };
550
551 client
552 .messages()
553 .prompt_async(&session_id, &req)
554 .await
555 .map_err(|e| ToolError::Internal(format!("Failed to send prompt: {e}")))?;
556
557 idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
558 }
559
560 let deadline = tokio::time::Instant::now() + server.session_deadline();
563 let inactivity_timeout = server.inactivity_timeout();
564 let mut last_activity_time = tokio::time::Instant::now();
565
566 tracing::debug!(session_id = %session_id, "run: entering event loop");
567 let mut token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
568 let mut partial_response = String::new();
569 let warnings = Vec::new();
570
571 let mut poll_interval = tokio::time::interval(Duration::from_secs(1));
572 poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
573
574 let mut observed_busy = false;
578
579 let mut sse_active = true;
582
583 if !dispatched_new_work
588 && let Ok(status) = client.sessions().status_for(&session_id).await
589 && matches!(status, SessionStatusInfo::Idle)
590 {
591 tracing::debug!(
592 session_id = %session_id,
593 "session already idle on post-subscribe check"
594 );
595 let output =
596 Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
597 return Ok(RunOutcome::with_tracker(output, &token_tracker));
598 }
599 loop {
602 let now = tokio::time::Instant::now();
604
605 if now.duration_since(last_activity_time) >= inactivity_timeout {
606 return Err(ToolError::Internal(format!(
607 "Session idle timeout: no activity for 5 minutes (session_id={session_id}). \
608 The session may still be running; use run(session_id=...) to check status."
609 )));
610 }
611
612 if now >= deadline {
613 return Err(ToolError::Internal(
614 "Session execution timed out after 1 hour. \
615 The session may still be running; use run with the session_id to check status."
616 .into(),
617 ));
618 }
619
620 let command_task_active = command_task.is_some();
621
622 tokio::select! {
623 () = ctx.cancelled() => {
624 abort_command_task(&mut command_task).await;
625 return Err(ToolError::cancelled(None));
626 }
627
628 maybe_event = subscription.recv(), if sse_active => {
629 let Some(event) = maybe_event else {
630 tracing::warn!(
634 session_id = %session_id,
635 "SSE stream closed unexpectedly; falling back to polling-only mode"
636 );
637 sse_active = false;
638 continue; };
640
641 token_tracker.observe_event(&event, |pid, mid| {
643 server.context_limit(pid, mid)
644 });
645
646 match event {
647 Event::PermissionAsked { properties } => {
648 tracing::info!(
649 session_id = %session_id,
650 permission_type = %properties.request.permission,
651 "run: permission requested"
652 );
653 return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
654 session_id,
655 status: RunStatus::PermissionRequired,
656 response: None,
657 partial_response: if partial_response.is_empty() {
658 None
659 } else {
660 Some(partial_response)
661 },
662 permission_request_id: Some(properties.request.id),
663 permission_type: Some(properties.request.permission),
664 permission_patterns: properties.request.patterns,
665 question_request_id: None,
666 questions: vec![],
667 warnings,
668 }, &token_tracker));
669 }
670
671 Event::QuestionAsked { properties } => {
672 return Ok(RunOutcome::with_tracker(Self::question_required_output(
673 session_id,
674 if partial_response.is_empty() {
675 None
676 } else {
677 Some(partial_response)
678 },
679 &properties.request,
680 warnings,
681 ), &token_tracker));
682 }
683
684 Event::MessagePartDelta { properties } => {
685 last_activity_time = tokio::time::Instant::now();
686 observed_busy = true;
688 awaiting_idle_grace_check = false;
689 if let Some(delta) = &properties.delta {
691 partial_response.push_str(delta);
692 }
693 }
694
695 Event::MessagePartUpdated { .. } | Event::MessageUpdated { .. } => {
696 last_activity_time = tokio::time::Instant::now();
697 observed_busy = true;
698 awaiting_idle_grace_check = false;
699 }
700
701 Event::SessionError { properties } => {
702 let error_msg = properties
703 .error
704 .map_or_else(|| "Unknown error".to_string(), |e| format!("{e:?}"));
705 tracing::error!(
706 session_id = %session_id,
707 error = %error_msg,
708 "run: session error"
709 );
710 return Err(ToolError::Internal(format!("Session error: {error_msg}")));
711 }
712
713 Event::SessionIdle { .. } => {
714 tracing::debug!(session_id = %session_id, "received SessionIdle event");
715 let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
716 return Ok(RunOutcome::with_tracker(output, &token_tracker));
717 }
718
719 _ => {
720 }
722 }
723 }
724
725 _ = poll_interval.tick() => {
726 let pending = match client.permissions().list().await {
728 Ok(p) => p,
729 Err(e) => {
730 tracing::warn!(
732 session_id = %session_id,
733 error = %e,
734 "failed to list permissions during poll fallback"
735 );
736 vec![]
737 }
738 };
739
740 if let Some(perm) = pending.into_iter().find(|p| p.session_id == session_id) {
741 tracing::debug!(
742 session_id = %session_id,
743 permission_id = %perm.id,
744 "detected pending permission via polling fallback"
745 );
746 return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
747 session_id,
748 status: RunStatus::PermissionRequired,
749 response: None,
750 partial_response: if partial_response.is_empty() {
751 None
752 } else {
753 Some(partial_response)
754 },
755 permission_request_id: Some(perm.id),
756 permission_type: Some(perm.permission),
757 permission_patterns: perm.patterns,
758 question_request_id: None,
759 questions: vec![],
760 warnings,
761 }, &token_tracker));
762 }
763
764 let pending_questions = match client.question().list().await {
765 Ok(questions) => questions,
766 Err(e) => {
767 tracing::warn!(
768 session_id = %session_id,
769 error = %e,
770 "failed to list questions during poll fallback"
771 );
772 vec![]
773 }
774 };
775
776 if let Some(question) = pending_questions
777 .into_iter()
778 .find(|question| question.session_id == session_id)
779 {
780 tracing::debug!(
781 session_id = %session_id,
782 question_id = %question.id,
783 "detected pending question via polling fallback"
784 );
785 return Ok(RunOutcome::with_tracker(Self::question_required_output(
786 session_id,
787 if partial_response.is_empty() {
788 None
789 } else {
790 Some(partial_response)
791 },
792 &question,
793 warnings,
794 ), &token_tracker));
795 }
796
797 match client.sessions().status_for(&session_id).await {
801 Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
802 last_activity_time = tokio::time::Instant::now();
803 observed_busy = true;
804 awaiting_idle_grace_check = false;
805 tracing::trace!(
806 session_id = %session_id,
807 "our session is busy/retry, waiting"
808 );
809 }
810 Ok(SessionStatusInfo::Idle) => {
811 if !dispatched_new_work || observed_busy {
812 tracing::debug!(
818 session_id = %session_id,
819 dispatched_new_work = dispatched_new_work,
820 observed_busy = observed_busy,
821 "detected session idle via polling fallback"
822 );
823 let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
824 return Ok(RunOutcome::with_tracker(output, &token_tracker));
825 }
826
827 let Some(deadline) = idle_grace_deadline else {
828 tracing::trace!(
829 session_id = %session_id,
830 command_task_active = command_task_active,
831 "idle seen before dispatch confirmed; waiting"
832 );
833 continue;
834 };
835
836 let now = tokio::time::Instant::now();
837 if now >= deadline {
838 tracing::debug!(
839 session_id = %session_id,
840 idle_grace_ms = idle_grace.as_millis(),
841 "accepting idle via bounded idle grace (no busy observed)"
842 );
843 let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
844 return Ok(RunOutcome::with_tracker(output, &token_tracker));
845 }
846
847 awaiting_idle_grace_check = true;
848 tracing::trace!(
849 session_id = %session_id,
850 remaining_ms = (deadline - now).as_millis(),
851 "idle detected before busy; waiting for idle-grace deadline"
852 );
853 }
854 Err(e) => {
855 tracing::warn!(
857 session_id = %session_id,
858 error = %e,
859 "failed to get session status during poll fallback"
860 );
861 }
862 }
863 }
864
865 () = async {
866 match idle_grace_deadline {
867 Some(deadline) => tokio::time::sleep_until(deadline).await,
868 None => std::future::pending::<()>().await,
869 }
870 }, if awaiting_idle_grace_check => {
871 awaiting_idle_grace_check = false;
872
873 match client.sessions().status_for(&session_id).await {
874 Ok(SessionStatusInfo::Idle) => {
875 tracing::debug!(session_id = %session_id, "idle-grace deadline reached; finalizing");
876 let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
877 return Ok(RunOutcome::with_tracker(output, &token_tracker));
878 }
879 Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
880 last_activity_time = tokio::time::Instant::now();
881 observed_busy = true;
882 }
883 Err(e) => {
884 tracing::warn!(
885 session_id = %session_id,
886 error = %e,
887 "status check failed at idle-grace deadline"
888 );
889 }
890 }
891 }
892
893 cmd_result = async {
894 match command_task.as_mut() {
895 Some(handle) => Some(handle.await),
896 None => {
897 std::future::pending::<
898 Option<Result<Result<(), String>, tokio::task::JoinError>>,
899 >()
900 .await
901 }
902 }
903 }, if command_task_active => {
904 match cmd_result {
905 Some(Ok(Ok(()))) => {
906 idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
907 tracing::debug!(
908 session_id = %session_id,
909 command = ?command_name_for_logging,
910 "run: command dispatch completed successfully"
911 );
912 command_task = None;
913 }
914 Some(Ok(Err(e))) => {
915 tracing::error!(
916 session_id = %session_id,
917 command = ?command_name_for_logging,
918 error = %e,
919 "run: command dispatch failed"
920 );
921 return Err(ToolError::Internal(format!(
922 "Failed to execute command '{}': {e}",
923 command_name_for_logging.as_deref().unwrap_or("unknown")
924 )));
925 }
926 Some(Err(join_err)) => {
927 tracing::error!(
928 session_id = %session_id,
929 command = ?command_name_for_logging,
930 error = %join_err,
931 "run: command task panicked"
932 );
933 return Err(ToolError::Internal(format!("Command task panicked: {join_err}")));
934 }
935 None => {
936 unreachable!("command_task_active guard should prevent None");
937 }
938 }
939 }
940 }
941 }
942 }
943}
944
945impl Tool for OrchestratorRunTool {
946 type Input = OrchestratorRunInput;
947 type Output = OrchestratorRunOutput;
948 const NAME: &'static str = "run";
949 const DESCRIPTION: &'static str = r#"Start or resume an OpenCode session. Optionally run a named command or send a raw prompt.
950
951Returns when:
952- status=completed: Session finished executing. Response contains final assistant output.
953- status=permission_required: Session needs permission approval. Call respond_permission to continue.
954- status=question_required: Session needs question answers. Call respond_question to continue.
955
956Parameters:
957- session_id: Existing session to resume (omit to create new)
958- command: OpenCode command name (e.g., "research", "implement_plan")
959- message: Prompt text or $ARGUMENTS for command template
960
961Examples:
962- New session with prompt: run(message="explain this code")
963- New session with command: run(command="research", message="caching strategies")
964- Resume session: run(session_id="...", message="continue")
965- Check status: run(session_id="...")"#;
966
967 fn call(
968 &self,
969 input: Self::Input,
970 ctx: &ToolContext,
971 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
972 let this = self.clone();
973 let ctx = ctx.clone();
974 Box::pin(async move {
975 let timer = CallTimer::start();
976 match this.run_impl_outcome(input.clone(), &ctx).await {
977 Ok(outcome) => {
978 log_tool_success(
979 &timer,
980 Self::NAME,
981 &input,
982 &outcome.output,
983 outcome.log_meta,
984 true,
985 );
986 Ok(outcome.output)
987 }
988 Err(error) => {
989 log_tool_error(&timer, Self::NAME, &input, &error);
990 Err(error)
991 }
992 }
993 })
994 }
995}
996
/// Tool that lists OpenCode sessions (exposed under the tool name `list_sessions`).
#[derive(Clone)]
pub struct ListSessionsTool {
    /// Shared handle from which the orchestrator server is acquired on each call.
    server: Arc<OrchestratorServerHandle>,
}

impl ListSessionsTool {
    /// Creates the tool around a shared orchestrator server handle.
    pub fn new(server: Arc<OrchestratorServerHandle>) -> Self {
        Self { server }
    }
}
1013
1014impl Tool for ListSessionsTool {
1015 type Input = ListSessionsInput;
1016 type Output = ListSessionsOutput;
1017 const NAME: &'static str = "list_sessions";
1018 const DESCRIPTION: &'static str =
1019 "List available OpenCode sessions in the current directory context.";
1020
1021 fn call(
1022 &self,
1023 input: Self::Input,
1024 _ctx: &ToolContext,
1025 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1026 let server_handle = Arc::clone(&self.server);
1027 Box::pin(async move {
1028 let timer = CallTimer::start();
1029 let result: Result<ListSessionsOutput, ToolError> = async {
1030 let server = server_handle
1031 .acquire()
1032 .await
1033 .map_err(|e| ToolError::Internal(e.to_string()))?;
1034
1035 let sessions =
1036 server.client().sessions().list().await.map_err(|e| {
1038 ToolError::Internal(format!("Failed to list sessions: {e}"))
1039 })?;
1040 let status_map = server.client().sessions().status_map().await.ok();
1041 let spawned = server.spawned_sessions().read().await;
1042
1043 let limit = input.limit.unwrap_or(20);
1044 let summaries: Vec<SessionSummary> = sessions
1045 .into_iter()
1046 .take(limit)
1047 .map(|s| {
1048 let status =
1049 status_map
1050 .as_ref()
1051 .map(|status_map| match status_map.get(&s.id) {
1052 Some(SessionStatusInfo::Busy) => SessionStatusSummary::Busy,
1053 Some(SessionStatusInfo::Retry {
1054 attempt,
1055 message,
1056 next,
1057 }) => SessionStatusSummary::Retry {
1058 attempt: *attempt,
1059 message: message.clone(),
1060 next: *next,
1061 },
1062 Some(SessionStatusInfo::Idle) | None => {
1063 SessionStatusSummary::Idle
1064 }
1065 });
1066
1067 let change_stats = s.summary.as_ref().map(|summary| ChangeStats {
1068 additions: summary.additions,
1069 deletions: summary.deletions,
1070 files: summary.files,
1071 });
1072
1073 SessionSummary {
1074 launched_by_you: spawned.contains(&s.id),
1075 created: s.time.as_ref().map(|t| t.created),
1076 updated: s.time.as_ref().map(|t| t.updated),
1077 directory: s.directory,
1078 path: s.path,
1079 title: s.title,
1080 id: s.id,
1081 status,
1082 change_stats,
1083 }
1084 })
1085 .collect();
1086
1087 Ok(ListSessionsOutput {
1088 sessions: summaries,
1089 })
1090 }
1091 .await;
1092
1093 match result {
1094 Ok(output) => {
1095 log_tool_success(
1096 &timer,
1097 Self::NAME,
1098 &input,
1099 &output,
1100 ToolLogMeta::default(),
1101 false,
1102 );
1103 Ok(output)
1104 }
1105 Err(error) => {
1106 log_tool_error(&timer, Self::NAME, &input, &error);
1107 Err(error)
1108 }
1109 }
1110 })
1111 }
1112}
1113
1114fn count_pending_messages(messages: &[Message]) -> usize {
1115 let mut pending = 0;
1116
1117 for message in messages.iter().rev() {
1118 if message.role() == "user" {
1119 pending += 1;
1120 } else if message.role() == "assistant" {
1121 break;
1122 }
1123 }
1124
1125 pending
1126}
1127
1128fn get_last_activity_time(messages: &[Message], fallback: Option<i64>) -> Option<i64> {
1129 messages.last().map_or(fallback, |message| {
1130 Some(
1131 message
1132 .info
1133 .time
1134 .completed
1135 .unwrap_or(message.info.time.created),
1136 )
1137 })
1138}
1139
1140fn extract_recent_tool_calls(messages: &[Message], limit: usize) -> Vec<ToolCallSummary> {
1141 let mut tool_calls = Vec::new();
1142
1143 for message in messages.iter().rev() {
1144 for part in message.parts.iter().rev() {
1145 if let Part::Tool {
1146 call_id,
1147 tool,
1148 state,
1149 ..
1150 } = part
1151 {
1152 let (state, started_at, completed_at) = match state {
1153 Some(ToolState::Running(running)) => {
1154 (ToolStateSummary::Running, Some(running.time.start), None)
1155 }
1156 Some(ToolState::Completed(completed)) => (
1157 ToolStateSummary::Completed,
1158 Some(completed.time.start),
1159 Some(completed.time.end),
1160 ),
1161 Some(ToolState::Error(error)) => (
1162 ToolStateSummary::Error {
1163 message: error.error.clone(),
1164 },
1165 Some(error.time.start),
1166 Some(error.time.end),
1167 ),
1168 _ => (ToolStateSummary::Pending, None, None),
1169 };
1170
1171 tool_calls.push(ToolCallSummary {
1172 call_id: call_id.clone(),
1173 tool_name: tool.clone(),
1174 state,
1175 started_at,
1176 completed_at,
1177 });
1178
1179 if tool_calls.len() >= limit {
1180 return tool_calls;
1181 }
1182 }
1183 }
1184 }
1185
1186 tool_calls
1187}
1188
/// Tool that reports the state of one session (exposed under the tool name `get_session_state`).
#[derive(Clone)]
pub struct GetSessionStateTool {
    /// Shared handle from which the orchestrator server is acquired on each call.
    server: Arc<OrchestratorServerHandle>,
}

impl GetSessionStateTool {
    /// Creates the tool around a shared orchestrator server handle.
    pub fn new(server: Arc<OrchestratorServerHandle>) -> Self {
        Self { server }
    }
}
1200
1201impl Tool for GetSessionStateTool {
1202 type Input = GetSessionStateInput;
1203 type Output = GetSessionStateOutput;
1204 const NAME: &'static str = "get_session_state";
1205 const DESCRIPTION: &'static str = "Get detailed state of a specific session including status, pending messages, and recent tool calls.";
1206
1207 fn call(
1208 &self,
1209 input: Self::Input,
1210 _ctx: &ToolContext,
1211 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1212 let server_handle = Arc::clone(&self.server);
1213 Box::pin(async move {
1214 let timer = CallTimer::start();
1215 let result: Result<GetSessionStateOutput, ToolError> = async {
1216 let server = server_handle
1217 .acquire()
1218 .await
1219 .map_err(|e| ToolError::Internal(e.to_string()))?;
1220
1221 let client = server.client();
1222 let session_id = &input.session_id;
1223
1224 let session = client.sessions().get(session_id).await.map_err(|e| {
1225 if e.is_not_found() {
1226 ToolError::InvalidInput(format!(
1227 "Session '{session_id}' not found. Use list_sessions to discover available sessions."
1228 ))
1229 } else {
1230 ToolError::Internal(format!("Failed to get session: {e}"))
1231 }
1232 })?;
1233
1234 let status = match client.sessions().status_for(session_id).await.map_err(|e| {
1235 ToolError::Internal(format!("Failed to get session status: {e}"))
1236 })? {
1237 SessionStatusInfo::Busy => SessionStatusSummary::Busy,
1238 SessionStatusInfo::Retry {
1239 attempt,
1240 message,
1241 next,
1242 } => SessionStatusSummary::Retry {
1243 attempt,
1244 message,
1245 next,
1246 },
1247 SessionStatusInfo::Idle => SessionStatusSummary::Idle,
1248 };
1249
1250 let messages = client.messages().list(session_id).await.map_err(|e| {
1251 ToolError::Internal(format!("Failed to list messages: {e}"))
1252 })?;
1253 let pending_message_count = count_pending_messages(&messages);
1254 let last_activity = get_last_activity_time(
1255 &messages,
1256 session.time.as_ref().map(|time| time.updated),
1257 );
1258 let recent_tool_calls = extract_recent_tool_calls(&messages, 10);
1259
1260 let spawned = server.spawned_sessions().read().await;
1261 let launched_by_you = spawned.contains(session_id);
1262
1263 Ok(GetSessionStateOutput {
1264 session_id: session.id,
1265 title: session.title,
1266 directory: session.directory,
1267 path: session.path,
1268 status,
1269 launched_by_you,
1270 pending_message_count,
1271 last_activity,
1272 recent_tool_calls,
1273 })
1274 }
1275 .await;
1276
1277 match result {
1278 Ok(output) => {
1279 log_tool_success(
1280 &timer,
1281 Self::NAME,
1282 &input,
1283 &output,
1284 ToolLogMeta::default(),
1285 false,
1286 );
1287 Ok(output)
1288 }
1289 Err(error) => {
1290 log_tool_error(&timer, Self::NAME, &input, &error);
1291 Err(error)
1292 }
1293 }
1294 })
1295 }
1296}
1297
1298#[derive(Clone)]
1304pub struct ListCommandsTool {
1305 server: Arc<OrchestratorServerHandle>,
1306}
1307
1308impl ListCommandsTool {
1309 pub fn new(server: Arc<OrchestratorServerHandle>) -> Self {
1311 Self { server }
1312 }
1313}
1314
1315impl Tool for ListCommandsTool {
1316 type Input = ListCommandsInput;
1317 type Output = ListCommandsOutput;
1318 const NAME: &'static str = "list_commands";
1319 const DESCRIPTION: &'static str = "List available OpenCode commands that can be used with run.";
1320
1321 fn call(
1322 &self,
1323 input: Self::Input,
1324 _ctx: &ToolContext,
1325 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1326 let server_handle = Arc::clone(&self.server);
1327 Box::pin(async move {
1328 let timer = CallTimer::start();
1329 let result: Result<ListCommandsOutput, ToolError> = async {
1330 let server = server_handle
1331 .acquire()
1332 .await
1333 .map_err(|e| ToolError::Internal(e.to_string()))?;
1334
1335 let commands =
1336 server.client().tools().commands().await.map_err(|e| {
1337 ToolError::Internal(format!("Failed to list commands: {e}"))
1338 })?;
1339
1340 let command_infos: Vec<CommandInfo> = commands
1341 .into_iter()
1342 .filter(|command| server.is_command_allowed(&command.name))
1343 .map(|c| CommandInfo {
1344 name: c.name,
1345 description: c.description,
1346 })
1347 .collect();
1348
1349 Ok(ListCommandsOutput {
1350 commands: command_infos,
1351 })
1352 }
1353 .await;
1354
1355 match result {
1356 Ok(output) => {
1357 log_tool_success(
1358 &timer,
1359 Self::NAME,
1360 &input,
1361 &output,
1362 ToolLogMeta::default(),
1363 false,
1364 );
1365 Ok(output)
1366 }
1367 Err(error) => {
1368 log_tool_error(&timer, Self::NAME, &input, &error);
1369 Err(error)
1370 }
1371 }
1372 })
1373 }
1374}
1375
1376#[derive(Clone)]
1385pub struct RespondPermissionTool {
1386 server: Arc<OrchestratorServerHandle>,
1387}
1388
1389impl RespondPermissionTool {
1390 pub fn new(server: Arc<OrchestratorServerHandle>) -> Self {
1392 Self { server }
1393 }
1394}
1395
1396impl Tool for RespondPermissionTool {
1397 type Input = RespondPermissionInput;
1398 type Output = RespondPermissionOutput;
1399 const NAME: &'static str = "respond_permission";
1400 const DESCRIPTION: &'static str = r#"Respond to a permission request from an OpenCode session.
1401
1402After responding, continues monitoring the session and returns when complete or when another permission is required.
1403
1404Parameters:
1405- session_id: Session with pending permission
1406- reply: "once" (allow this request), "always" (allow for matching patterns), or "reject" (deny)
1407- message: Optional message to include with reply"#;
1408
1409 fn call(
1410 &self,
1411 input: Self::Input,
1412 ctx: &ToolContext,
1413 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1414 let server_handle = Arc::clone(&self.server);
1415 let ctx = ctx.clone();
1416 Box::pin(async move {
1417 let timer = CallTimer::start();
1418 let request = input.clone();
1419 let result: Result<(RespondPermissionOutput, ToolLogMeta), ToolError> = async {
1420 let server = server_handle
1421 .acquire()
1422 .await
1423 .map_err(|e| ToolError::Internal(e.to_string()))?;
1424
1425 let client = server.client();
1426
1427 let mut pending =
1429 client.permissions().list().await.map_err(|e| {
1430 ToolError::Internal(format!("Failed to list permissions: {e}"))
1431 })?;
1432
1433 let perm = if let Some(req_id) = input.permission_request_id.as_deref() {
1434 let idx = pending.iter().position(|p| p.id == req_id).ok_or_else(|| {
1435 ToolError::InvalidInput(format!(
1436 "No pending permission found with id '{req_id}'. \
1437 (session_id='{}')",
1438 input.session_id
1439 ))
1440 })?;
1441
1442 let perm = pending.remove(idx);
1443
1444 if perm.session_id != input.session_id {
1445 return Err(ToolError::InvalidInput(format!(
1446 "Permission request '{req_id}' belongs to session '{}', not '{}'.",
1447 perm.session_id, input.session_id
1448 )));
1449 }
1450
1451 perm
1452 } else {
1453 let mut perms: Vec<_> = pending
1454 .into_iter()
1455 .filter(|p| p.session_id == input.session_id)
1456 .collect();
1457
1458 match perms.as_slice() {
1459 [] => {
1460 return Err(ToolError::InvalidInput(format!(
1461 "No pending permission found for session '{}'. \
1462 The permission may have already been responded to.",
1463 input.session_id
1464 )));
1465 }
1466 [_single] => perms.swap_remove(0),
1467 multiple => {
1468 let ids = multiple
1469 .iter()
1470 .map(|p| p.id.as_str())
1471 .collect::<Vec<_>>()
1472 .join(", ");
1473 return Err(ToolError::InvalidInput(format!(
1474 "Multiple pending permissions found for session '{}': {ids}. \
1475 Please retry with permission_request_id (returned by run).",
1476 input.session_id
1477 )));
1478 }
1479 }
1480 };
1481
1482 let is_reject = matches!(input.reply, PermissionReply::Reject);
1484
1485 let permission_type = perm.permission.clone();
1487 let permission_patterns = perm.patterns.clone();
1488
1489 let mut pre_warnings: Vec<String> = Vec::new();
1492 let baseline = if is_reject {
1493 match client.messages().list(&input.session_id).await {
1494 Ok(msgs) => OrchestratorServer::extract_assistant_text(&msgs),
1495 Err(e) => {
1496 pre_warnings.push(format!("Failed to fetch baseline messages: {e}"));
1497 None
1498 }
1499 }
1500 } else {
1501 None
1502 };
1503
1504 let api_reply = match input.reply {
1506 PermissionReply::Once => ApiPermissionReply::Once,
1507 PermissionReply::Always => ApiPermissionReply::Always,
1508 PermissionReply::Reject => ApiPermissionReply::Reject,
1509 };
1510
1511 client
1513 .permissions()
1514 .reply(
1515 &perm.id,
1516 &PermissionReplyRequest {
1517 reply: api_reply,
1518 message: input.message,
1519 },
1520 )
1521 .await
1522 .map_err(|e| {
1523 ToolError::Internal(format!("Failed to reply to permission: {e}"))
1524 })?;
1525
1526 let run_tool = OrchestratorRunTool::new(Arc::clone(&server_handle));
1528 let wait_for_activity = (!is_reject).then_some(true);
1529 let outcome = run_tool
1530 .run_impl_outcome(
1531 OrchestratorRunInput {
1532 session_id: Some(input.session_id),
1533 command: None,
1534 message: None,
1535 wait_for_activity,
1536 },
1537 &ctx,
1538 )
1539 .await?;
1540 let mut out = outcome.output;
1541
1542 out.warnings.extend(pre_warnings);
1544
1545 if is_reject && matches!(out.status, RunStatus::Completed) {
1547 let final_resp = out.response.as_deref();
1548 let baseline_resp = baseline.as_deref();
1549
1550 if final_resp.is_none() || final_resp == baseline_resp {
1552 out.response = None;
1553 let patterns_str = if permission_patterns.is_empty() {
1554 "(none)".to_string()
1555 } else {
1556 permission_patterns.join(", ")
1557 };
1558 out.warnings.push(format!(
1559 "Permission rejected for '{permission_type}'. Patterns: {patterns_str}. \
1560 Session stopped without generating a new assistant response."
1561 ));
1562 tracing::debug!(
1563 permission_type = %permission_type,
1564 "rejection override applied: response set to None"
1565 );
1566 }
1567 }
1568
1569 Ok((out, outcome.log_meta))
1570 }
1571 .await;
1572
1573 match result {
1574 Ok((output, log_meta)) => {
1575 log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1576 Ok(output)
1577 }
1578 Err(error) => {
1579 log_tool_error(&timer, Self::NAME, &request, &error);
1580 Err(error)
1581 }
1582 }
1583 })
1584 }
1585}
1586
1587#[derive(Clone)]
1592pub struct RespondQuestionTool {
1593 server: Arc<OrchestratorServerHandle>,
1594}
1595
1596impl RespondQuestionTool {
1597 pub fn new(server: Arc<OrchestratorServerHandle>) -> Self {
1598 Self { server }
1599 }
1600}
1601
1602impl Tool for RespondQuestionTool {
1603 type Input = RespondQuestionInput;
1604 type Output = RespondQuestionOutput;
1605 const NAME: &'static str = "respond_question";
1606 const DESCRIPTION: &'static str = r#"Respond to a question request from an OpenCode session.
1607
1608After replying, continues monitoring the session and returns when complete or when another interruption is required.
1609
1610Parameters:
1611- session_id: Session with pending question
1612- action: "reply" or "reject"
1613- answers: Required when action=reply; one list per question"#;
1614
1615 fn call(
1616 &self,
1617 input: Self::Input,
1618 ctx: &ToolContext,
1619 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1620 let server_handle = Arc::clone(&self.server);
1621 let ctx = ctx.clone();
1622 Box::pin(async move {
1623 let timer = CallTimer::start();
1624 let request = input.clone();
1625 let result: Result<(RespondQuestionOutput, ToolLogMeta), ToolError> = async {
1626 let server = server_handle
1627 .acquire()
1628 .await
1629 .map_err(|e| ToolError::Internal(e.to_string()))?;
1630
1631 let client = server.client();
1632 let mut pending = client
1633 .question()
1634 .list()
1635 .await
1636 .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;
1637
1638 let question = if let Some(req_id) = input.question_request_id.as_deref() {
1639 let idx = pending
1640 .iter()
1641 .position(|question| question.id == req_id)
1642 .ok_or_else(|| {
1643 ToolError::InvalidInput(format!(
1644 "No pending question found with id '{req_id}'. (session_id='{}')",
1645 input.session_id
1646 ))
1647 })?;
1648
1649 let question = pending.remove(idx);
1650 if question.session_id != input.session_id {
1651 return Err(ToolError::InvalidInput(format!(
1652 "Question request '{req_id}' belongs to session '{}', not '{}'.",
1653 question.session_id, input.session_id
1654 )));
1655 }
1656
1657 question
1658 } else {
1659 let mut questions: Vec<_> = pending
1660 .into_iter()
1661 .filter(|question| question.session_id == input.session_id)
1662 .collect();
1663
1664 match questions.as_slice() {
1665 [] => {
1666 return Err(ToolError::InvalidInput(format!(
1667 "No pending question found for session '{}'. The question may have already been responded to.",
1668 input.session_id
1669 )));
1670 }
1671 [_single] => questions.swap_remove(0),
1672 multiple => {
1673 let ids = multiple
1674 .iter()
1675 .map(|question| question.id.as_str())
1676 .collect::<Vec<_>>()
1677 .join(", ");
1678 return Err(ToolError::InvalidInput(format!(
1679 "Multiple pending questions found for session '{}': {ids}. Please retry with question_request_id (returned by run).",
1680 input.session_id
1681 )));
1682 }
1683 }
1684 };
1685
1686 match input.action {
1687 QuestionAction::Reply => {
1688 if input.answers.is_empty() {
1689 return Err(ToolError::InvalidInput(
1690 "answers is required when action=reply".into(),
1691 ));
1692 }
1693
1694 client
1695 .question()
1696 .reply(
1697 &question.id,
1698 &QuestionReply {
1699 answers: input.answers,
1700 },
1701 )
1702 .await
1703 .map_err(|e| {
1704 ToolError::Internal(format!("Failed to reply to question: {e}"))
1705 })?;
1706
1707 let outcome = OrchestratorRunTool::new(Arc::clone(&server_handle))
1708 .run_impl_outcome(OrchestratorRunInput {
1709 session_id: Some(input.session_id),
1710 command: None,
1711 message: None,
1712 wait_for_activity: Some(true),
1713 }, &ctx)
1714 .await?;
1715 Ok((outcome.output, outcome.log_meta))
1716 }
1717 QuestionAction::Reject => {
1718 client.question().reject(&question.id).await.map_err(|e| {
1719 ToolError::Internal(format!("Failed to reject question: {e}"))
1720 })?;
1721
1722 let outcome = OrchestratorRunTool::new(Arc::clone(&server_handle))
1723 .run_impl_outcome(OrchestratorRunInput {
1724 session_id: Some(input.session_id),
1725 command: None,
1726 message: None,
1727 wait_for_activity: None,
1728 }, &ctx)
1729 .await?;
1730 Ok((outcome.output, outcome.log_meta))
1731 }
1732 }
1733 }
1734 .await;
1735
1736 match result {
1737 Ok((output, log_meta)) => {
1738 log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1739 Ok(output)
1740 }
1741 Err(error) => {
1742 log_tool_error(&timer, Self::NAME, &request, &error);
1743 Err(error)
1744 }
1745 }
1746 })
1747 }
1748}
1749
1750pub fn build_registry(server: &Arc<OrchestratorServerHandle>) -> ToolRegistry {
1758 ToolRegistry::builder()
1759 .register::<OrchestratorRunTool, ()>(OrchestratorRunTool::new(Arc::clone(server)))
1760 .register::<ListSessionsTool, ()>(ListSessionsTool::new(Arc::clone(server)))
1761 .register::<GetSessionStateTool, ()>(GetSessionStateTool::new(Arc::clone(server)))
1762 .register::<ListCommandsTool, ()>(ListCommandsTool::new(Arc::clone(server)))
1763 .register::<RespondPermissionTool, ()>(RespondPermissionTool::new(Arc::clone(server)))
1764 .register::<RespondQuestionTool, ()>(RespondQuestionTool::new(Arc::clone(server)))
1765 .finish()
1766}
1767
#[cfg(test)]
mod tests {
    use super::*;
    use agentic_tools_core::Tool;

    /// Every tool exposes its expected short name.
    #[test]
    fn tool_names_are_short() {
        let cases = [
            (<OrchestratorRunTool as Tool>::NAME, "run"),
            (<ListSessionsTool as Tool>::NAME, "list_sessions"),
            (<GetSessionStateTool as Tool>::NAME, "get_session_state"),
            (<ListCommandsTool as Tool>::NAME, "list_commands"),
            (<RespondPermissionTool as Tool>::NAME, "respond_permission"),
            (<RespondQuestionTool as Tool>::NAME, "respond_question"),
        ];

        for (actual, expected) in cases.iter() {
            assert_eq!(actual, expected);
        }
    }

    /// With no messages, the supplied session timestamp is returned unchanged.
    #[test]
    fn last_activity_falls_back_to_session_timestamp_when_messages_are_empty() {
        let session_timestamp = Some(1_234);
        let last_activity = get_last_activity_time(&[], session_timestamp);
        assert_eq!(last_activity, session_timestamp);
    }
}
1787}