1use crate::config;
4use crate::logging;
5use crate::server::OrchestratorServer;
6use crate::token_tracker::TokenTracker;
7use crate::types::ChangeStats;
8use crate::types::CommandInfo;
9use crate::types::GetSessionStateInput;
10use crate::types::GetSessionStateOutput;
11use crate::types::ListCommandsInput;
12use crate::types::ListCommandsOutput;
13use crate::types::ListSessionsInput;
14use crate::types::ListSessionsOutput;
15use crate::types::OrchestratorRunInput;
16use crate::types::OrchestratorRunOutput;
17use crate::types::PermissionReply;
18use crate::types::QuestionAction;
19use crate::types::QuestionInfoView;
20use crate::types::QuestionOptionView;
21use crate::types::RespondPermissionInput;
22use crate::types::RespondPermissionOutput;
23use crate::types::RespondQuestionInput;
24use crate::types::RespondQuestionOutput;
25use crate::types::RunStatus;
26use crate::types::SessionStatusSummary;
27use crate::types::SessionSummary;
28use crate::types::ToolCallSummary;
29use crate::types::ToolStateSummary;
30use agentic_logging::CallTimer;
31use agentic_logging::ToolCallRecord;
32use agentic_tools_core::Tool;
33use agentic_tools_core::ToolContext;
34use agentic_tools_core::ToolError;
35use agentic_tools_core::ToolRegistry;
36use agentic_tools_core::fmt::TextFormat;
37use agentic_tools_core::fmt::TextOptions;
38use futures::future::BoxFuture;
39use opencode_rs::types::event::Event;
40use opencode_rs::types::message::CommandRequest;
41use opencode_rs::types::message::Message;
42use opencode_rs::types::message::Part;
43use opencode_rs::types::message::PromptPart;
44use opencode_rs::types::message::PromptRequest;
45use opencode_rs::types::message::ToolState;
46use opencode_rs::types::permission::PermissionReply as ApiPermissionReply;
47use opencode_rs::types::permission::PermissionReplyRequest;
48use opencode_rs::types::question::QuestionReply;
49use opencode_rs::types::question::QuestionRequest;
50use opencode_rs::types::session::CreateSessionRequest;
51use opencode_rs::types::session::SessionStatusInfo;
52use opencode_rs::types::session::SummarizeRequest;
53use serde::Serialize;
54use std::sync::Arc;
55use std::time::Duration;
56use tokio::sync::OnceCell;
57use tokio::task::JoinHandle;
58
/// Server name written into the `server` field of every tool-call log record.
const SERVER_NAME: &str = "opencode-orchestrator-mcp";
60
/// Extra metadata attached to the log record of a successful tool call.
#[derive(Debug, Clone, Default)]
struct ToolLogMeta {
    /// Token usage copied into the record's `token_usage` field, when available.
    token_usage: Option<agentic_logging::TokenUsage>,
    /// When `true`, the record's `summary` is set to `{"token_usage_saturated": true}`.
    token_usage_saturated: bool,
}
66
/// Result of a `run` invocation: the tool output plus metadata used only for logging.
struct RunOutcome {
    /// Output returned to the tool caller.
    output: OrchestratorRunOutput,
    /// Token-usage metadata written to the call log.
    log_meta: ToolLogMeta,
}
71
72impl RunOutcome {
73 fn without_tokens(output: OrchestratorRunOutput) -> Self {
74 Self {
75 output,
76 log_meta: ToolLogMeta::default(),
77 }
78 }
79
80 fn with_tracker(output: OrchestratorRunOutput, token_tracker: &TokenTracker) -> Self {
81 let (token_usage, token_usage_saturated) = token_tracker.to_log_token_usage();
82 Self {
83 output,
84 log_meta: ToolLogMeta {
85 token_usage,
86 token_usage_saturated,
87 },
88 }
89 }
90}
91
92async fn abort_command_task(task: &mut Option<JoinHandle<Result<(), String>>>) {
93 if let Some(handle) = task.take() {
94 handle.abort();
95 let _ = handle.await;
96 }
97}
98
99fn request_json<T: Serialize>(request: &T) -> serde_json::Value {
100 serde_json::to_value(request)
101 .unwrap_or_else(|error| serde_json::json!({"serialization_error": error.to_string()}))
102}
103
104fn log_tool_success<TReq: Serialize, TOut: TextFormat>(
105 timer: &CallTimer,
106 tool: &str,
107 request: &TReq,
108 output: &TOut,
109 log_meta: ToolLogMeta,
110 write_markdown: bool,
111) {
112 let (completed_at, duration_ms) = timer.finish();
113 let rendered = output.fmt_text(&TextOptions::default());
114 let response_file = write_markdown
115 .then(|| logging::write_markdown_best_effort(completed_at, &timer.call_id, &rendered))
116 .flatten();
117
118 let record = ToolCallRecord {
119 call_id: timer.call_id.clone(),
120 server: SERVER_NAME.into(),
121 tool: tool.into(),
122 started_at: timer.started_at,
123 completed_at,
124 duration_ms,
125 request: request_json(request),
126 response_file,
127 success: true,
128 error: None,
129 model: None,
130 token_usage: log_meta.token_usage,
131 summary: log_meta
132 .token_usage_saturated
133 .then(|| serde_json::json!({"token_usage_saturated": true})),
134 };
135
136 logging::append_record_best_effort(&record);
137}
138
139fn log_tool_error<TReq: Serialize>(
140 timer: &CallTimer,
141 tool: &str,
142 request: &TReq,
143 error: &ToolError,
144) {
145 let (completed_at, duration_ms) = timer.finish();
146 let record = ToolCallRecord {
147 call_id: timer.call_id.clone(),
148 server: SERVER_NAME.into(),
149 tool: tool.into(),
150 started_at: timer.started_at,
151 completed_at,
152 duration_ms,
153 request: request_json(request),
154 response_file: None,
155 success: false,
156 error: Some(error.to_string()),
157 model: None,
158 token_usage: None,
159 summary: None,
160 };
161
162 logging::append_record_best_effort(&record);
163}
164
/// Tool implementing `run`: starts or resumes an OpenCode session and waits until it
/// completes or needs a permission/question response.
#[derive(Clone)]
pub struct OrchestratorRunTool {
    /// Shared cell holding the orchestrator server; initialized on first use.
    server: Arc<OnceCell<OrchestratorServer>>,
}
178
179impl OrchestratorRunTool {
    /// Creates the tool around the shared, lazily initialized server cell.
    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
        Self { server }
    }
184
185 async fn finalize_completed(
194 client: &opencode_rs::Client,
195 session_id: String,
196 token_tracker: &TokenTracker,
197 mut warnings: Vec<String>,
198 ) -> Result<OrchestratorRunOutput, ToolError> {
199 const BACKOFFS_MS: &[u64] = &[0, 50, 100, 200, 400];
201
202 let mut response: Option<String> = None;
203
204 for (attempt, &delay_ms) in BACKOFFS_MS.iter().enumerate() {
205 if delay_ms > 0 {
206 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
207 }
208
209 let messages = client
210 .messages()
211 .list(&session_id)
212 .await
213 .map_err(|e| ToolError::Internal(format!("Failed to list messages: {e}")))?;
214
215 response = OrchestratorServer::extract_assistant_text(&messages);
216
217 if response.is_some() {
218 if attempt > 0 {
219 tracing::debug!(
220 session_id = %session_id,
221 attempt,
222 "assistant response became available after retry"
223 );
224 }
225 break;
226 }
227 }
228
229 if response.is_none() {
230 tracing::debug!(
231 session_id = %session_id,
232 "no assistant response found after bounded retry"
233 );
234 }
235
236 if token_tracker.compaction_needed
238 && let (Some(pid), Some(mid)) = (&token_tracker.provider_id, &token_tracker.model_id)
239 {
240 let summarize_req = SummarizeRequest {
241 provider_id: pid.clone(),
242 model_id: mid.clone(),
243 auto: None,
244 };
245
246 match client
247 .sessions()
248 .summarize(&session_id, &summarize_req)
249 .await
250 {
251 Ok(_) => {
252 tracing::info!(session_id = %session_id, "context summarization triggered");
253 warnings.push("Context limit reached; summarization triggered".into());
254 }
255 Err(e) => {
256 tracing::warn!(session_id = %session_id, error = %e, "summarization failed");
257 warnings.push(format!("Summarization failed: {e}"));
258 }
259 }
260 }
261
262 Ok(OrchestratorRunOutput {
263 session_id,
264 status: RunStatus::Completed,
265 response,
266 partial_response: None,
267 permission_request_id: None,
268 permission_type: None,
269 permission_patterns: vec![],
270 question_request_id: None,
271 questions: vec![],
272 warnings,
273 })
274 }
275
276 fn map_questions(req: &QuestionRequest) -> Vec<QuestionInfoView> {
277 req.questions
278 .iter()
279 .map(|question| QuestionInfoView {
280 question: question.question.clone(),
281 header: question.header.clone(),
282 options: question
283 .options
284 .iter()
285 .map(|option| QuestionOptionView {
286 label: option.label.clone(),
287 description: option.description.clone(),
288 })
289 .collect(),
290 multiple: question.multiple,
291 custom: question.custom,
292 })
293 .collect()
294 }
295
296 fn question_required_output(
297 session_id: String,
298 partial_response: Option<String>,
299 request: &QuestionRequest,
300 warnings: Vec<String>,
301 ) -> OrchestratorRunOutput {
302 OrchestratorRunOutput {
303 session_id,
304 status: RunStatus::QuestionRequired,
305 response: None,
306 partial_response,
307 permission_request_id: None,
308 permission_type: None,
309 permission_patterns: vec![],
310 question_request_id: Some(request.id.clone()),
311 questions: Self::map_questions(request),
312 warnings,
313 }
314 }
315
    /// Core implementation of the `run` tool.
    ///
    /// Steps, in order:
    /// 1. Validate the input (something to do, a message for every command, non-empty text).
    /// 2. Lazily start the server and resolve the session (verify the given id or create one).
    /// 3. Return immediately if the session already has a pending permission or question.
    /// 4. For a pure status check on an idle session, finalize right away.
    /// 5. Otherwise subscribe to session events, dispatch the command/prompt (if any), and
    ///    wait in an event loop that combines the event subscription with 1-second polling.
    ///
    /// # Returns
    /// A `RunOutcome` whose output status is `Completed`, `PermissionRequired`, or
    /// `QuestionRequired`.
    ///
    /// # Errors
    /// * `ToolError::InvalidInput` for invalid arguments or an unknown `session_id`.
    /// * `ToolError::Internal` for API failures, session errors, command dispatch failures,
    ///   and the inactivity / overall deadline timeouts.
    /// * A cancelled error when `ctx` is cancelled while waiting.
    async fn run_impl_outcome(
        &self,
        input: OrchestratorRunInput,
        ctx: &ToolContext,
    ) -> Result<RunOutcome, ToolError> {
        // At least one of session_id / message / command must be supplied.
        if input.session_id.is_none() && input.message.is_none() && input.command.is_none() {
            return Err(ToolError::InvalidInput(
                "Either session_id (to resume/check status) or message/command (to start work) is required"
                    .into(),
            ));
        }

        // A command always needs a message; it is sent as the command's `arguments` below.
        if input.command.is_some() && input.message.is_none() {
            return Err(ToolError::InvalidInput(
                "message is required when command is specified (becomes $ARGUMENTS for template expansion)"
                    .into(),
            ));
        }

        // Trim surrounding whitespace, then reject a message that ends up empty.
        let message = input.message.map(|m| m.trim().to_string());
        if let Some(ref m) = message
            && m.is_empty()
        {
            return Err(ToolError::InvalidInput(
                "message cannot be empty or whitespace-only".into(),
            ));
        }

        let wait_for_activity = input.wait_for_activity.unwrap_or(false);

        // Start the orchestrator server on first use; later calls reuse the same instance.
        let server = self
            .server
            .get_or_try_init(OrchestratorServer::start_lazy)
            .await
            .map_err(|e| ToolError::Internal(e.to_string()))?;

        let client = server.client();

        tracing::debug!(
            command = ?input.command,
            has_message = message.is_some(),
            message_len = message.as_ref().map(String::len),
            session_id = ?input.session_id,
            "run: starting"
        );

        // Resolve the session: verify a caller-supplied id exists, otherwise create a new one.
        let session_id = if let Some(sid) = input.session_id {
            client.sessions().get(&sid).await.map_err(|e| {
                if e.is_not_found() {
                    ToolError::InvalidInput(format!(
                        "Session '{sid}' not found. Use list_sessions to discover sessions, \
                         or omit session_id to create a new session."
                    ))
                } else {
                    ToolError::Internal(format!("Failed to get session: {e}"))
                }
            })?;
            sid
        } else {
            let session = client
                .sessions()
                .create(&CreateSessionRequest::default())
                .await
                .map_err(|e| ToolError::Internal(format!("Failed to create session: {e}")))?;

            // Remember sessions created here; other tools report them as `launched_by_you`.
            // The inner scope drops the write guard as soon as the id is recorded.
            {
                let mut spawned = server.spawned_sessions().write().await;
                spawned.insert(session.id.clone());
            }

            session.id
        };

        tracing::info!(session_id = %session_id, "run: session resolved");

        let status = client
            .sessions()
            .status_for(&session_id)
            .await
            .map_err(|e| ToolError::Internal(format!("Failed to get session status: {e}")))?;

        let is_idle = matches!(status, SessionStatusInfo::Idle);

        // A permission request already pending for this session takes priority over
        // everything else: report it without dispatching any new work.
        let pending_permissions = client
            .permissions()
            .list()
            .await
            .map_err(|e| ToolError::Internal(format!("Failed to list permissions: {e}")))?;

        let my_permission = pending_permissions
            .into_iter()
            .find(|p| p.session_id == session_id);

        if let Some(perm) = my_permission {
            tracing::info!(
                session_id = %session_id,
                permission_type = %perm.permission,
                "run: pending permission found"
            );
            return Ok(RunOutcome::without_tokens(OrchestratorRunOutput {
                session_id,
                status: RunStatus::PermissionRequired,
                response: None,
                partial_response: None,
                permission_request_id: Some(perm.id),
                permission_type: Some(perm.permission),
                permission_patterns: perm.patterns,
                question_request_id: None,
                questions: vec![],
                warnings: vec![],
            }));
        }

        // Same treatment for a question already pending for this session.
        let pending_questions = client
            .question()
            .list()
            .await
            .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;

        if let Some(question) = pending_questions
            .into_iter()
            .find(|question| question.session_id == session_id)
        {
            tracing::info!(session_id = %session_id, question_id = %question.id, "run: pending question found");
            return Ok(RunOutcome::without_tokens(Self::question_required_output(
                session_id,
                None,
                &question,
                vec![],
            )));
        }

        // Pure status check on an idle session: nothing to wait for, finalize immediately.
        // The tracker is fresh, so no token usage has been observed for this call.
        if message.is_none() && input.command.is_none() && is_idle && !wait_for_activity {
            let token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
            let output =
                Self::finalize_completed(client, session_id, &token_tracker, vec![]).await?;
            return Ok(RunOutcome::with_tracker(output, &token_tracker));
        }

        // Subscribe before dispatching so events produced by the new work are received.
        let mut subscription = client
            .subscribe_session(&session_id)
            .map_err(|e| ToolError::Internal(format!("Failed to subscribe to session: {e}")))?;

        // `wait_for_activity` counts as new work: an idle status alone is then not accepted
        // until the session has been seen busy or the idle grace period has elapsed.
        let dispatched_new_work = input.command.is_some() || message.is_some() || wait_for_activity;
        let idle_grace = config::idle_grace();
        // Set once dispatch is confirmed: immediately for wait-only calls, after the prompt
        // is sent, or after the command task completes successfully.
        let mut idle_grace_deadline: Option<tokio::time::Instant> = None;
        // Arms the idle-grace sleep branch of the select loop below.
        let mut awaiting_idle_grace_check = false;

        if wait_for_activity && input.command.is_none() && message.is_none() {
            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
        }

        let mut command_task: Option<JoinHandle<Result<(), String>>> = None;
        let mut command_name_for_logging: Option<String> = None;

        if let Some(command) = &input.command {
            command_name_for_logging = Some(command.clone());

            let cmd_client = client.clone();
            let cmd_session_id = session_id.clone();
            let cmd_name = command.clone();
            let cmd_arguments = message.clone().unwrap_or_default();

            // The command request runs in a spawned task so the event loop below can keep
            // processing events while the request is still in flight.
            command_task = Some(tokio::spawn(async move {
                let req = CommandRequest {
                    command: cmd_name,
                    arguments: cmd_arguments,
                    message_id: None,
                };

                cmd_client
                    .messages()
                    .command(&cmd_session_id, &req)
                    .await
                    .map(|_| ())
                    .map_err(|e| e.to_string())
            }));
        } else if let Some(msg) = &message {
            // Plain prompt: sent inline as a single text part.
            let req = PromptRequest {
                parts: vec![PromptPart::Text {
                    text: msg.clone(),
                    synthetic: None,
                    ignored: None,
                    metadata: None,
                }],
                message_id: None,
                model: None,
                agent: None,
                no_reply: None,
                system: None,
                variant: None,
            };

            client
                .messages()
                .prompt_async(&session_id, &req)
                .await
                .map_err(|e| ToolError::Internal(format!("Failed to send prompt: {e}")))?;

            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
        }

        // Hard upper bound for this call, plus a separate no-activity timeout.
        let deadline = tokio::time::Instant::now() + server.session_deadline();
        let inactivity_timeout = server.inactivity_timeout();
        let mut last_activity_time = tokio::time::Instant::now();

        tracing::debug!(session_id = %session_id, "run: entering event loop");
        let mut token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
        // Text accumulated from delta events; returned as `partial_response` when the run
        // is interrupted by a permission or question request.
        let mut partial_response = String::new();
        let warnings = Vec::new();

        let mut poll_interval = tokio::time::interval(Duration::from_secs(1));
        poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        // True once any sign of session activity has been seen (event or busy/retry status).
        let mut observed_busy = false;

        // Cleared when the event stream ends; the loop then relies on polling alone.
        let mut sse_active = true;

        // When no new work was dispatched, re-check the status once after subscribing.
        // NOTE(review): presumably this covers the session going idle between the first
        // status check and the subscription — confirm.
        if !dispatched_new_work
            && let Ok(status) = client.sessions().status_for(&session_id).await
            && matches!(status, SessionStatusInfo::Idle)
        {
            tracing::debug!(
                session_id = %session_id,
                "session already idle on post-subscribe check"
            );
            let output =
                Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
            return Ok(RunOutcome::with_tracker(output, &token_tracker));
        }
        // NOTE(review): only the cancellation branch aborts `command_task`. Every other
        // return path drops the JoinHandle, which in Tokio detaches the task instead of
        // cancelling it — confirm that letting the command request keep running is intended.
        loop {
            let now = tokio::time::Instant::now();

            // NOTE(review): the messages below hard-code "5 minutes" and "1 hour" while the
            // thresholds come from `server.inactivity_timeout()` / `server.session_deadline()`
            // — confirm the configured values always match the text.
            if now.duration_since(last_activity_time) >= inactivity_timeout {
                return Err(ToolError::Internal(format!(
                    "Session idle timeout: no activity for 5 minutes (session_id={session_id}). \
                     The session may still be running; use run(session_id=...) to check status."
                )));
            }

            if now >= deadline {
                return Err(ToolError::Internal(
                    "Session execution timed out after 1 hour. \
                     The session may still be running; use run with the session_id to check status."
                        .into(),
                ));
            }

            // Snapshot used as the guard of the command-task branch below.
            let command_task_active = command_task.is_some();

            tokio::select! {
                // Caller cancelled: stop the in-flight command dispatch and bail out.
                () = ctx.cancelled() => {
                    abort_command_task(&mut command_task).await;
                    return Err(ToolError::cancelled(None));
                }

                // Session events from the subscription (disabled once the stream has ended).
                maybe_event = subscription.recv(), if sse_active => {
                    let Some(event) = maybe_event else {
                        tracing::warn!(
                            session_id = %session_id,
                            "SSE stream closed unexpectedly; falling back to polling-only mode"
                        );
                        sse_active = false;
                        continue;
                    };

                    // Feed every event to the token tracker; the closure resolves the
                    // context limit for a provider/model pair via the server.
                    token_tracker.observe_event(&event, |pid, mid| {
                        server.context_limit(pid, mid)
                    });

                    match event {
                        Event::PermissionAsked { properties } => {
                            tracing::info!(
                                session_id = %session_id,
                                permission_type = %properties.request.permission,
                                "run: permission requested"
                            );
                            return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
                                session_id,
                                status: RunStatus::PermissionRequired,
                                response: None,
                                partial_response: if partial_response.is_empty() {
                                    None
                                } else {
                                    Some(partial_response)
                                },
                                permission_request_id: Some(properties.request.id),
                                permission_type: Some(properties.request.permission),
                                permission_patterns: properties.request.patterns,
                                question_request_id: None,
                                questions: vec![],
                                warnings,
                            }, &token_tracker));
                        }

                        Event::QuestionAsked { properties } => {
                            return Ok(RunOutcome::with_tracker(Self::question_required_output(
                                session_id,
                                if partial_response.is_empty() {
                                    None
                                } else {
                                    Some(partial_response)
                                },
                                &properties.request,
                                warnings,
                            ), &token_tracker));
                        }

                        // Streaming text: counts as activity and extends the partial response.
                        Event::MessagePartDelta { properties } => {
                            last_activity_time = tokio::time::Instant::now();
                            observed_busy = true;
                            awaiting_idle_grace_check = false;
                            if let Some(delta) = &properties.delta {
                                partial_response.push_str(delta);
                            }
                        }

                        // Other message updates only count as activity.
                        Event::MessagePartUpdated { .. } | Event::MessageUpdated { .. } => {
                            last_activity_time = tokio::time::Instant::now();
                            observed_busy = true;
                            awaiting_idle_grace_check = false;
                        }

                        Event::SessionError { properties } => {
                            let error_msg = properties
                                .error
                                .map_or_else(|| "Unknown error".to_string(), |e| format!("{e:?}"));
                            tracing::error!(
                                session_id = %session_id,
                                error = %error_msg,
                                "run: session error"
                            );
                            return Err(ToolError::Internal(format!("Session error: {error_msg}")));
                        }

                        // An idle event finalizes immediately, without the busy/grace checks
                        // applied to idle statuses seen via polling.
                        Event::SessionIdle { .. } => {
                            tracing::debug!(session_id = %session_id, "received SessionIdle event");
                            let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
                            return Ok(RunOutcome::with_tracker(output, &token_tracker));
                        }

                        // All other events are ignored.
                        _ => {
                        }
                    }
                }

                // Once-per-second polling: checks permissions, questions, and status.
                // Listing failures are logged and treated as "nothing pending".
                _ = poll_interval.tick() => {
                    let pending = match client.permissions().list().await {
                        Ok(p) => p,
                        Err(e) => {
                            tracing::warn!(
                                session_id = %session_id,
                                error = %e,
                                "failed to list permissions during poll fallback"
                            );
                            vec![]
                        }
                    };

                    if let Some(perm) = pending.into_iter().find(|p| p.session_id == session_id) {
                        tracing::debug!(
                            session_id = %session_id,
                            permission_id = %perm.id,
                            "detected pending permission via polling fallback"
                        );
                        return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
                            session_id,
                            status: RunStatus::PermissionRequired,
                            response: None,
                            partial_response: if partial_response.is_empty() {
                                None
                            } else {
                                Some(partial_response)
                            },
                            permission_request_id: Some(perm.id),
                            permission_type: Some(perm.permission),
                            permission_patterns: perm.patterns,
                            question_request_id: None,
                            questions: vec![],
                            warnings,
                        }, &token_tracker));
                    }

                    let pending_questions = match client.question().list().await {
                        Ok(questions) => questions,
                        Err(e) => {
                            tracing::warn!(
                                session_id = %session_id,
                                error = %e,
                                "failed to list questions during poll fallback"
                            );
                            vec![]
                        }
                    };

                    if let Some(question) = pending_questions
                        .into_iter()
                        .find(|question| question.session_id == session_id)
                    {
                        tracing::debug!(
                            session_id = %session_id,
                            question_id = %question.id,
                            "detected pending question via polling fallback"
                        );
                        return Ok(RunOutcome::with_tracker(Self::question_required_output(
                            session_id,
                            if partial_response.is_empty() {
                                None
                            } else {
                                Some(partial_response)
                            },
                            &question,
                            warnings,
                        ), &token_tracker));
                    }

                    match client.sessions().status_for(&session_id).await {
                        Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
                            last_activity_time = tokio::time::Instant::now();
                            observed_busy = true;
                            awaiting_idle_grace_check = false;
                            tracing::trace!(
                                session_id = %session_id,
                                "our session is busy/retry, waiting"
                            );
                        }
                        Ok(SessionStatusInfo::Idle) => {
                            // Idle is conclusive when no new work was dispatched, or when the
                            // session was already seen working on it.
                            if !dispatched_new_work || observed_busy {
                                tracing::debug!(
                                    session_id = %session_id,
                                    dispatched_new_work = dispatched_new_work,
                                    observed_busy = observed_busy,
                                    "detected session idle via polling fallback"
                                );
                                let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
                                return Ok(RunOutcome::with_tracker(output, &token_tracker));
                            }

                            // No grace deadline yet means dispatch has not been confirmed
                            // (the command task is still running), so keep waiting.
                            // Note: `deadline` here shadows the overall session deadline.
                            let Some(deadline) = idle_grace_deadline else {
                                tracing::trace!(
                                    session_id = %session_id,
                                    command_task_active = command_task_active,
                                    "idle seen before dispatch confirmed; waiting"
                                );
                                continue;
                            };

                            // Grace period over without ever seeing the session busy:
                            // accept the idle status as completion.
                            let now = tokio::time::Instant::now();
                            if now >= deadline {
                                tracing::debug!(
                                    session_id = %session_id,
                                    idle_grace_ms = idle_grace.as_millis(),
                                    "accepting idle via bounded idle grace (no busy observed)"
                                );
                                let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
                                return Ok(RunOutcome::with_tracker(output, &token_tracker));
                            }

                            // Still inside the grace period: arm the sleep branch below.
                            awaiting_idle_grace_check = true;
                            tracing::trace!(
                                session_id = %session_id,
                                remaining_ms = (deadline - now).as_millis(),
                                "idle detected before busy; waiting for idle-grace deadline"
                            );
                        }
                        Err(e) => {
                            tracing::warn!(
                                session_id = %session_id,
                                error = %e,
                                "failed to get session status during poll fallback"
                            );
                        }
                    }
                }

                // Fires at the idle-grace deadline (only while armed) and re-checks the
                // status once instead of waiting for the next poll tick.
                () = async {
                    match idle_grace_deadline {
                        Some(deadline) => tokio::time::sleep_until(deadline).await,
                        None => std::future::pending::<()>().await,
                    }
                }, if awaiting_idle_grace_check => {
                    awaiting_idle_grace_check = false;

                    match client.sessions().status_for(&session_id).await {
                        Ok(SessionStatusInfo::Idle) => {
                            tracing::debug!(session_id = %session_id, "idle-grace deadline reached; finalizing");
                            let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
                            return Ok(RunOutcome::with_tracker(output, &token_tracker));
                        }
                        Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
                            last_activity_time = tokio::time::Instant::now();
                            observed_busy = true;
                        }
                        Err(e) => {
                            tracing::warn!(
                                session_id = %session_id,
                                error = %e,
                                "status check failed at idle-grace deadline"
                            );
                        }
                    }
                }

                // Completion of the spawned command dispatch (only while a task exists).
                cmd_result = async {
                    match command_task.as_mut() {
                        Some(handle) => Some(handle.await),
                        None => {
                            std::future::pending::<
                                Option<Result<Result<(), String>, tokio::task::JoinError>>,
                            >()
                            .await
                        }
                    }
                }, if command_task_active => {
                    match cmd_result {
                        // Dispatch confirmed: start the idle grace period from now.
                        Some(Ok(Ok(()))) => {
                            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
                            tracing::debug!(
                                session_id = %session_id,
                                command = ?command_name_for_logging,
                                "run: command dispatch completed successfully"
                            );
                            command_task = None;
                        }
                        // The command request itself returned an error.
                        Some(Ok(Err(e))) => {
                            tracing::error!(
                                session_id = %session_id,
                                command = ?command_name_for_logging,
                                error = %e,
                                "run: command dispatch failed"
                            );
                            return Err(ToolError::Internal(format!(
                                "Failed to execute command '{}': {e}",
                                command_name_for_logging.as_deref().unwrap_or("unknown")
                            )));
                        }
                        // Joining the task failed.
                        Some(Err(join_err)) => {
                            tracing::error!(
                                session_id = %session_id,
                                command = ?command_name_for_logging,
                                error = %join_err,
                                "run: command task panicked"
                            );
                            return Err(ToolError::Internal(format!("Command task panicked: {join_err}")));
                        }
                        None => {
                            unreachable!("command_task_active guard should prevent None");
                        }
                    }
                }
            }
        }
    }
916}
917
918impl Tool for OrchestratorRunTool {
919 type Input = OrchestratorRunInput;
920 type Output = OrchestratorRunOutput;
921 const NAME: &'static str = "run";
922 const DESCRIPTION: &'static str = r#"Start or resume an OpenCode session. Optionally run a named command or send a raw prompt.
923
924Returns when:
925- status=completed: Session finished executing. Response contains final assistant output.
926- status=permission_required: Session needs permission approval. Call respond_permission to continue.
927- status=question_required: Session needs question answers. Call respond_question to continue.
928
929Parameters:
930- session_id: Existing session to resume (omit to create new)
931- command: OpenCode command name (e.g., "research", "implement_plan")
932- message: Prompt text or $ARGUMENTS for command template
933
934Examples:
935- New session with prompt: run(message="explain this code")
936- New session with command: run(command="research", message="caching strategies")
937- Resume session: run(session_id="...", message="continue")
938- Check status: run(session_id="...")"#;
939
940 fn call(
941 &self,
942 input: Self::Input,
943 ctx: &ToolContext,
944 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
945 let this = self.clone();
946 let ctx = ctx.clone();
947 Box::pin(async move {
948 let timer = CallTimer::start();
949 match this.run_impl_outcome(input.clone(), &ctx).await {
950 Ok(outcome) => {
951 log_tool_success(
952 &timer,
953 Self::NAME,
954 &input,
955 &outcome.output,
956 outcome.log_meta,
957 true,
958 );
959 Ok(outcome.output)
960 }
961 Err(error) => {
962 log_tool_error(&timer, Self::NAME, &input, &error);
963 Err(error)
964 }
965 }
966 })
967 }
968}
969
/// Tool implementing `list_sessions`: lists OpenCode sessions with their status.
#[derive(Clone)]
pub struct ListSessionsTool {
    /// Shared cell holding the orchestrator server; initialized on first use.
    server: Arc<OnceCell<OrchestratorServer>>,
}
979
impl ListSessionsTool {
    /// Creates the tool around the shared, lazily initialized server cell.
    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
        Self { server }
    }
}
986
987impl Tool for ListSessionsTool {
988 type Input = ListSessionsInput;
989 type Output = ListSessionsOutput;
990 const NAME: &'static str = "list_sessions";
991 const DESCRIPTION: &'static str =
992 "List available OpenCode sessions in the current directory context.";
993
994 fn call(
995 &self,
996 input: Self::Input,
997 _ctx: &ToolContext,
998 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
999 let server_cell = Arc::clone(&self.server);
1000 Box::pin(async move {
1001 let timer = CallTimer::start();
1002 let result: Result<ListSessionsOutput, ToolError> = async {
1003 let server = server_cell
1004 .get_or_try_init(OrchestratorServer::start_lazy)
1005 .await
1006 .map_err(|e| ToolError::Internal(e.to_string()))?;
1007
1008 let sessions =
1009 server.client().sessions().list().await.map_err(|e| {
1010 ToolError::Internal(format!("Failed to list sessions: {e}"))
1011 })?;
1012 let status_map = server.client().sessions().status_map().await.ok();
1013 let spawned = server.spawned_sessions().read().await;
1014
1015 let limit = input.limit.unwrap_or(20);
1016 let summaries: Vec<SessionSummary> = sessions
1017 .into_iter()
1018 .take(limit)
1019 .map(|s| {
1020 let status =
1021 status_map
1022 .as_ref()
1023 .map(|status_map| match status_map.get(&s.id) {
1024 Some(SessionStatusInfo::Busy) => SessionStatusSummary::Busy,
1025 Some(SessionStatusInfo::Retry {
1026 attempt,
1027 message,
1028 next,
1029 }) => SessionStatusSummary::Retry {
1030 attempt: *attempt,
1031 message: message.clone(),
1032 next: *next,
1033 },
1034 Some(SessionStatusInfo::Idle) | None => {
1035 SessionStatusSummary::Idle
1036 }
1037 });
1038
1039 let change_stats = s.summary.as_ref().map(|summary| ChangeStats {
1040 additions: summary.additions,
1041 deletions: summary.deletions,
1042 files: summary.files,
1043 });
1044
1045 SessionSummary {
1046 launched_by_you: spawned.contains(&s.id),
1047 created: s.time.as_ref().map(|t| t.created),
1048 updated: s.time.as_ref().map(|t| t.updated),
1049 directory: s.directory,
1050 title: s.title,
1051 id: s.id,
1052 status,
1053 change_stats,
1054 }
1055 })
1056 .collect();
1057
1058 Ok(ListSessionsOutput {
1059 sessions: summaries,
1060 })
1061 }
1062 .await;
1063
1064 match result {
1065 Ok(output) => {
1066 log_tool_success(
1067 &timer,
1068 Self::NAME,
1069 &input,
1070 &output,
1071 ToolLogMeta::default(),
1072 false,
1073 );
1074 Ok(output)
1075 }
1076 Err(error) => {
1077 log_tool_error(&timer, Self::NAME, &input, &error);
1078 Err(error)
1079 }
1080 }
1081 })
1082 }
1083}
1084
1085fn count_pending_messages(messages: &[Message]) -> usize {
1086 let mut pending = 0;
1087
1088 for message in messages.iter().rev() {
1089 if message.role() == "user" {
1090 pending += 1;
1091 } else if message.role() == "assistant" {
1092 break;
1093 }
1094 }
1095
1096 pending
1097}
1098
1099fn get_last_activity_time(messages: &[Message], fallback: Option<i64>) -> Option<i64> {
1100 messages.last().map_or(fallback, |message| {
1101 Some(
1102 message
1103 .info
1104 .time
1105 .completed
1106 .unwrap_or(message.info.time.created),
1107 )
1108 })
1109}
1110
1111fn extract_recent_tool_calls(messages: &[Message], limit: usize) -> Vec<ToolCallSummary> {
1112 let mut tool_calls = Vec::new();
1113
1114 for message in messages.iter().rev() {
1115 for part in message.parts.iter().rev() {
1116 if let Part::Tool {
1117 call_id,
1118 tool,
1119 state,
1120 ..
1121 } = part
1122 {
1123 let (state, started_at, completed_at) = match state {
1124 Some(ToolState::Running(running)) => {
1125 (ToolStateSummary::Running, Some(running.time.start), None)
1126 }
1127 Some(ToolState::Completed(completed)) => (
1128 ToolStateSummary::Completed,
1129 Some(completed.time.start),
1130 Some(completed.time.end),
1131 ),
1132 Some(ToolState::Error(error)) => (
1133 ToolStateSummary::Error {
1134 message: error.error.clone(),
1135 },
1136 Some(error.time.start),
1137 Some(error.time.end),
1138 ),
1139 _ => (ToolStateSummary::Pending, None, None),
1140 };
1141
1142 tool_calls.push(ToolCallSummary {
1143 call_id: call_id.clone(),
1144 tool_name: tool.clone(),
1145 state,
1146 started_at,
1147 completed_at,
1148 });
1149
1150 if tool_calls.len() >= limit {
1151 return tool_calls;
1152 }
1153 }
1154 }
1155 }
1156
1157 tool_calls
1158}
1159
/// Tool implementing `get_session_state`: reports status, pending messages, and recent
/// tool calls for one session.
#[derive(Clone)]
pub struct GetSessionStateTool {
    /// Shared cell holding the orchestrator server; initialized on first use.
    server: Arc<OnceCell<OrchestratorServer>>,
}
1165
impl GetSessionStateTool {
    /// Creates the tool around the shared, lazily initialized server cell.
    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
        Self { server }
    }
}
1171
1172impl Tool for GetSessionStateTool {
1173 type Input = GetSessionStateInput;
1174 type Output = GetSessionStateOutput;
1175 const NAME: &'static str = "get_session_state";
1176 const DESCRIPTION: &'static str = "Get detailed state of a specific session including status, pending messages, and recent tool calls.";
1177
1178 fn call(
1179 &self,
1180 input: Self::Input,
1181 _ctx: &ToolContext,
1182 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1183 let server_cell = Arc::clone(&self.server);
1184 Box::pin(async move {
1185 let timer = CallTimer::start();
1186 let result: Result<GetSessionStateOutput, ToolError> = async {
1187 let server = server_cell
1188 .get_or_try_init(OrchestratorServer::start_lazy)
1189 .await
1190 .map_err(|e| ToolError::Internal(e.to_string()))?;
1191
1192 let client = server.client();
1193 let session_id = &input.session_id;
1194
1195 let session = client.sessions().get(session_id).await.map_err(|e| {
1196 if e.is_not_found() {
1197 ToolError::InvalidInput(format!(
1198 "Session '{session_id}' not found. Use list_sessions to discover available sessions."
1199 ))
1200 } else {
1201 ToolError::Internal(format!("Failed to get session: {e}"))
1202 }
1203 })?;
1204
1205 let status = match client.sessions().status_for(session_id).await.map_err(|e| {
1206 ToolError::Internal(format!("Failed to get session status: {e}"))
1207 })? {
1208 SessionStatusInfo::Busy => SessionStatusSummary::Busy,
1209 SessionStatusInfo::Retry {
1210 attempt,
1211 message,
1212 next,
1213 } => SessionStatusSummary::Retry {
1214 attempt,
1215 message,
1216 next,
1217 },
1218 SessionStatusInfo::Idle => SessionStatusSummary::Idle,
1219 };
1220
1221 let messages = client.messages().list(session_id).await.map_err(|e| {
1222 ToolError::Internal(format!("Failed to list messages: {e}"))
1223 })?;
1224 let pending_message_count = count_pending_messages(&messages);
1225 let last_activity = get_last_activity_time(
1226 &messages,
1227 session.time.as_ref().map(|time| time.updated),
1228 );
1229 let recent_tool_calls = extract_recent_tool_calls(&messages, 10);
1230
1231 let spawned = server.spawned_sessions().read().await;
1232 let launched_by_you = spawned.contains(session_id);
1233
1234 Ok(GetSessionStateOutput {
1235 session_id: session.id,
1236 title: session.title,
1237 directory: session.directory,
1238 status,
1239 launched_by_you,
1240 pending_message_count,
1241 last_activity,
1242 recent_tool_calls,
1243 })
1244 }
1245 .await;
1246
1247 match result {
1248 Ok(output) => {
1249 log_tool_success(
1250 &timer,
1251 Self::NAME,
1252 &input,
1253 &output,
1254 ToolLogMeta::default(),
1255 false,
1256 );
1257 Ok(output)
1258 }
1259 Err(error) => {
1260 log_tool_error(&timer, Self::NAME, &input, &error);
1261 Err(error)
1262 }
1263 }
1264 })
1265 }
1266}
1267
1268#[derive(Clone)]
1274pub struct ListCommandsTool {
1275 server: Arc<OnceCell<OrchestratorServer>>,
1276}
1277
1278impl ListCommandsTool {
1279 pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
1281 Self { server }
1282 }
1283}
1284
1285impl Tool for ListCommandsTool {
1286 type Input = ListCommandsInput;
1287 type Output = ListCommandsOutput;
1288 const NAME: &'static str = "list_commands";
1289 const DESCRIPTION: &'static str = "List available OpenCode commands that can be used with run.";
1290
1291 fn call(
1292 &self,
1293 input: Self::Input,
1294 _ctx: &ToolContext,
1295 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1296 let server_cell = Arc::clone(&self.server);
1297 Box::pin(async move {
1298 let timer = CallTimer::start();
1299 let result: Result<ListCommandsOutput, ToolError> = async {
1300 let server = server_cell
1301 .get_or_try_init(OrchestratorServer::start_lazy)
1302 .await
1303 .map_err(|e| ToolError::Internal(e.to_string()))?;
1304
1305 let commands =
1306 server.client().tools().commands().await.map_err(|e| {
1307 ToolError::Internal(format!("Failed to list commands: {e}"))
1308 })?;
1309
1310 let command_infos: Vec<CommandInfo> = commands
1311 .into_iter()
1312 .map(|c| CommandInfo {
1313 name: c.name,
1314 description: c.description,
1315 })
1316 .collect();
1317
1318 Ok(ListCommandsOutput {
1319 commands: command_infos,
1320 })
1321 }
1322 .await;
1323
1324 match result {
1325 Ok(output) => {
1326 log_tool_success(
1327 &timer,
1328 Self::NAME,
1329 &input,
1330 &output,
1331 ToolLogMeta::default(),
1332 false,
1333 );
1334 Ok(output)
1335 }
1336 Err(error) => {
1337 log_tool_error(&timer, Self::NAME, &input, &error);
1338 Err(error)
1339 }
1340 }
1341 })
1342 }
1343}
1344
1345#[derive(Clone)]
1354pub struct RespondPermissionTool {
1355 server: Arc<OnceCell<OrchestratorServer>>,
1356}
1357
1358impl RespondPermissionTool {
1359 pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
1361 Self { server }
1362 }
1363}
1364
1365impl Tool for RespondPermissionTool {
1366 type Input = RespondPermissionInput;
1367 type Output = RespondPermissionOutput;
1368 const NAME: &'static str = "respond_permission";
1369 const DESCRIPTION: &'static str = r#"Respond to a permission request from an OpenCode session.
1370
1371After responding, continues monitoring the session and returns when complete or when another permission is required.
1372
1373Parameters:
1374- session_id: Session with pending permission
1375- reply: "once" (allow this request), "always" (allow for matching patterns), or "reject" (deny)
1376- message: Optional message to include with reply"#;
1377
1378 fn call(
1379 &self,
1380 input: Self::Input,
1381 ctx: &ToolContext,
1382 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1383 let server_cell = Arc::clone(&self.server);
1384 let ctx = ctx.clone();
1385 Box::pin(async move {
1386 let timer = CallTimer::start();
1387 let request = input.clone();
1388 let result: Result<(RespondPermissionOutput, ToolLogMeta), ToolError> = async {
1389 let server = server_cell
1390 .get_or_try_init(OrchestratorServer::start_lazy)
1391 .await
1392 .map_err(|e| ToolError::Internal(e.to_string()))?;
1393
1394 let client = server.client();
1395
1396 let mut pending =
1398 client.permissions().list().await.map_err(|e| {
1399 ToolError::Internal(format!("Failed to list permissions: {e}"))
1400 })?;
1401
1402 let perm = if let Some(req_id) = input.permission_request_id.as_deref() {
1403 let idx = pending.iter().position(|p| p.id == req_id).ok_or_else(|| {
1404 ToolError::InvalidInput(format!(
1405 "No pending permission found with id '{req_id}'. \
1406 (session_id='{}')",
1407 input.session_id
1408 ))
1409 })?;
1410
1411 let perm = pending.remove(idx);
1412
1413 if perm.session_id != input.session_id {
1414 return Err(ToolError::InvalidInput(format!(
1415 "Permission request '{req_id}' belongs to session '{}', not '{}'.",
1416 perm.session_id, input.session_id
1417 )));
1418 }
1419
1420 perm
1421 } else {
1422 let mut perms: Vec<_> = pending
1423 .into_iter()
1424 .filter(|p| p.session_id == input.session_id)
1425 .collect();
1426
1427 match perms.as_slice() {
1428 [] => {
1429 return Err(ToolError::InvalidInput(format!(
1430 "No pending permission found for session '{}'. \
1431 The permission may have already been responded to.",
1432 input.session_id
1433 )));
1434 }
1435 [_single] => perms.swap_remove(0),
1436 multiple => {
1437 let ids = multiple
1438 .iter()
1439 .map(|p| p.id.as_str())
1440 .collect::<Vec<_>>()
1441 .join(", ");
1442 return Err(ToolError::InvalidInput(format!(
1443 "Multiple pending permissions found for session '{}': {ids}. \
1444 Please retry with permission_request_id (returned by run).",
1445 input.session_id
1446 )));
1447 }
1448 }
1449 };
1450
1451 let is_reject = matches!(input.reply, PermissionReply::Reject);
1453
1454 let permission_type = perm.permission.clone();
1456 let permission_patterns = perm.patterns.clone();
1457
1458 let mut pre_warnings: Vec<String> = Vec::new();
1461 let baseline = if is_reject {
1462 match client.messages().list(&input.session_id).await {
1463 Ok(msgs) => OrchestratorServer::extract_assistant_text(&msgs),
1464 Err(e) => {
1465 pre_warnings.push(format!("Failed to fetch baseline messages: {e}"));
1466 None
1467 }
1468 }
1469 } else {
1470 None
1471 };
1472
1473 let api_reply = match input.reply {
1475 PermissionReply::Once => ApiPermissionReply::Once,
1476 PermissionReply::Always => ApiPermissionReply::Always,
1477 PermissionReply::Reject => ApiPermissionReply::Reject,
1478 };
1479
1480 client
1482 .permissions()
1483 .reply(
1484 &perm.id,
1485 &PermissionReplyRequest {
1486 reply: api_reply,
1487 message: input.message,
1488 },
1489 )
1490 .await
1491 .map_err(|e| {
1492 ToolError::Internal(format!("Failed to reply to permission: {e}"))
1493 })?;
1494
1495 let run_tool = OrchestratorRunTool::new(Arc::clone(&server_cell));
1497 let wait_for_activity = (!is_reject).then_some(true);
1498 let outcome = run_tool
1499 .run_impl_outcome(
1500 OrchestratorRunInput {
1501 session_id: Some(input.session_id),
1502 command: None,
1503 message: None,
1504 wait_for_activity,
1505 },
1506 &ctx,
1507 )
1508 .await?;
1509 let mut out = outcome.output;
1510
1511 out.warnings.extend(pre_warnings);
1513
1514 if is_reject && matches!(out.status, RunStatus::Completed) {
1516 let final_resp = out.response.as_deref();
1517 let baseline_resp = baseline.as_deref();
1518
1519 if final_resp.is_none() || final_resp == baseline_resp {
1521 out.response = None;
1522 let patterns_str = if permission_patterns.is_empty() {
1523 "(none)".to_string()
1524 } else {
1525 permission_patterns.join(", ")
1526 };
1527 out.warnings.push(format!(
1528 "Permission rejected for '{permission_type}'. Patterns: {patterns_str}. \
1529 Session stopped without generating a new assistant response."
1530 ));
1531 tracing::debug!(
1532 permission_type = %permission_type,
1533 "rejection override applied: response set to None"
1534 );
1535 }
1536 }
1537
1538 Ok((out, outcome.log_meta))
1539 }
1540 .await;
1541
1542 match result {
1543 Ok((output, log_meta)) => {
1544 log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1545 Ok(output)
1546 }
1547 Err(error) => {
1548 log_tool_error(&timer, Self::NAME, &request, &error);
1549 Err(error)
1550 }
1551 }
1552 })
1553 }
1554}
1555
1556#[derive(Clone)]
1561pub struct RespondQuestionTool {
1562 server: Arc<OnceCell<OrchestratorServer>>,
1563}
1564
1565impl RespondQuestionTool {
1566 pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
1567 Self { server }
1568 }
1569}
1570
1571impl Tool for RespondQuestionTool {
1572 type Input = RespondQuestionInput;
1573 type Output = RespondQuestionOutput;
1574 const NAME: &'static str = "respond_question";
1575 const DESCRIPTION: &'static str = r#"Respond to a question request from an OpenCode session.
1576
1577After replying, continues monitoring the session and returns when complete or when another interruption is required.
1578
1579Parameters:
1580- session_id: Session with pending question
1581- action: "reply" or "reject"
1582- answers: Required when action=reply; one list per question"#;
1583
1584 fn call(
1585 &self,
1586 input: Self::Input,
1587 ctx: &ToolContext,
1588 ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1589 let server_cell = Arc::clone(&self.server);
1590 let ctx = ctx.clone();
1591 Box::pin(async move {
1592 let timer = CallTimer::start();
1593 let request = input.clone();
1594 let result: Result<(RespondQuestionOutput, ToolLogMeta), ToolError> = async {
1595 let server = server_cell
1596 .get_or_try_init(OrchestratorServer::start_lazy)
1597 .await
1598 .map_err(|e| ToolError::Internal(e.to_string()))?;
1599
1600 let client = server.client();
1601 let mut pending = client
1602 .question()
1603 .list()
1604 .await
1605 .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;
1606
1607 let question = if let Some(req_id) = input.question_request_id.as_deref() {
1608 let idx = pending
1609 .iter()
1610 .position(|question| question.id == req_id)
1611 .ok_or_else(|| {
1612 ToolError::InvalidInput(format!(
1613 "No pending question found with id '{req_id}'. (session_id='{}')",
1614 input.session_id
1615 ))
1616 })?;
1617
1618 let question = pending.remove(idx);
1619 if question.session_id != input.session_id {
1620 return Err(ToolError::InvalidInput(format!(
1621 "Question request '{req_id}' belongs to session '{}', not '{}'.",
1622 question.session_id, input.session_id
1623 )));
1624 }
1625
1626 question
1627 } else {
1628 let mut questions: Vec<_> = pending
1629 .into_iter()
1630 .filter(|question| question.session_id == input.session_id)
1631 .collect();
1632
1633 match questions.as_slice() {
1634 [] => {
1635 return Err(ToolError::InvalidInput(format!(
1636 "No pending question found for session '{}'. The question may have already been responded to.",
1637 input.session_id
1638 )));
1639 }
1640 [_single] => questions.swap_remove(0),
1641 multiple => {
1642 let ids = multiple
1643 .iter()
1644 .map(|question| question.id.as_str())
1645 .collect::<Vec<_>>()
1646 .join(", ");
1647 return Err(ToolError::InvalidInput(format!(
1648 "Multiple pending questions found for session '{}': {ids}. Please retry with question_request_id (returned by run).",
1649 input.session_id
1650 )));
1651 }
1652 }
1653 };
1654
1655 match input.action {
1656 QuestionAction::Reply => {
1657 if input.answers.is_empty() {
1658 return Err(ToolError::InvalidInput(
1659 "answers is required when action=reply".into(),
1660 ));
1661 }
1662
1663 client
1664 .question()
1665 .reply(
1666 &question.id,
1667 &QuestionReply {
1668 answers: input.answers,
1669 },
1670 )
1671 .await
1672 .map_err(|e| {
1673 ToolError::Internal(format!("Failed to reply to question: {e}"))
1674 })?;
1675
1676 let outcome = OrchestratorRunTool::new(Arc::clone(&server_cell))
1677 .run_impl_outcome(OrchestratorRunInput {
1678 session_id: Some(input.session_id),
1679 command: None,
1680 message: None,
1681 wait_for_activity: Some(true),
1682 }, &ctx)
1683 .await?;
1684 Ok((outcome.output, outcome.log_meta))
1685 }
1686 QuestionAction::Reject => {
1687 client.question().reject(&question.id).await.map_err(|e| {
1688 ToolError::Internal(format!("Failed to reject question: {e}"))
1689 })?;
1690
1691 let outcome = OrchestratorRunTool::new(Arc::clone(&server_cell))
1692 .run_impl_outcome(OrchestratorRunInput {
1693 session_id: Some(input.session_id),
1694 command: None,
1695 message: None,
1696 wait_for_activity: None,
1697 }, &ctx)
1698 .await?;
1699 Ok((outcome.output, outcome.log_meta))
1700 }
1701 }
1702 }
1703 .await;
1704
1705 match result {
1706 Ok((output, log_meta)) => {
1707 log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1708 Ok(output)
1709 }
1710 Err(error) => {
1711 log_tool_error(&timer, Self::NAME, &request, &error);
1712 Err(error)
1713 }
1714 }
1715 })
1716 }
1717}
1718
1719pub fn build_registry(server: &Arc<OnceCell<OrchestratorServer>>) -> ToolRegistry {
1727 ToolRegistry::builder()
1728 .register::<OrchestratorRunTool, ()>(OrchestratorRunTool::new(Arc::clone(server)))
1729 .register::<ListSessionsTool, ()>(ListSessionsTool::new(Arc::clone(server)))
1730 .register::<GetSessionStateTool, ()>(GetSessionStateTool::new(Arc::clone(server)))
1731 .register::<ListCommandsTool, ()>(ListCommandsTool::new(Arc::clone(server)))
1732 .register::<RespondPermissionTool, ()>(RespondPermissionTool::new(Arc::clone(server)))
1733 .register::<RespondQuestionTool, ()>(RespondQuestionTool::new(Arc::clone(server)))
1734 .finish()
1735}
1736
#[cfg(test)]
mod tests {
    use super::*;
    use agentic_tools_core::Tool;

    /// Pins the public name of every tool so a rename is caught here.
    #[test]
    fn tool_names_are_short() {
        let cases = [
            (<OrchestratorRunTool as Tool>::NAME, "run"),
            (<ListSessionsTool as Tool>::NAME, "list_sessions"),
            (<GetSessionStateTool as Tool>::NAME, "get_session_state"),
            (<ListCommandsTool as Tool>::NAME, "list_commands"),
            (<RespondPermissionTool as Tool>::NAME, "respond_permission"),
            (<RespondQuestionTool as Tool>::NAME, "respond_question"),
        ];

        for (actual, expected) in cases {
            assert_eq!(actual, expected);
        }
    }

    /// With no messages, the supplied session timestamp is the answer.
    #[test]
    fn last_activity_falls_back_to_session_timestamp_when_messages_are_empty() {
        let session_updated = Some(1_234);
        let last_activity = get_last_activity_time(&[], session_updated);
        assert_eq!(last_activity, session_updated);
    }
}