Skip to main content

opencode_orchestrator_mcp/
tools.rs

1//! Tool implementations for orchestrator MCP server.
2
3use crate::config;
4use crate::logging;
5use crate::server::OrchestratorServer;
6use crate::token_tracker::TokenTracker;
7use crate::types::ChangeStats;
8use crate::types::CommandInfo;
9use crate::types::GetSessionStateInput;
10use crate::types::GetSessionStateOutput;
11use crate::types::ListCommandsInput;
12use crate::types::ListCommandsOutput;
13use crate::types::ListSessionsInput;
14use crate::types::ListSessionsOutput;
15use crate::types::OrchestratorRunInput;
16use crate::types::OrchestratorRunOutput;
17use crate::types::PermissionReply;
18use crate::types::QuestionAction;
19use crate::types::QuestionInfoView;
20use crate::types::QuestionOptionView;
21use crate::types::RespondPermissionInput;
22use crate::types::RespondPermissionOutput;
23use crate::types::RespondQuestionInput;
24use crate::types::RespondQuestionOutput;
25use crate::types::RunStatus;
26use crate::types::SessionStatusSummary;
27use crate::types::SessionSummary;
28use crate::types::ToolCallSummary;
29use crate::types::ToolStateSummary;
30use agentic_logging::CallTimer;
31use agentic_logging::ToolCallRecord;
32use agentic_tools_core::Tool;
33use agentic_tools_core::ToolContext;
34use agentic_tools_core::ToolError;
35use agentic_tools_core::ToolRegistry;
36use agentic_tools_core::fmt::TextFormat;
37use agentic_tools_core::fmt::TextOptions;
38use futures::future::BoxFuture;
39use opencode_rs::types::event::Event;
40use opencode_rs::types::message::CommandRequest;
41use opencode_rs::types::message::Message;
42use opencode_rs::types::message::Part;
43use opencode_rs::types::message::PromptPart;
44use opencode_rs::types::message::PromptRequest;
45use opencode_rs::types::message::ToolState;
46use opencode_rs::types::permission::PermissionReply as ApiPermissionReply;
47use opencode_rs::types::permission::PermissionReplyRequest;
48use opencode_rs::types::question::QuestionReply;
49use opencode_rs::types::question::QuestionRequest;
50use opencode_rs::types::session::CreateSessionRequest;
51use opencode_rs::types::session::SessionStatusInfo;
52use opencode_rs::types::session::SummarizeRequest;
53use serde::Serialize;
54use std::sync::Arc;
55use std::time::Duration;
56use tokio::sync::OnceCell;
57use tokio::task::JoinHandle;
58
59const SERVER_NAME: &str = "opencode-orchestrator-mcp";
60
61#[derive(Debug, Clone, Default)]
62struct ToolLogMeta {
63    token_usage: Option<agentic_logging::TokenUsage>,
64    token_usage_saturated: bool,
65}
66
67struct RunOutcome {
68    output: OrchestratorRunOutput,
69    log_meta: ToolLogMeta,
70}
71
72impl RunOutcome {
73    fn without_tokens(output: OrchestratorRunOutput) -> Self {
74        Self {
75            output,
76            log_meta: ToolLogMeta::default(),
77        }
78    }
79
80    fn with_tracker(output: OrchestratorRunOutput, token_tracker: &TokenTracker) -> Self {
81        let (token_usage, token_usage_saturated) = token_tracker.to_log_token_usage();
82        Self {
83            output,
84            log_meta: ToolLogMeta {
85                token_usage,
86                token_usage_saturated,
87            },
88        }
89    }
90}
91
92fn request_json<T: Serialize>(request: &T) -> serde_json::Value {
93    serde_json::to_value(request)
94        .unwrap_or_else(|error| serde_json::json!({"serialization_error": error.to_string()}))
95}
96
97fn log_tool_success<TReq: Serialize, TOut: TextFormat>(
98    timer: &CallTimer,
99    tool: &str,
100    request: &TReq,
101    output: &TOut,
102    log_meta: ToolLogMeta,
103    write_markdown: bool,
104) {
105    let (completed_at, duration_ms) = timer.finish();
106    let rendered = output.fmt_text(&TextOptions::default());
107    let response_file = write_markdown
108        .then(|| logging::write_markdown_best_effort(completed_at, &timer.call_id, &rendered))
109        .flatten();
110
111    let record = ToolCallRecord {
112        call_id: timer.call_id.clone(),
113        server: SERVER_NAME.into(),
114        tool: tool.into(),
115        started_at: timer.started_at,
116        completed_at,
117        duration_ms,
118        request: request_json(request),
119        response_file,
120        success: true,
121        error: None,
122        model: None,
123        token_usage: log_meta.token_usage,
124        summary: log_meta
125            .token_usage_saturated
126            .then(|| serde_json::json!({"token_usage_saturated": true})),
127    };
128
129    logging::append_record_best_effort(&record);
130}
131
132fn log_tool_error<TReq: Serialize>(
133    timer: &CallTimer,
134    tool: &str,
135    request: &TReq,
136    error: &ToolError,
137) {
138    let (completed_at, duration_ms) = timer.finish();
139    let record = ToolCallRecord {
140        call_id: timer.call_id.clone(),
141        server: SERVER_NAME.into(),
142        tool: tool.into(),
143        started_at: timer.started_at,
144        completed_at,
145        duration_ms,
146        request: request_json(request),
147        response_file: None,
148        success: false,
149        error: Some(error.to_string()),
150        model: None,
151        token_usage: None,
152        summary: None,
153    };
154
155    logging::append_record_best_effort(&record);
156}
157
158// ============================================================================
159// run
160// ============================================================================
161
162/// Tool for starting or resuming `OpenCode` sessions.
163///
164/// Handles session creation, prompt/command execution, SSE event monitoring,
165/// and permission request detection. Returns when the session completes or
166/// when a permission is requested.
167#[derive(Clone)]
168pub struct OrchestratorRunTool {
169    server: Arc<OnceCell<OrchestratorServer>>,
170}
171
172impl OrchestratorRunTool {
173    /// Create a new `OrchestratorRunTool` with the given server cell.
174    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
175        Self { server }
176    }
177
178    /// Finalize a completed session by fetching messages and optionally triggering summarization.
179    ///
180    /// This is called when we detect the session is idle, either via SSE `SessionIdle` event
181    /// or via polling `sessions().status()`.
182    ///
183    /// Uses bounded retry with backoff (0/50/100/200/400ms) if assistant text is not immediately
184    /// available, handling the race condition where the session becomes idle before messages
185    /// are fully persisted.
186    async fn finalize_completed(
187        client: &opencode_rs::Client,
188        session_id: String,
189        token_tracker: &TokenTracker,
190        mut warnings: Vec<String>,
191    ) -> Result<OrchestratorRunOutput, ToolError> {
192        // Bounded backoff delays for message extraction retry (~750ms total budget)
193        const BACKOFFS_MS: &[u64] = &[0, 50, 100, 200, 400];
194
195        let mut response: Option<String> = None;
196
197        for (attempt, &delay_ms) in BACKOFFS_MS.iter().enumerate() {
198            if delay_ms > 0 {
199                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
200            }
201
202            let messages = client
203                .messages()
204                .list(&session_id)
205                .await
206                .map_err(|e| ToolError::Internal(format!("Failed to list messages: {e}")))?;
207
208            response = OrchestratorServer::extract_assistant_text(&messages);
209
210            if response.is_some() {
211                if attempt > 0 {
212                    tracing::debug!(
213                        session_id = %session_id,
214                        attempt,
215                        "assistant response became available after retry"
216                    );
217                }
218                break;
219            }
220        }
221
222        if response.is_none() {
223            tracing::debug!(
224                session_id = %session_id,
225                "no assistant response found after bounded retry"
226            );
227        }
228
229        // Handle context limit summarization if needed
230        if token_tracker.compaction_needed
231            && let (Some(pid), Some(mid)) = (&token_tracker.provider_id, &token_tracker.model_id)
232        {
233            let summarize_req = SummarizeRequest {
234                provider_id: pid.clone(),
235                model_id: mid.clone(),
236                auto: None,
237            };
238
239            match client
240                .sessions()
241                .summarize(&session_id, &summarize_req)
242                .await
243            {
244                Ok(_) => {
245                    tracing::info!(session_id = %session_id, "context summarization triggered");
246                    warnings.push("Context limit reached; summarization triggered".into());
247                }
248                Err(e) => {
249                    tracing::warn!(session_id = %session_id, error = %e, "summarization failed");
250                    warnings.push(format!("Summarization failed: {e}"));
251                }
252            }
253        }
254
255        Ok(OrchestratorRunOutput {
256            session_id,
257            status: RunStatus::Completed,
258            response,
259            partial_response: None,
260            permission_request_id: None,
261            permission_type: None,
262            permission_patterns: vec![],
263            question_request_id: None,
264            questions: vec![],
265            warnings,
266        })
267    }
268
269    fn map_questions(req: &QuestionRequest) -> Vec<QuestionInfoView> {
270        req.questions
271            .iter()
272            .map(|question| QuestionInfoView {
273                question: question.question.clone(),
274                header: question.header.clone(),
275                options: question
276                    .options
277                    .iter()
278                    .map(|option| QuestionOptionView {
279                        label: option.label.clone(),
280                        description: option.description.clone(),
281                    })
282                    .collect(),
283                multiple: question.multiple,
284                custom: question.custom,
285            })
286            .collect()
287    }
288
289    fn question_required_output(
290        session_id: String,
291        partial_response: Option<String>,
292        request: &QuestionRequest,
293        warnings: Vec<String>,
294    ) -> OrchestratorRunOutput {
295        OrchestratorRunOutput {
296            session_id,
297            status: RunStatus::QuestionRequired,
298            response: None,
299            partial_response,
300            permission_request_id: None,
301            permission_type: None,
302            permission_patterns: vec![],
303            question_request_id: Some(request.id.clone()),
304            questions: Self::map_questions(request),
305            warnings,
306        }
307    }
308
309    async fn run_impl_outcome(&self, input: OrchestratorRunInput) -> Result<RunOutcome, ToolError> {
310        // Input validation
311        if input.session_id.is_none() && input.message.is_none() && input.command.is_none() {
312            return Err(ToolError::InvalidInput(
313                "Either session_id (to resume/check status) or message/command (to start work) is required"
314                    .into(),
315            ));
316        }
317
318        if input.command.is_some() && input.message.is_none() {
319            return Err(ToolError::InvalidInput(
320                "message is required when command is specified (becomes $ARGUMENTS for template expansion)"
321                    .into(),
322            ));
323        }
324
325        // Trim and validate message content
326        let message = input.message.map(|m| m.trim().to_string());
327        if let Some(ref m) = message
328            && m.is_empty()
329        {
330            return Err(ToolError::InvalidInput(
331                "message cannot be empty or whitespace-only".into(),
332            ));
333        }
334
335        let wait_for_activity = input.wait_for_activity.unwrap_or(false);
336
337        // Lazy initialization: spawn server on first tool call
338        let server = self
339            .server
340            .get_or_try_init(OrchestratorServer::start_lazy)
341            .await
342            .map_err(|e| ToolError::Internal(e.to_string()))?;
343
344        let client = server.client();
345
346        tracing::debug!(
347            command = ?input.command,
348            has_message = message.is_some(),
349            message_len = message.as_ref().map(String::len),
350            session_id = ?input.session_id,
351            "run: starting"
352        );
353
354        // 1. Resolve session: validate existing or create new
355        let session_id = if let Some(sid) = input.session_id {
356            // Validate session exists
357            client.sessions().get(&sid).await.map_err(|e| {
358                if e.is_not_found() {
359                    ToolError::InvalidInput(format!(
360                        "Session '{sid}' not found. Use list_sessions to discover sessions, \
361                         or omit session_id to create a new session."
362                    ))
363                } else {
364                    ToolError::Internal(format!("Failed to get session: {e}"))
365                }
366            })?;
367            sid
368        } else {
369            // Create new session
370            let session = client
371                .sessions()
372                .create(&CreateSessionRequest::default())
373                .await
374                .map_err(|e| ToolError::Internal(format!("Failed to create session: {e}")))?;
375
376            {
377                let mut spawned = server.spawned_sessions().write().await;
378                spawned.insert(session.id.clone());
379            }
380
381            session.id
382        };
383
384        tracing::info!(session_id = %session_id, "run: session resolved");
385
386        // 2. Check if session is already idle (for resume-only case)
387        let status = client
388            .sessions()
389            .status_for(&session_id)
390            .await
391            .map_err(|e| ToolError::Internal(format!("Failed to get session status: {e}")))?;
392
393        let is_idle = matches!(status, SessionStatusInfo::Idle);
394
395        // 3. Check for pending permissions before doing anything else
396        let pending_permissions = client
397            .permissions()
398            .list()
399            .await
400            .map_err(|e| ToolError::Internal(format!("Failed to list permissions: {e}")))?;
401
402        let my_permission = pending_permissions
403            .into_iter()
404            .find(|p| p.session_id == session_id);
405
406        if let Some(perm) = my_permission {
407            tracing::info!(
408                session_id = %session_id,
409                permission_type = %perm.permission,
410                "run: pending permission found"
411            );
412            return Ok(RunOutcome::without_tokens(OrchestratorRunOutput {
413                session_id,
414                status: RunStatus::PermissionRequired,
415                response: None,
416                partial_response: None,
417                permission_request_id: Some(perm.id),
418                permission_type: Some(perm.permission),
419                permission_patterns: perm.patterns,
420                question_request_id: None,
421                questions: vec![],
422                warnings: vec![],
423            }));
424        }
425
426        let pending_questions = client
427            .question()
428            .list()
429            .await
430            .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;
431
432        if let Some(question) = pending_questions
433            .into_iter()
434            .find(|question| question.session_id == session_id)
435        {
436            tracing::info!(session_id = %session_id, question_id = %question.id, "run: pending question found");
437            return Ok(RunOutcome::without_tokens(Self::question_required_output(
438                session_id,
439                None,
440                &question,
441                vec![],
442            )));
443        }
444
445        // 4. If no message/command and session is idle, just return current state
446        // Uses finalize_completed to get retry logic for message extraction
447        if message.is_none() && input.command.is_none() && is_idle && !wait_for_activity {
448            let token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
449            let output =
450                Self::finalize_completed(client, session_id, &token_tracker, vec![]).await?;
451            return Ok(RunOutcome::with_tracker(output, &token_tracker));
452        }
453
454        // 5. Subscribe to SSE BEFORE sending prompt/command
455        let mut subscription = client
456            .subscribe_session(&session_id)
457            .map_err(|e| ToolError::Internal(format!("Failed to subscribe to session: {e}")))?;
458
459        // Track whether this call is dispatching new work (command or message)
460        // vs just resuming/monitoring an existing session.
461        let dispatched_new_work = input.command.is_some() || message.is_some() || wait_for_activity;
462        let idle_grace = config::idle_grace();
463        let mut idle_grace_deadline: Option<tokio::time::Instant> = None;
464        let mut awaiting_idle_grace_check = false;
465
466        if wait_for_activity && input.command.is_none() && message.is_none() {
467            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
468        }
469
470        // 6. Kick off the work
471        let mut command_task: Option<JoinHandle<Result<(), String>>> = None;
472        let mut command_name_for_logging: Option<String> = None;
473
474        if let Some(command) = &input.command {
475            command_name_for_logging = Some(command.clone());
476
477            let cmd_client = client.clone();
478            let cmd_session_id = session_id.clone();
479            let cmd_name = command.clone();
480            let cmd_arguments = message.clone().unwrap_or_default();
481
482            command_task = Some(tokio::spawn(async move {
483                let req = CommandRequest {
484                    command: cmd_name,
485                    arguments: cmd_arguments,
486                    message_id: None,
487                };
488
489                cmd_client
490                    .messages()
491                    .command(&cmd_session_id, &req)
492                    .await
493                    .map(|_| ())
494                    .map_err(|e| e.to_string())
495            }));
496        } else if let Some(msg) = &message {
497            // Send prompt asynchronously
498            let req = PromptRequest {
499                parts: vec![PromptPart::Text {
500                    text: msg.clone(),
501                    synthetic: None,
502                    ignored: None,
503                    metadata: None,
504                }],
505                message_id: None,
506                model: None,
507                agent: None,
508                no_reply: None,
509                system: None,
510                variant: None,
511            };
512
513            client
514                .messages()
515                .prompt_async(&session_id, &req)
516                .await
517                .map_err(|e| ToolError::Internal(format!("Failed to send prompt: {e}")))?;
518
519            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
520        }
521
522        // 7. Event loop: wait for completion or permission
523        // Overall timeout to prevent infinite hangs (configurable, default 1 hour)
524        let deadline = tokio::time::Instant::now() + server.session_deadline();
525        let inactivity_timeout = server.inactivity_timeout();
526        let mut last_activity_time = tokio::time::Instant::now();
527
528        tracing::debug!(session_id = %session_id, "run: entering event loop");
529        let mut token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
530        let mut partial_response = String::new();
531        let warnings = Vec::new();
532
533        let mut poll_interval = tokio::time::interval(Duration::from_secs(1));
534        poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
535
536        // Track whether we've observed the session as busy at least once.
537        // This prevents completing immediately if we call run_impl on an already-idle
538        // session before our new work has started processing.
539        let mut observed_busy = false;
540
541        // Track whether SSE is still active. If the stream closes, we fall back
542        // to polling-only mode rather than returning an error.
543        let mut sse_active = true;
544
545        // === Post-subscribe status re-check (latency optimization) ===
546        // If we're just monitoring (no new work dispatched), check if session is already idle.
547        // This handles the race where session completed between our initial status check
548        // and SSE subscription becoming ready.
549        if !dispatched_new_work
550            && let Ok(status) = client.sessions().status_for(&session_id).await
551            && matches!(status, SessionStatusInfo::Idle)
552        {
553            tracing::debug!(
554                session_id = %session_id,
555                "session already idle on post-subscribe check"
556            );
557            let output =
558                Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
559            return Ok(RunOutcome::with_tracker(output, &token_tracker));
560        }
561        // If check fails or session is busy, continue to event loop
562
563        loop {
564            // Check timeout before processing
565            let now = tokio::time::Instant::now();
566
567            if now.duration_since(last_activity_time) >= inactivity_timeout {
568                return Err(ToolError::Internal(format!(
569                    "Session idle timeout: no activity for 5 minutes (session_id={session_id}). \
570                     The session may still be running; use run(session_id=...) to check status."
571                )));
572            }
573
574            if now >= deadline {
575                return Err(ToolError::Internal(
576                    "Session execution timed out after 1 hour. \
577                     The session may still be running; use run with the session_id to check status."
578                        .into(),
579                ));
580            }
581
582            let command_task_active = command_task.is_some();
583
584            tokio::select! {
585                maybe_event = subscription.recv(), if sse_active => {
586                    let Some(event) = maybe_event else {
587                        // SSE stream closed - this can happen due to network issues,
588                        // server restarts, or connection timeouts. Fall back to polling
589                        // rather than failing immediately.
590                        tracing::warn!(
591                            session_id = %session_id,
592                            "SSE stream closed unexpectedly; falling back to polling-only mode"
593                        );
594                        sse_active = false;
595                        continue; // The poll_interval branch will now drive completion detection
596                    };
597
598                    // Track tokens (server is already initialized at this point)
599                    token_tracker.observe_event(&event, |pid, mid| {
600                        server.context_limit(pid, mid)
601                    });
602
603                    match event {
604                        Event::PermissionAsked { properties } => {
605                            tracing::info!(
606                                session_id = %session_id,
607                                permission_type = %properties.request.permission,
608                                "run: permission requested"
609                            );
610                            return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
611                                session_id,
612                                status: RunStatus::PermissionRequired,
613                                response: None,
614                                partial_response: if partial_response.is_empty() {
615                                    None
616                                } else {
617                                    Some(partial_response)
618                                },
619                                permission_request_id: Some(properties.request.id),
620                                permission_type: Some(properties.request.permission),
621                                permission_patterns: properties.request.patterns,
622                                question_request_id: None,
623                                questions: vec![],
624                                warnings,
625                            }, &token_tracker));
626                        }
627
628                        Event::QuestionAsked { properties } => {
629                            return Ok(RunOutcome::with_tracker(Self::question_required_output(
630                                session_id,
631                                if partial_response.is_empty() {
632                                    None
633                                } else {
634                                    Some(partial_response)
635                                },
636                                &properties.request,
637                                warnings,
638                            ), &token_tracker));
639                        }
640
641                        Event::MessagePartDelta { properties } => {
642                            last_activity_time = tokio::time::Instant::now();
643                            // Message streaming means session is actively processing
644                            observed_busy = true;
645                            awaiting_idle_grace_check = false;
646                            // Collect streaming text from field-level delta events.
647                            if let Some(delta) = &properties.delta {
648                                partial_response.push_str(delta);
649                            }
650                        }
651
652                        Event::MessagePartUpdated { .. } | Event::MessageUpdated { .. } => {
653                            last_activity_time = tokio::time::Instant::now();
654                            observed_busy = true;
655                            awaiting_idle_grace_check = false;
656                        }
657
658                        Event::SessionError { properties } => {
659                            let error_msg = properties
660                                .error
661                                .map_or_else(|| "Unknown error".to_string(), |e| format!("{e:?}"));
662                            tracing::error!(
663                                session_id = %session_id,
664                                error = %error_msg,
665                                "run: session error"
666                            );
667                            return Err(ToolError::Internal(format!("Session error: {error_msg}")));
668                        }
669
670                        Event::SessionIdle { .. } => {
671                            tracing::debug!(session_id = %session_id, "received SessionIdle event");
672                            let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
673                            return Ok(RunOutcome::with_tracker(output, &token_tracker));
674                        }
675
676                        _ => {
677                            // Other events - continue
678                        }
679                    }
680                }
681
682                _ = poll_interval.tick() => {
683                    // === 1. Permission fallback (check first, permissions take priority) ===
684                    let pending = match client.permissions().list().await {
685                        Ok(p) => p,
686                        Err(e) => {
687                            // Log but continue - permission list failure shouldn't block completion detection
688                            tracing::warn!(
689                                session_id = %session_id,
690                                error = %e,
691                                "failed to list permissions during poll fallback"
692                            );
693                            vec![]
694                        }
695                    };
696
697                    if let Some(perm) = pending.into_iter().find(|p| p.session_id == session_id) {
698                        tracing::debug!(
699                            session_id = %session_id,
700                            permission_id = %perm.id,
701                            "detected pending permission via polling fallback"
702                        );
703                        return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
704                            session_id,
705                            status: RunStatus::PermissionRequired,
706                            response: None,
707                            partial_response: if partial_response.is_empty() {
708                                None
709                            } else {
710                                Some(partial_response)
711                                },
712                                permission_request_id: Some(perm.id),
713                                permission_type: Some(perm.permission),
714                            permission_patterns: perm.patterns,
715                            question_request_id: None,
716                            questions: vec![],
717                            warnings,
718                        }, &token_tracker));
719                    }
720
721                    let pending_questions = match client.question().list().await {
722                        Ok(questions) => questions,
723                        Err(e) => {
724                            tracing::warn!(
725                                session_id = %session_id,
726                                error = %e,
727                                "failed to list questions during poll fallback"
728                            );
729                            vec![]
730                        }
731                    };
732
733                    if let Some(question) = pending_questions
734                        .into_iter()
735                        .find(|question| question.session_id == session_id)
736                    {
737                        tracing::debug!(
738                            session_id = %session_id,
739                            question_id = %question.id,
740                            "detected pending question via polling fallback"
741                        );
742                        return Ok(RunOutcome::with_tracker(Self::question_required_output(
743                            session_id,
744                            if partial_response.is_empty() {
745                                None
746                            } else {
747                                Some(partial_response)
748                            },
749                            &question,
750                            warnings,
751                        ), &token_tracker));
752                    }
753
754                    // === 2. Session idle detection fallback (NEW) ===
755                    // This is the key fix for race conditions. If SSE missed SessionIdle,
756                    // we detect completion via polling sessions().status_for(session_id).
757                    match client.sessions().status_for(&session_id).await {
758                        Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
759                            last_activity_time = tokio::time::Instant::now();
760                            observed_busy = true;
761                            awaiting_idle_grace_check = false;
762                            tracing::trace!(
763                                session_id = %session_id,
764                                "our session is busy/retry, waiting"
765                            );
766                        }
767                        Ok(SessionStatusInfo::Idle) => {
768                            if !dispatched_new_work || observed_busy {
769                                // Session is idle AND either:
770                                // - We didn't dispatch new work (just monitoring), OR
771                                // - We did dispatch work and have seen it become busy at least once
772                                //
773                                // This guards against completing before our work starts processing.
774                                tracing::debug!(
775                                    session_id = %session_id,
776                                    dispatched_new_work = dispatched_new_work,
777                                    observed_busy = observed_busy,
778                                    "detected session idle via polling fallback"
779                                );
780                                let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
781                                return Ok(RunOutcome::with_tracker(output, &token_tracker));
782                            }
783
784                            let Some(deadline) = idle_grace_deadline else {
785                                tracing::trace!(
786                                    session_id = %session_id,
787                                    command_task_active = command_task_active,
788                                    "idle seen before dispatch confirmed; waiting"
789                                );
790                                continue;
791                            };
792
793                            let now = tokio::time::Instant::now();
794                            if now >= deadline {
795                                tracing::debug!(
796                                    session_id = %session_id,
797                                    idle_grace_ms = idle_grace.as_millis(),
798                                    "accepting idle via bounded idle grace (no busy observed)"
799                                );
800                                let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
801                                return Ok(RunOutcome::with_tracker(output, &token_tracker));
802                            }
803
804                            awaiting_idle_grace_check = true;
805                            tracing::trace!(
806                                session_id = %session_id,
807                                remaining_ms = (deadline - now).as_millis(),
808                                "idle detected before busy; waiting for idle-grace deadline"
809                            );
810                        }
811                        Err(e) => {
812                            // Log but continue - status check failure shouldn't block the loop
813                            tracing::warn!(
814                                session_id = %session_id,
815                                error = %e,
816                                "failed to get session status during poll fallback"
817                            );
818                        }
819                    }
820                }
821
822                () = async {
823                    match idle_grace_deadline {
824                        Some(deadline) => tokio::time::sleep_until(deadline).await,
825                        None => std::future::pending::<()>().await,
826                    }
827                }, if awaiting_idle_grace_check => {
828                    awaiting_idle_grace_check = false;
829
830                    match client.sessions().status_for(&session_id).await {
831                        Ok(SessionStatusInfo::Idle) => {
832                            tracing::debug!(session_id = %session_id, "idle-grace deadline reached; finalizing");
833                            let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
834                            return Ok(RunOutcome::with_tracker(output, &token_tracker));
835                        }
836                        Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
837                            last_activity_time = tokio::time::Instant::now();
838                            observed_busy = true;
839                        }
840                        Err(e) => {
841                            tracing::warn!(
842                                session_id = %session_id,
843                                error = %e,
844                                "status check failed at idle-grace deadline"
845                            );
846                        }
847                    }
848                }
849
850                cmd_result = async {
851                    match command_task.as_mut() {
852                        Some(handle) => Some(handle.await),
853                        None => {
854                            std::future::pending::<
855                                Option<Result<Result<(), String>, tokio::task::JoinError>>,
856                            >()
857                            .await
858                        }
859                    }
860                }, if command_task_active => {
861                    match cmd_result {
862                        Some(Ok(Ok(()))) => {
863                            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
864                            tracing::debug!(
865                                session_id = %session_id,
866                                command = ?command_name_for_logging,
867                                "run: command dispatch completed successfully"
868                            );
869                            command_task = None;
870                        }
871                        Some(Ok(Err(e))) => {
872                            tracing::error!(
873                                session_id = %session_id,
874                                command = ?command_name_for_logging,
875                                error = %e,
876                                "run: command dispatch failed"
877                            );
878                            return Err(ToolError::Internal(format!(
879                                "Failed to execute command '{}': {e}",
880                                command_name_for_logging.as_deref().unwrap_or("unknown")
881                            )));
882                        }
883                        Some(Err(join_err)) => {
884                            tracing::error!(
885                                session_id = %session_id,
886                                command = ?command_name_for_logging,
887                                error = %join_err,
888                                "run: command task panicked"
889                            );
890                            return Err(ToolError::Internal(format!("Command task panicked: {join_err}")));
891                        }
892                        None => {
893                            unreachable!("command_task_active guard should prevent None");
894                        }
895                    }
896                }
897            }
898        }
899    }
900}
901
902impl Tool for OrchestratorRunTool {
903    type Input = OrchestratorRunInput;
904    type Output = OrchestratorRunOutput;
905    const NAME: &'static str = "run";
906    const DESCRIPTION: &'static str = r#"Start or resume an OpenCode session. Optionally run a named command or send a raw prompt.
907
908Returns when:
909- status=completed: Session finished executing. Response contains final assistant output.
910- status=permission_required: Session needs permission approval. Call respond_permission to continue.
911- status=question_required: Session needs question answers. Call respond_question to continue.
912
913Parameters:
914- session_id: Existing session to resume (omit to create new)
915- command: OpenCode command name (e.g., "research", "implement_plan")
916- message: Prompt text or $ARGUMENTS for command template
917
918Examples:
919- New session with prompt: run(message="explain this code")
920- New session with command: run(command="research", message="caching strategies")
921- Resume session: run(session_id="...", message="continue")
922- Check status: run(session_id="...")"#;
923
924    fn call(
925        &self,
926        input: Self::Input,
927        _ctx: &ToolContext,
928    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
929        let this = self.clone();
930        Box::pin(async move {
931            let timer = CallTimer::start();
932            match this.run_impl_outcome(input.clone()).await {
933                Ok(outcome) => {
934                    log_tool_success(
935                        &timer,
936                        Self::NAME,
937                        &input,
938                        &outcome.output,
939                        outcome.log_meta,
940                        true,
941                    );
942                    Ok(outcome.output)
943                }
944                Err(error) => {
945                    log_tool_error(&timer, Self::NAME, &input, &error);
946                    Err(error)
947                }
948            }
949        })
950    }
951}
952
953// ============================================================================
954// list_sessions
955// ============================================================================
956
957/// Tool for listing available `OpenCode` sessions in the current directory.
958#[derive(Clone)]
959pub struct ListSessionsTool {
960    server: Arc<OnceCell<OrchestratorServer>>,
961}
962
963impl ListSessionsTool {
964    /// Create a new `ListSessionsTool` with the given server cell.
965    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
966        Self { server }
967    }
968}
969
970impl Tool for ListSessionsTool {
971    type Input = ListSessionsInput;
972    type Output = ListSessionsOutput;
973    const NAME: &'static str = "list_sessions";
974    const DESCRIPTION: &'static str =
975        "List available OpenCode sessions in the current directory context.";
976
977    fn call(
978        &self,
979        input: Self::Input,
980        _ctx: &ToolContext,
981    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
982        let server_cell = Arc::clone(&self.server);
983        Box::pin(async move {
984            let timer = CallTimer::start();
985            let result: Result<ListSessionsOutput, ToolError> = async {
986                let server = server_cell
987                    .get_or_try_init(OrchestratorServer::start_lazy)
988                    .await
989                    .map_err(|e| ToolError::Internal(e.to_string()))?;
990
991                let sessions =
992                    server.client().sessions().list().await.map_err(|e| {
993                        ToolError::Internal(format!("Failed to list sessions: {e}"))
994                    })?;
995                let status_map = server.client().sessions().status_map().await.ok();
996                let spawned = server.spawned_sessions().read().await;
997
998                let limit = input.limit.unwrap_or(20);
999                let summaries: Vec<SessionSummary> = sessions
1000                    .into_iter()
1001                    .take(limit)
1002                    .map(|s| {
1003                        let status =
1004                            status_map
1005                                .as_ref()
1006                                .map(|status_map| match status_map.get(&s.id) {
1007                                    Some(SessionStatusInfo::Busy) => SessionStatusSummary::Busy,
1008                                    Some(SessionStatusInfo::Retry {
1009                                        attempt,
1010                                        message,
1011                                        next,
1012                                    }) => SessionStatusSummary::Retry {
1013                                        attempt: *attempt,
1014                                        message: message.clone(),
1015                                        next: *next,
1016                                    },
1017                                    Some(SessionStatusInfo::Idle) | None => {
1018                                        SessionStatusSummary::Idle
1019                                    }
1020                                });
1021
1022                        let change_stats = s.summary.as_ref().map(|summary| ChangeStats {
1023                            additions: summary.additions,
1024                            deletions: summary.deletions,
1025                            files: summary.files,
1026                        });
1027
1028                        SessionSummary {
1029                            launched_by_you: spawned.contains(&s.id),
1030                            created: s.time.as_ref().map(|t| t.created),
1031                            updated: s.time.as_ref().map(|t| t.updated),
1032                            directory: s.directory,
1033                            title: s.title,
1034                            id: s.id,
1035                            status,
1036                            change_stats,
1037                        }
1038                    })
1039                    .collect();
1040
1041                Ok(ListSessionsOutput {
1042                    sessions: summaries,
1043                })
1044            }
1045            .await;
1046
1047            match result {
1048                Ok(output) => {
1049                    log_tool_success(
1050                        &timer,
1051                        Self::NAME,
1052                        &input,
1053                        &output,
1054                        ToolLogMeta::default(),
1055                        false,
1056                    );
1057                    Ok(output)
1058                }
1059                Err(error) => {
1060                    log_tool_error(&timer, Self::NAME, &input, &error);
1061                    Err(error)
1062                }
1063            }
1064        })
1065    }
1066}
1067
1068fn count_pending_messages(messages: &[Message]) -> usize {
1069    let mut pending = 0;
1070
1071    for message in messages.iter().rev() {
1072        if message.role() == "user" {
1073            pending += 1;
1074        } else if message.role() == "assistant" {
1075            break;
1076        }
1077    }
1078
1079    pending
1080}
1081
1082fn get_last_activity_time(messages: &[Message], fallback: Option<i64>) -> Option<i64> {
1083    messages.last().map_or(fallback, |message| {
1084        Some(
1085            message
1086                .info
1087                .time
1088                .completed
1089                .unwrap_or(message.info.time.created),
1090        )
1091    })
1092}
1093
1094fn extract_recent_tool_calls(messages: &[Message], limit: usize) -> Vec<ToolCallSummary> {
1095    let mut tool_calls = Vec::new();
1096
1097    for message in messages.iter().rev() {
1098        for part in message.parts.iter().rev() {
1099            if let Part::Tool {
1100                call_id,
1101                tool,
1102                state,
1103                ..
1104            } = part
1105            {
1106                let (state, started_at, completed_at) = match state {
1107                    Some(ToolState::Running(running)) => {
1108                        (ToolStateSummary::Running, Some(running.time.start), None)
1109                    }
1110                    Some(ToolState::Completed(completed)) => (
1111                        ToolStateSummary::Completed,
1112                        Some(completed.time.start),
1113                        Some(completed.time.end),
1114                    ),
1115                    Some(ToolState::Error(error)) => (
1116                        ToolStateSummary::Error {
1117                            message: error.error.clone(),
1118                        },
1119                        Some(error.time.start),
1120                        Some(error.time.end),
1121                    ),
1122                    _ => (ToolStateSummary::Pending, None, None),
1123                };
1124
1125                tool_calls.push(ToolCallSummary {
1126                    call_id: call_id.clone(),
1127                    tool_name: tool.clone(),
1128                    state,
1129                    started_at,
1130                    completed_at,
1131                });
1132
1133                if tool_calls.len() >= limit {
1134                    return tool_calls;
1135                }
1136            }
1137        }
1138    }
1139
1140    tool_calls
1141}
1142
1143/// Tool for getting detailed state of a specific `OpenCode` session.
1144#[derive(Clone)]
1145pub struct GetSessionStateTool {
1146    server: Arc<OnceCell<OrchestratorServer>>,
1147}
1148
1149impl GetSessionStateTool {
1150    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
1151        Self { server }
1152    }
1153}
1154
1155impl Tool for GetSessionStateTool {
1156    type Input = GetSessionStateInput;
1157    type Output = GetSessionStateOutput;
1158    const NAME: &'static str = "get_session_state";
1159    const DESCRIPTION: &'static str = "Get detailed state of a specific session including status, pending messages, and recent tool calls.";
1160
1161    fn call(
1162        &self,
1163        input: Self::Input,
1164        _ctx: &ToolContext,
1165    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1166        let server_cell = Arc::clone(&self.server);
1167        Box::pin(async move {
1168            let timer = CallTimer::start();
1169            let result: Result<GetSessionStateOutput, ToolError> = async {
1170                let server = server_cell
1171                    .get_or_try_init(OrchestratorServer::start_lazy)
1172                    .await
1173                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1174
1175                let client = server.client();
1176                let session_id = &input.session_id;
1177
1178                let session = client.sessions().get(session_id).await.map_err(|e| {
1179                    if e.is_not_found() {
1180                        ToolError::InvalidInput(format!(
1181                            "Session '{session_id}' not found. Use list_sessions to discover available sessions."
1182                        ))
1183                    } else {
1184                        ToolError::Internal(format!("Failed to get session: {e}"))
1185                    }
1186                })?;
1187
1188                let status = match client.sessions().status_for(session_id).await.map_err(|e| {
1189                    ToolError::Internal(format!("Failed to get session status: {e}"))
1190                })? {
1191                    SessionStatusInfo::Busy => SessionStatusSummary::Busy,
1192                    SessionStatusInfo::Retry {
1193                        attempt,
1194                        message,
1195                        next,
1196                    } => SessionStatusSummary::Retry {
1197                        attempt,
1198                        message,
1199                        next,
1200                    },
1201                    SessionStatusInfo::Idle => SessionStatusSummary::Idle,
1202                };
1203
1204                let messages = client.messages().list(session_id).await.map_err(|e| {
1205                    ToolError::Internal(format!("Failed to list messages: {e}"))
1206                })?;
1207                let pending_message_count = count_pending_messages(&messages);
1208                let last_activity = get_last_activity_time(
1209                    &messages,
1210                    session.time.as_ref().map(|time| time.updated),
1211                );
1212                let recent_tool_calls = extract_recent_tool_calls(&messages, 10);
1213
1214                let spawned = server.spawned_sessions().read().await;
1215                let launched_by_you = spawned.contains(session_id);
1216
1217                Ok(GetSessionStateOutput {
1218                    session_id: session.id,
1219                    title: session.title,
1220                    directory: session.directory,
1221                    status,
1222                    launched_by_you,
1223                    pending_message_count,
1224                    last_activity,
1225                    recent_tool_calls,
1226                })
1227            }
1228            .await;
1229
1230            match result {
1231                Ok(output) => {
1232                    log_tool_success(
1233                        &timer,
1234                        Self::NAME,
1235                        &input,
1236                        &output,
1237                        ToolLogMeta::default(),
1238                        false,
1239                    );
1240                    Ok(output)
1241                }
1242                Err(error) => {
1243                    log_tool_error(&timer, Self::NAME, &input, &error);
1244                    Err(error)
1245                }
1246            }
1247        })
1248    }
1249}
1250
1251// ============================================================================
1252// list_commands
1253// ============================================================================
1254
1255/// Tool for listing available `OpenCode` commands that can be executed.
1256#[derive(Clone)]
1257pub struct ListCommandsTool {
1258    server: Arc<OnceCell<OrchestratorServer>>,
1259}
1260
1261impl ListCommandsTool {
1262    /// Create a new `ListCommandsTool` with the given server cell.
1263    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
1264        Self { server }
1265    }
1266}
1267
1268impl Tool for ListCommandsTool {
1269    type Input = ListCommandsInput;
1270    type Output = ListCommandsOutput;
1271    const NAME: &'static str = "list_commands";
1272    const DESCRIPTION: &'static str = "List available OpenCode commands that can be used with run.";
1273
1274    fn call(
1275        &self,
1276        input: Self::Input,
1277        _ctx: &ToolContext,
1278    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1279        let server_cell = Arc::clone(&self.server);
1280        Box::pin(async move {
1281            let timer = CallTimer::start();
1282            let result: Result<ListCommandsOutput, ToolError> = async {
1283                let server = server_cell
1284                    .get_or_try_init(OrchestratorServer::start_lazy)
1285                    .await
1286                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1287
1288                let commands =
1289                    server.client().tools().commands().await.map_err(|e| {
1290                        ToolError::Internal(format!("Failed to list commands: {e}"))
1291                    })?;
1292
1293                let command_infos: Vec<CommandInfo> = commands
1294                    .into_iter()
1295                    .map(|c| CommandInfo {
1296                        name: c.name,
1297                        description: c.description,
1298                    })
1299                    .collect();
1300
1301                Ok(ListCommandsOutput {
1302                    commands: command_infos,
1303                })
1304            }
1305            .await;
1306
1307            match result {
1308                Ok(output) => {
1309                    log_tool_success(
1310                        &timer,
1311                        Self::NAME,
1312                        &input,
1313                        &output,
1314                        ToolLogMeta::default(),
1315                        false,
1316                    );
1317                    Ok(output)
1318                }
1319                Err(error) => {
1320                    log_tool_error(&timer, Self::NAME, &input, &error);
1321                    Err(error)
1322                }
1323            }
1324        })
1325    }
1326}
1327
1328// ============================================================================
1329// respond_permission
1330// ============================================================================
1331
1332/// Tool for responding to permission requests from `OpenCode` sessions.
1333///
1334/// After sending the reply, continues monitoring the session and returns
1335/// when the session completes or another permission is requested.
1336#[derive(Clone)]
1337pub struct RespondPermissionTool {
1338    server: Arc<OnceCell<OrchestratorServer>>,
1339}
1340
1341impl RespondPermissionTool {
1342    /// Create a new `RespondPermissionTool` with the given server cell.
1343    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
1344        Self { server }
1345    }
1346}
1347
1348impl Tool for RespondPermissionTool {
1349    type Input = RespondPermissionInput;
1350    type Output = RespondPermissionOutput;
1351    const NAME: &'static str = "respond_permission";
1352    const DESCRIPTION: &'static str = r#"Respond to a permission request from an OpenCode session.
1353
1354After responding, continues monitoring the session and returns when complete or when another permission is required.
1355
1356Parameters:
1357- session_id: Session with pending permission
1358- reply: "once" (allow this request), "always" (allow for matching patterns), or "reject" (deny)
1359- message: Optional message to include with reply"#;
1360
1361    fn call(
1362        &self,
1363        input: Self::Input,
1364        _ctx: &ToolContext,
1365    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1366        let server_cell = Arc::clone(&self.server);
1367        Box::pin(async move {
1368            let timer = CallTimer::start();
1369            let request = input.clone();
1370            let result: Result<(RespondPermissionOutput, ToolLogMeta), ToolError> = async {
1371                let server = server_cell
1372                    .get_or_try_init(OrchestratorServer::start_lazy)
1373                    .await
1374                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1375
1376                let client = server.client();
1377
1378                // Find the pending permission for this session
1379                let mut pending =
1380                    client.permissions().list().await.map_err(|e| {
1381                        ToolError::Internal(format!("Failed to list permissions: {e}"))
1382                    })?;
1383
1384                let perm = if let Some(req_id) = input.permission_request_id.as_deref() {
1385                    let idx = pending.iter().position(|p| p.id == req_id).ok_or_else(|| {
1386                        ToolError::InvalidInput(format!(
1387                            "No pending permission found with id '{req_id}'. \
1388                         (session_id='{}')",
1389                            input.session_id
1390                        ))
1391                    })?;
1392
1393                    let perm = pending.remove(idx);
1394
1395                    if perm.session_id != input.session_id {
1396                        return Err(ToolError::InvalidInput(format!(
1397                            "Permission request '{req_id}' belongs to session '{}', not '{}'.",
1398                            perm.session_id, input.session_id
1399                        )));
1400                    }
1401
1402                    perm
1403                } else {
1404                    let mut perms: Vec<_> = pending
1405                        .into_iter()
1406                        .filter(|p| p.session_id == input.session_id)
1407                        .collect();
1408
1409                    match perms.as_slice() {
1410                        [] => {
1411                            return Err(ToolError::InvalidInput(format!(
1412                                "No pending permission found for session '{}'. \
1413                             The permission may have already been responded to.",
1414                                input.session_id
1415                            )));
1416                        }
1417                        [_single] => perms.swap_remove(0),
1418                        multiple => {
1419                            let ids = multiple
1420                                .iter()
1421                                .map(|p| p.id.as_str())
1422                                .collect::<Vec<_>>()
1423                                .join(", ");
1424                            return Err(ToolError::InvalidInput(format!(
1425                                "Multiple pending permissions found for session '{}': {ids}. \
1426                             Please retry with permission_request_id (returned by run).",
1427                                input.session_id
1428                            )));
1429                        }
1430                    }
1431                };
1432
1433                // Track if this is a rejection for post-processing
1434                let is_reject = matches!(input.reply, PermissionReply::Reject);
1435
1436                // Capture permission details for warning message
1437                let permission_type = perm.permission.clone();
1438                let permission_patterns = perm.patterns.clone();
1439
1440                // Capture baseline assistant text BEFORE sending reject
1441                // This lets us detect stale text after rejection
1442                let mut pre_warnings: Vec<String> = Vec::new();
1443                let baseline = if is_reject {
1444                    match client.messages().list(&input.session_id).await {
1445                        Ok(msgs) => OrchestratorServer::extract_assistant_text(&msgs),
1446                        Err(e) => {
1447                            pre_warnings.push(format!("Failed to fetch baseline messages: {e}"));
1448                            None
1449                        }
1450                    }
1451                } else {
1452                    None
1453                };
1454
1455                // Convert our reply type to API type
1456                let api_reply = match input.reply {
1457                    PermissionReply::Once => ApiPermissionReply::Once,
1458                    PermissionReply::Always => ApiPermissionReply::Always,
1459                    PermissionReply::Reject => ApiPermissionReply::Reject,
1460                };
1461
1462                // Send the reply
1463                client
1464                    .permissions()
1465                    .reply(
1466                        &perm.id,
1467                        &PermissionReplyRequest {
1468                            reply: api_reply,
1469                            message: input.message,
1470                        },
1471                    )
1472                    .await
1473                    .map_err(|e| {
1474                        ToolError::Internal(format!("Failed to reply to permission: {e}"))
1475                    })?;
1476
1477                // Now continue monitoring the session using run logic
1478                let run_tool = OrchestratorRunTool::new(Arc::clone(&server_cell));
1479                let wait_for_activity = (!is_reject).then_some(true);
1480                let outcome = run_tool
1481                    .run_impl_outcome(OrchestratorRunInput {
1482                        session_id: Some(input.session_id),
1483                        command: None,
1484                        message: None,
1485                        wait_for_activity,
1486                    })
1487                    .await?;
1488                let mut out = outcome.output;
1489
1490                // Merge pre-warnings
1491                out.warnings.extend(pre_warnings);
1492
1493                // Apply rejection-aware output mutation
1494                if is_reject && matches!(out.status, RunStatus::Completed) {
1495                    let final_resp = out.response.as_deref();
1496                    let baseline_resp = baseline.as_deref();
1497
1498                    // If response unchanged or None, it's stale pre-rejection text
1499                    if final_resp.is_none() || final_resp == baseline_resp {
1500                        out.response = None;
1501                        let patterns_str = if permission_patterns.is_empty() {
1502                            "(none)".to_string()
1503                        } else {
1504                            permission_patterns.join(", ")
1505                        };
1506                        out.warnings.push(format!(
1507                        "Permission rejected for '{permission_type}'. Patterns: {patterns_str}. \
1508                         Session stopped without generating a new assistant response."
1509                    ));
1510                        tracing::debug!(
1511                            permission_type = %permission_type,
1512                            "rejection override applied: response set to None"
1513                        );
1514                    }
1515                }
1516
1517                Ok((out, outcome.log_meta))
1518            }
1519            .await;
1520
1521            match result {
1522                Ok((output, log_meta)) => {
1523                    log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1524                    Ok(output)
1525                }
1526                Err(error) => {
1527                    log_tool_error(&timer, Self::NAME, &request, &error);
1528                    Err(error)
1529                }
1530            }
1531        })
1532    }
1533}
1534
1535// ============================================================================
1536// respond_question
1537// ============================================================================
1538
1539#[derive(Clone)]
1540pub struct RespondQuestionTool {
1541    server: Arc<OnceCell<OrchestratorServer>>,
1542}
1543
1544impl RespondQuestionTool {
1545    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
1546        Self { server }
1547    }
1548}
1549
1550impl Tool for RespondQuestionTool {
1551    type Input = RespondQuestionInput;
1552    type Output = RespondQuestionOutput;
1553    const NAME: &'static str = "respond_question";
1554    const DESCRIPTION: &'static str = r#"Respond to a question request from an OpenCode session.
1555
1556After replying, continues monitoring the session and returns when complete or when another interruption is required.
1557
1558Parameters:
1559- session_id: Session with pending question
1560- action: "reply" or "reject"
1561- answers: Required when action=reply; one list per question"#;
1562
1563    fn call(
1564        &self,
1565        input: Self::Input,
1566        _ctx: &ToolContext,
1567    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1568        let server_cell = Arc::clone(&self.server);
1569        Box::pin(async move {
1570            let timer = CallTimer::start();
1571            let request = input.clone();
1572            let result: Result<(RespondQuestionOutput, ToolLogMeta), ToolError> = async {
1573            let server = server_cell
1574                .get_or_try_init(OrchestratorServer::start_lazy)
1575                .await
1576                .map_err(|e| ToolError::Internal(e.to_string()))?;
1577
1578            let client = server.client();
1579            let mut pending = client
1580                .question()
1581                .list()
1582                .await
1583                .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;
1584
1585            let question = if let Some(req_id) = input.question_request_id.as_deref() {
1586                let idx = pending
1587                    .iter()
1588                    .position(|question| question.id == req_id)
1589                    .ok_or_else(|| {
1590                        ToolError::InvalidInput(format!(
1591                            "No pending question found with id '{req_id}'. (session_id='{}')",
1592                            input.session_id
1593                        ))
1594                    })?;
1595
1596                let question = pending.remove(idx);
1597                if question.session_id != input.session_id {
1598                    return Err(ToolError::InvalidInput(format!(
1599                        "Question request '{req_id}' belongs to session '{}', not '{}'.",
1600                        question.session_id, input.session_id
1601                    )));
1602                }
1603
1604                question
1605            } else {
1606                let mut questions: Vec<_> = pending
1607                    .into_iter()
1608                    .filter(|question| question.session_id == input.session_id)
1609                    .collect();
1610
1611                match questions.as_slice() {
1612                    [] => {
1613                        return Err(ToolError::InvalidInput(format!(
1614                            "No pending question found for session '{}'. The question may have already been responded to.",
1615                            input.session_id
1616                        )));
1617                    }
1618                    [_single] => questions.swap_remove(0),
1619                    multiple => {
1620                        let ids = multiple
1621                            .iter()
1622                            .map(|question| question.id.as_str())
1623                            .collect::<Vec<_>>()
1624                            .join(", ");
1625                        return Err(ToolError::InvalidInput(format!(
1626                            "Multiple pending questions found for session '{}': {ids}. Please retry with question_request_id (returned by run).",
1627                            input.session_id
1628                        )));
1629                    }
1630                }
1631            };
1632
1633            match input.action {
1634                QuestionAction::Reply => {
1635                    if input.answers.is_empty() {
1636                        return Err(ToolError::InvalidInput(
1637                            "answers is required when action=reply".into(),
1638                        ));
1639                    }
1640
1641                    client
1642                        .question()
1643                        .reply(
1644                            &question.id,
1645                            &QuestionReply {
1646                                answers: input.answers,
1647                            },
1648                        )
1649                        .await
1650                        .map_err(|e| {
1651                            ToolError::Internal(format!("Failed to reply to question: {e}"))
1652                        })?;
1653
1654                    let outcome = OrchestratorRunTool::new(Arc::clone(&server_cell))
1655                        .run_impl_outcome(OrchestratorRunInput {
1656                            session_id: Some(input.session_id),
1657                            command: None,
1658                            message: None,
1659                            wait_for_activity: Some(true),
1660                        })
1661                        .await?;
1662                    Ok((outcome.output, outcome.log_meta))
1663                }
1664                QuestionAction::Reject => {
1665                    client.question().reject(&question.id).await.map_err(|e| {
1666                        ToolError::Internal(format!("Failed to reject question: {e}"))
1667                    })?;
1668
1669                    let outcome = OrchestratorRunTool::new(Arc::clone(&server_cell))
1670                        .run_impl_outcome(OrchestratorRunInput {
1671                            session_id: Some(input.session_id),
1672                            command: None,
1673                            message: None,
1674                            wait_for_activity: None,
1675                        })
1676                        .await?;
1677                    Ok((outcome.output, outcome.log_meta))
1678                }
1679            }
1680        }
1681        .await;
1682
1683            match result {
1684                Ok((output, log_meta)) => {
1685                    log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1686                    Ok(output)
1687                }
1688                Err(error) => {
1689                    log_tool_error(&timer, Self::NAME, &request, &error);
1690                    Err(error)
1691                }
1692            }
1693        })
1694    }
1695}
1696
1697// ============================================================================
1698// Registry builder
1699// ============================================================================
1700
1701/// Build the tool registry with all orchestrator tools.
1702///
1703/// The server cell is lazily initialized on first tool call.
1704pub fn build_registry(server: &Arc<OnceCell<OrchestratorServer>>) -> ToolRegistry {
1705    ToolRegistry::builder()
1706        .register::<OrchestratorRunTool, ()>(OrchestratorRunTool::new(Arc::clone(server)))
1707        .register::<ListSessionsTool, ()>(ListSessionsTool::new(Arc::clone(server)))
1708        .register::<GetSessionStateTool, ()>(GetSessionStateTool::new(Arc::clone(server)))
1709        .register::<ListCommandsTool, ()>(ListCommandsTool::new(Arc::clone(server)))
1710        .register::<RespondPermissionTool, ()>(RespondPermissionTool::new(Arc::clone(server)))
1711        .register::<RespondQuestionTool, ()>(RespondQuestionTool::new(Arc::clone(server)))
1712        .finish()
1713}
1714
#[cfg(test)]
mod tests {
    use super::*;
    use agentic_tools_core::Tool;

    /// Every tool must be registered under its short, unprefixed name.
    #[test]
    fn tool_names_are_short() {
        let expectations = [
            (<OrchestratorRunTool as Tool>::NAME, "run"),
            (<ListSessionsTool as Tool>::NAME, "list_sessions"),
            (<GetSessionStateTool as Tool>::NAME, "get_session_state"),
            (<ListCommandsTool as Tool>::NAME, "list_commands"),
            (<RespondPermissionTool as Tool>::NAME, "respond_permission"),
            (<RespondQuestionTool as Tool>::NAME, "respond_question"),
        ];

        for (actual, expected) in expectations {
            assert_eq!(actual, expected);
        }
    }

    /// With no messages, the session-level timestamp is reported as the last activity.
    #[test]
    fn last_activity_falls_back_to_session_timestamp_when_messages_are_empty() {
        let session_timestamp = Some(1_234);

        assert_eq!(
            get_last_activity_time(&[], session_timestamp),
            session_timestamp
        );
    }
}