Skip to main content

opencode_orchestrator_mcp/
tools.rs

1//! Tool implementations for orchestrator MCP server.
2
3use crate::config;
4use crate::logging;
5use crate::server::OrchestratorServer;
6use crate::token_tracker::TokenTracker;
7use crate::types::ChangeStats;
8use crate::types::CommandInfo;
9use crate::types::GetSessionStateInput;
10use crate::types::GetSessionStateOutput;
11use crate::types::ListCommandsInput;
12use crate::types::ListCommandsOutput;
13use crate::types::ListSessionsInput;
14use crate::types::ListSessionsOutput;
15use crate::types::OrchestratorRunInput;
16use crate::types::OrchestratorRunOutput;
17use crate::types::PermissionReply;
18use crate::types::QuestionAction;
19use crate::types::QuestionInfoView;
20use crate::types::QuestionOptionView;
21use crate::types::RespondPermissionInput;
22use crate::types::RespondPermissionOutput;
23use crate::types::RespondQuestionInput;
24use crate::types::RespondQuestionOutput;
25use crate::types::RunStatus;
26use crate::types::SessionStatusSummary;
27use crate::types::SessionSummary;
28use crate::types::ToolCallSummary;
29use crate::types::ToolStateSummary;
30use agentic_logging::CallTimer;
31use agentic_logging::ToolCallRecord;
32use agentic_tools_core::Tool;
33use agentic_tools_core::ToolContext;
34use agentic_tools_core::ToolError;
35use agentic_tools_core::ToolRegistry;
36use agentic_tools_core::fmt::TextFormat;
37use agentic_tools_core::fmt::TextOptions;
38use futures::future::BoxFuture;
39use opencode_rs::types::event::Event;
40use opencode_rs::types::message::CommandRequest;
41use opencode_rs::types::message::Message;
42use opencode_rs::types::message::Part;
43use opencode_rs::types::message::PromptPart;
44use opencode_rs::types::message::PromptRequest;
45use opencode_rs::types::message::ToolState;
46use opencode_rs::types::permission::PermissionReply as ApiPermissionReply;
47use opencode_rs::types::permission::PermissionReplyRequest;
48use opencode_rs::types::question::QuestionReply;
49use opencode_rs::types::question::QuestionRequest;
50use opencode_rs::types::session::CreateSessionRequest;
51use opencode_rs::types::session::SessionStatusInfo;
52use opencode_rs::types::session::SummarizeRequest;
53use serde::Serialize;
54use std::sync::Arc;
55use std::time::Duration;
56use tokio::sync::OnceCell;
57use tokio::task::JoinHandle;
58
59const SERVER_NAME: &str = "opencode-orchestrator-mcp";
60
61#[derive(Debug, Clone, Default)]
62struct ToolLogMeta {
63    token_usage: Option<agentic_logging::TokenUsage>,
64    token_usage_saturated: bool,
65}
66
67struct RunOutcome {
68    output: OrchestratorRunOutput,
69    log_meta: ToolLogMeta,
70}
71
72impl RunOutcome {
73    fn without_tokens(output: OrchestratorRunOutput) -> Self {
74        Self {
75            output,
76            log_meta: ToolLogMeta::default(),
77        }
78    }
79
80    fn with_tracker(output: OrchestratorRunOutput, token_tracker: &TokenTracker) -> Self {
81        let (token_usage, token_usage_saturated) = token_tracker.to_log_token_usage();
82        Self {
83            output,
84            log_meta: ToolLogMeta {
85                token_usage,
86                token_usage_saturated,
87            },
88        }
89    }
90}
91
92async fn abort_command_task(task: &mut Option<JoinHandle<Result<(), String>>>) {
93    if let Some(handle) = task.take() {
94        handle.abort();
95        let _ = handle.await;
96    }
97}
98
99fn request_json<T: Serialize>(request: &T) -> serde_json::Value {
100    serde_json::to_value(request)
101        .unwrap_or_else(|error| serde_json::json!({"serialization_error": error.to_string()}))
102}
103
104fn log_tool_success<TReq: Serialize, TOut: TextFormat>(
105    timer: &CallTimer,
106    tool: &str,
107    request: &TReq,
108    output: &TOut,
109    log_meta: ToolLogMeta,
110    write_markdown: bool,
111) {
112    let (completed_at, duration_ms) = timer.finish();
113    let rendered = output.fmt_text(&TextOptions::default());
114    let response_file = write_markdown
115        .then(|| logging::write_markdown_best_effort(completed_at, &timer.call_id, &rendered))
116        .flatten();
117
118    let record = ToolCallRecord {
119        call_id: timer.call_id.clone(),
120        server: SERVER_NAME.into(),
121        tool: tool.into(),
122        started_at: timer.started_at,
123        completed_at,
124        duration_ms,
125        request: request_json(request),
126        response_file,
127        success: true,
128        error: None,
129        model: None,
130        token_usage: log_meta.token_usage,
131        summary: log_meta
132            .token_usage_saturated
133            .then(|| serde_json::json!({"token_usage_saturated": true})),
134    };
135
136    logging::append_record_best_effort(&record);
137}
138
139fn log_tool_error<TReq: Serialize>(
140    timer: &CallTimer,
141    tool: &str,
142    request: &TReq,
143    error: &ToolError,
144) {
145    let (completed_at, duration_ms) = timer.finish();
146    let record = ToolCallRecord {
147        call_id: timer.call_id.clone(),
148        server: SERVER_NAME.into(),
149        tool: tool.into(),
150        started_at: timer.started_at,
151        completed_at,
152        duration_ms,
153        request: request_json(request),
154        response_file: None,
155        success: false,
156        error: Some(error.to_string()),
157        model: None,
158        token_usage: None,
159        summary: None,
160    };
161
162    logging::append_record_best_effort(&record);
163}
164
165// ============================================================================
166// run
167// ============================================================================
168
169/// Tool for starting or resuming `OpenCode` sessions.
170///
171/// Handles session creation, prompt/command execution, SSE event monitoring,
172/// and permission request detection. Returns when the session completes or
173/// when a permission is requested.
174#[derive(Clone)]
175pub struct OrchestratorRunTool {
176    server: Arc<OnceCell<OrchestratorServer>>,
177}
178
179impl OrchestratorRunTool {
180    /// Create a new `OrchestratorRunTool` with the given server cell.
181    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
182        Self { server }
183    }
184
185    /// Finalize a completed session by fetching messages and optionally triggering summarization.
186    ///
187    /// This is called when we detect the session is idle, either via SSE `SessionIdle` event
188    /// or via polling `sessions().status()`.
189    ///
190    /// Uses bounded retry with backoff (0/50/100/200/400ms) if assistant text is not immediately
191    /// available, handling the race condition where the session becomes idle before messages
192    /// are fully persisted.
193    async fn finalize_completed(
194        client: &opencode_rs::Client,
195        session_id: String,
196        token_tracker: &TokenTracker,
197        mut warnings: Vec<String>,
198    ) -> Result<OrchestratorRunOutput, ToolError> {
199        // Bounded backoff delays for message extraction retry (~750ms total budget)
200        const BACKOFFS_MS: &[u64] = &[0, 50, 100, 200, 400];
201
202        let mut response: Option<String> = None;
203
204        for (attempt, &delay_ms) in BACKOFFS_MS.iter().enumerate() {
205            if delay_ms > 0 {
206                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
207            }
208
209            let messages = client
210                .messages()
211                .list(&session_id)
212                .await
213                .map_err(|e| ToolError::Internal(format!("Failed to list messages: {e}")))?;
214
215            response = OrchestratorServer::extract_assistant_text(&messages);
216
217            if response.is_some() {
218                if attempt > 0 {
219                    tracing::debug!(
220                        session_id = %session_id,
221                        attempt,
222                        "assistant response became available after retry"
223                    );
224                }
225                break;
226            }
227        }
228
229        if response.is_none() {
230            tracing::debug!(
231                session_id = %session_id,
232                "no assistant response found after bounded retry"
233            );
234        }
235
236        // Handle context limit summarization if needed
237        if token_tracker.compaction_needed
238            && let (Some(pid), Some(mid)) = (&token_tracker.provider_id, &token_tracker.model_id)
239        {
240            let summarize_req = SummarizeRequest {
241                provider_id: pid.clone(),
242                model_id: mid.clone(),
243                auto: None,
244            };
245
246            match client
247                .sessions()
248                .summarize(&session_id, &summarize_req)
249                .await
250            {
251                Ok(_) => {
252                    tracing::info!(session_id = %session_id, "context summarization triggered");
253                    warnings.push("Context limit reached; summarization triggered".into());
254                }
255                Err(e) => {
256                    tracing::warn!(session_id = %session_id, error = %e, "summarization failed");
257                    warnings.push(format!("Summarization failed: {e}"));
258                }
259            }
260        }
261
262        Ok(OrchestratorRunOutput {
263            session_id,
264            status: RunStatus::Completed,
265            response,
266            partial_response: None,
267            permission_request_id: None,
268            permission_type: None,
269            permission_patterns: vec![],
270            question_request_id: None,
271            questions: vec![],
272            warnings,
273        })
274    }
275
276    fn map_questions(req: &QuestionRequest) -> Vec<QuestionInfoView> {
277        req.questions
278            .iter()
279            .map(|question| QuestionInfoView {
280                question: question.question.clone(),
281                header: question.header.clone(),
282                options: question
283                    .options
284                    .iter()
285                    .map(|option| QuestionOptionView {
286                        label: option.label.clone(),
287                        description: option.description.clone(),
288                    })
289                    .collect(),
290                multiple: question.multiple,
291                custom: question.custom,
292            })
293            .collect()
294    }
295
296    fn question_required_output(
297        session_id: String,
298        partial_response: Option<String>,
299        request: &QuestionRequest,
300        warnings: Vec<String>,
301    ) -> OrchestratorRunOutput {
302        OrchestratorRunOutput {
303            session_id,
304            status: RunStatus::QuestionRequired,
305            response: None,
306            partial_response,
307            permission_request_id: None,
308            permission_type: None,
309            permission_patterns: vec![],
310            question_request_id: Some(request.id.clone()),
311            questions: Self::map_questions(request),
312            warnings,
313        }
314    }
315
316    async fn run_impl_outcome(
317        &self,
318        input: OrchestratorRunInput,
319        ctx: &ToolContext,
320    ) -> Result<RunOutcome, ToolError> {
321        // Input validation
322        if input.session_id.is_none() && input.message.is_none() && input.command.is_none() {
323            return Err(ToolError::InvalidInput(
324                "Either session_id (to resume/check status) or message/command (to start work) is required"
325                    .into(),
326            ));
327        }
328
329        if input.command.is_some() && input.message.is_none() {
330            return Err(ToolError::InvalidInput(
331                "message is required when command is specified (becomes $ARGUMENTS for template expansion)"
332                    .into(),
333            ));
334        }
335
336        // Trim and validate message content
337        let message = input.message.map(|m| m.trim().to_string());
338        if let Some(ref m) = message
339            && m.is_empty()
340        {
341            return Err(ToolError::InvalidInput(
342                "message cannot be empty or whitespace-only".into(),
343            ));
344        }
345
346        let wait_for_activity = input.wait_for_activity.unwrap_or(false);
347
348        // Lazy initialization: spawn server on first tool call
349        let server = self
350            .server
351            .get_or_try_init(OrchestratorServer::start_lazy)
352            .await
353            .map_err(|e| ToolError::Internal(e.to_string()))?;
354
355        let client = server.client();
356
357        tracing::debug!(
358            command = ?input.command,
359            has_message = message.is_some(),
360            message_len = message.as_ref().map(String::len),
361            session_id = ?input.session_id,
362            "run: starting"
363        );
364
365        // 1. Resolve session: validate existing or create new
366        let session_id = if let Some(sid) = input.session_id {
367            // Validate session exists
368            client.sessions().get(&sid).await.map_err(|e| {
369                if e.is_not_found() {
370                    ToolError::InvalidInput(format!(
371                        "Session '{sid}' not found. Use list_sessions to discover sessions, \
372                         or omit session_id to create a new session."
373                    ))
374                } else {
375                    ToolError::Internal(format!("Failed to get session: {e}"))
376                }
377            })?;
378            sid
379        } else {
380            // Create new session
381            let session = client
382                .sessions()
383                .create(&CreateSessionRequest::default())
384                .await
385                .map_err(|e| ToolError::Internal(format!("Failed to create session: {e}")))?;
386
387            {
388                let mut spawned = server.spawned_sessions().write().await;
389                spawned.insert(session.id.clone());
390            }
391
392            session.id
393        };
394
395        tracing::info!(session_id = %session_id, "run: session resolved");
396
397        // 2. Check if session is already idle (for resume-only case)
398        let status = client
399            .sessions()
400            .status_for(&session_id)
401            .await
402            .map_err(|e| ToolError::Internal(format!("Failed to get session status: {e}")))?;
403
404        let is_idle = matches!(status, SessionStatusInfo::Idle);
405
406        // 3. Check for pending permissions before doing anything else
407        let pending_permissions = client
408            .permissions()
409            .list()
410            .await
411            .map_err(|e| ToolError::Internal(format!("Failed to list permissions: {e}")))?;
412
413        let my_permission = pending_permissions
414            .into_iter()
415            .find(|p| p.session_id == session_id);
416
417        if let Some(perm) = my_permission {
418            tracing::info!(
419                session_id = %session_id,
420                permission_type = %perm.permission,
421                "run: pending permission found"
422            );
423            return Ok(RunOutcome::without_tokens(OrchestratorRunOutput {
424                session_id,
425                status: RunStatus::PermissionRequired,
426                response: None,
427                partial_response: None,
428                permission_request_id: Some(perm.id),
429                permission_type: Some(perm.permission),
430                permission_patterns: perm.patterns,
431                question_request_id: None,
432                questions: vec![],
433                warnings: vec![],
434            }));
435        }
436
437        let pending_questions = client
438            .question()
439            .list()
440            .await
441            .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;
442
443        if let Some(question) = pending_questions
444            .into_iter()
445            .find(|question| question.session_id == session_id)
446        {
447            tracing::info!(session_id = %session_id, question_id = %question.id, "run: pending question found");
448            return Ok(RunOutcome::without_tokens(Self::question_required_output(
449                session_id,
450                None,
451                &question,
452                vec![],
453            )));
454        }
455
456        // 4. If no message/command and session is idle, just return current state
457        // Uses finalize_completed to get retry logic for message extraction
458        if message.is_none() && input.command.is_none() && is_idle && !wait_for_activity {
459            let token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
460            let output =
461                Self::finalize_completed(client, session_id, &token_tracker, vec![]).await?;
462            return Ok(RunOutcome::with_tracker(output, &token_tracker));
463        }
464
465        // 5. Subscribe to SSE BEFORE sending prompt/command
466        let mut subscription = client
467            .subscribe_session(&session_id)
468            .map_err(|e| ToolError::Internal(format!("Failed to subscribe to session: {e}")))?;
469
470        // Track whether this call is dispatching new work (command or message)
471        // vs just resuming/monitoring an existing session.
472        let dispatched_new_work = input.command.is_some() || message.is_some() || wait_for_activity;
473        let idle_grace = config::idle_grace();
474        let mut idle_grace_deadline: Option<tokio::time::Instant> = None;
475        let mut awaiting_idle_grace_check = false;
476
477        if wait_for_activity && input.command.is_none() && message.is_none() {
478            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
479        }
480
481        // 6. Kick off the work
482        let mut command_task: Option<JoinHandle<Result<(), String>>> = None;
483        let mut command_name_for_logging: Option<String> = None;
484
485        if let Some(command) = &input.command {
486            command_name_for_logging = Some(command.clone());
487
488            let cmd_client = client.clone();
489            let cmd_session_id = session_id.clone();
490            let cmd_name = command.clone();
491            let cmd_arguments = message.clone().unwrap_or_default();
492
493            command_task = Some(tokio::spawn(async move {
494                let req = CommandRequest {
495                    command: cmd_name,
496                    arguments: cmd_arguments,
497                    message_id: None,
498                };
499
500                cmd_client
501                    .messages()
502                    .command(&cmd_session_id, &req)
503                    .await
504                    .map(|_| ())
505                    .map_err(|e| e.to_string())
506            }));
507        } else if let Some(msg) = &message {
508            // Send prompt asynchronously
509            let req = PromptRequest {
510                parts: vec![PromptPart::Text {
511                    text: msg.clone(),
512                    synthetic: None,
513                    ignored: None,
514                    metadata: None,
515                }],
516                message_id: None,
517                model: None,
518                agent: None,
519                no_reply: None,
520                system: None,
521                variant: None,
522            };
523
524            client
525                .messages()
526                .prompt_async(&session_id, &req)
527                .await
528                .map_err(|e| ToolError::Internal(format!("Failed to send prompt: {e}")))?;
529
530            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
531        }
532
533        // 7. Event loop: wait for completion or permission
534        // Overall timeout to prevent infinite hangs (configurable, default 1 hour)
535        let deadline = tokio::time::Instant::now() + server.session_deadline();
536        let inactivity_timeout = server.inactivity_timeout();
537        let mut last_activity_time = tokio::time::Instant::now();
538
539        tracing::debug!(session_id = %session_id, "run: entering event loop");
540        let mut token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
541        let mut partial_response = String::new();
542        let warnings = Vec::new();
543
544        let mut poll_interval = tokio::time::interval(Duration::from_secs(1));
545        poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
546
547        // Track whether we've observed the session as busy at least once.
548        // This prevents completing immediately if we call run_impl on an already-idle
549        // session before our new work has started processing.
550        let mut observed_busy = false;
551
552        // Track whether SSE is still active. If the stream closes, we fall back
553        // to polling-only mode rather than returning an error.
554        let mut sse_active = true;
555
556        // === Post-subscribe status re-check (latency optimization) ===
557        // If we're just monitoring (no new work dispatched), check if session is already idle.
558        // This handles the race where session completed between our initial status check
559        // and SSE subscription becoming ready.
560        if !dispatched_new_work
561            && let Ok(status) = client.sessions().status_for(&session_id).await
562            && matches!(status, SessionStatusInfo::Idle)
563        {
564            tracing::debug!(
565                session_id = %session_id,
566                "session already idle on post-subscribe check"
567            );
568            let output =
569                Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
570            return Ok(RunOutcome::with_tracker(output, &token_tracker));
571        }
572        // If check fails or session is busy, continue to event loop
573
574        loop {
575            // Check timeout before processing
576            let now = tokio::time::Instant::now();
577
578            if now.duration_since(last_activity_time) >= inactivity_timeout {
579                return Err(ToolError::Internal(format!(
580                    "Session idle timeout: no activity for 5 minutes (session_id={session_id}). \
581                     The session may still be running; use run(session_id=...) to check status."
582                )));
583            }
584
585            if now >= deadline {
586                return Err(ToolError::Internal(
587                    "Session execution timed out after 1 hour. \
588                     The session may still be running; use run with the session_id to check status."
589                        .into(),
590                ));
591            }
592
593            let command_task_active = command_task.is_some();
594
595            tokio::select! {
596                () = ctx.cancelled() => {
597                    abort_command_task(&mut command_task).await;
598                    return Err(ToolError::cancelled(None));
599                }
600
601                maybe_event = subscription.recv(), if sse_active => {
602                    let Some(event) = maybe_event else {
603                        // SSE stream closed - this can happen due to network issues,
604                        // server restarts, or connection timeouts. Fall back to polling
605                        // rather than failing immediately.
606                        tracing::warn!(
607                            session_id = %session_id,
608                            "SSE stream closed unexpectedly; falling back to polling-only mode"
609                        );
610                        sse_active = false;
611                        continue; // The poll_interval branch will now drive completion detection
612                    };
613
614                    // Track tokens (server is already initialized at this point)
615                    token_tracker.observe_event(&event, |pid, mid| {
616                        server.context_limit(pid, mid)
617                    });
618
619                    match event {
620                        Event::PermissionAsked { properties } => {
621                            tracing::info!(
622                                session_id = %session_id,
623                                permission_type = %properties.request.permission,
624                                "run: permission requested"
625                            );
626                            return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
627                                session_id,
628                                status: RunStatus::PermissionRequired,
629                                response: None,
630                                partial_response: if partial_response.is_empty() {
631                                    None
632                                } else {
633                                    Some(partial_response)
634                                },
635                                permission_request_id: Some(properties.request.id),
636                                permission_type: Some(properties.request.permission),
637                                permission_patterns: properties.request.patterns,
638                                question_request_id: None,
639                                questions: vec![],
640                                warnings,
641                            }, &token_tracker));
642                        }
643
644                        Event::QuestionAsked { properties } => {
645                            return Ok(RunOutcome::with_tracker(Self::question_required_output(
646                                session_id,
647                                if partial_response.is_empty() {
648                                    None
649                                } else {
650                                    Some(partial_response)
651                                },
652                                &properties.request,
653                                warnings,
654                            ), &token_tracker));
655                        }
656
657                        Event::MessagePartDelta { properties } => {
658                            last_activity_time = tokio::time::Instant::now();
659                            // Message streaming means session is actively processing
660                            observed_busy = true;
661                            awaiting_idle_grace_check = false;
662                            // Collect streaming text from field-level delta events.
663                            if let Some(delta) = &properties.delta {
664                                partial_response.push_str(delta);
665                            }
666                        }
667
668                        Event::MessagePartUpdated { .. } | Event::MessageUpdated { .. } => {
669                            last_activity_time = tokio::time::Instant::now();
670                            observed_busy = true;
671                            awaiting_idle_grace_check = false;
672                        }
673
674                        Event::SessionError { properties } => {
675                            let error_msg = properties
676                                .error
677                                .map_or_else(|| "Unknown error".to_string(), |e| format!("{e:?}"));
678                            tracing::error!(
679                                session_id = %session_id,
680                                error = %error_msg,
681                                "run: session error"
682                            );
683                            return Err(ToolError::Internal(format!("Session error: {error_msg}")));
684                        }
685
686                        Event::SessionIdle { .. } => {
687                            tracing::debug!(session_id = %session_id, "received SessionIdle event");
688                            let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
689                            return Ok(RunOutcome::with_tracker(output, &token_tracker));
690                        }
691
692                        _ => {
693                            // Other events - continue
694                        }
695                    }
696                }
697
698                _ = poll_interval.tick() => {
699                    // === 1. Permission fallback (check first, permissions take priority) ===
700                    let pending = match client.permissions().list().await {
701                        Ok(p) => p,
702                        Err(e) => {
703                            // Log but continue - permission list failure shouldn't block completion detection
704                            tracing::warn!(
705                                session_id = %session_id,
706                                error = %e,
707                                "failed to list permissions during poll fallback"
708                            );
709                            vec![]
710                        }
711                    };
712
713                    if let Some(perm) = pending.into_iter().find(|p| p.session_id == session_id) {
714                        tracing::debug!(
715                            session_id = %session_id,
716                            permission_id = %perm.id,
717                            "detected pending permission via polling fallback"
718                        );
719                        return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
720                            session_id,
721                            status: RunStatus::PermissionRequired,
722                            response: None,
723                            partial_response: if partial_response.is_empty() {
724                                None
725                            } else {
726                                Some(partial_response)
727                                },
728                                permission_request_id: Some(perm.id),
729                                permission_type: Some(perm.permission),
730                            permission_patterns: perm.patterns,
731                            question_request_id: None,
732                            questions: vec![],
733                            warnings,
734                        }, &token_tracker));
735                    }
736
737                    let pending_questions = match client.question().list().await {
738                        Ok(questions) => questions,
739                        Err(e) => {
740                            tracing::warn!(
741                                session_id = %session_id,
742                                error = %e,
743                                "failed to list questions during poll fallback"
744                            );
745                            vec![]
746                        }
747                    };
748
749                    if let Some(question) = pending_questions
750                        .into_iter()
751                        .find(|question| question.session_id == session_id)
752                    {
753                        tracing::debug!(
754                            session_id = %session_id,
755                            question_id = %question.id,
756                            "detected pending question via polling fallback"
757                        );
758                        return Ok(RunOutcome::with_tracker(Self::question_required_output(
759                            session_id,
760                            if partial_response.is_empty() {
761                                None
762                            } else {
763                                Some(partial_response)
764                            },
765                            &question,
766                            warnings,
767                        ), &token_tracker));
768                    }
769
770                    // === 2. Session idle detection fallback (NEW) ===
771                    // This is the key fix for race conditions. If SSE missed SessionIdle,
772                    // we detect completion via polling sessions().status_for(session_id).
773                    match client.sessions().status_for(&session_id).await {
774                        Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
775                            last_activity_time = tokio::time::Instant::now();
776                            observed_busy = true;
777                            awaiting_idle_grace_check = false;
778                            tracing::trace!(
779                                session_id = %session_id,
780                                "our session is busy/retry, waiting"
781                            );
782                        }
783                        Ok(SessionStatusInfo::Idle) => {
784                            if !dispatched_new_work || observed_busy {
785                                // Session is idle AND either:
786                                // - We didn't dispatch new work (just monitoring), OR
787                                // - We did dispatch work and have seen it become busy at least once
788                                //
789                                // This guards against completing before our work starts processing.
790                                tracing::debug!(
791                                    session_id = %session_id,
792                                    dispatched_new_work = dispatched_new_work,
793                                    observed_busy = observed_busy,
794                                    "detected session idle via polling fallback"
795                                );
796                                let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
797                                return Ok(RunOutcome::with_tracker(output, &token_tracker));
798                            }
799
800                            let Some(deadline) = idle_grace_deadline else {
801                                tracing::trace!(
802                                    session_id = %session_id,
803                                    command_task_active = command_task_active,
804                                    "idle seen before dispatch confirmed; waiting"
805                                );
806                                continue;
807                            };
808
809                            let now = tokio::time::Instant::now();
810                            if now >= deadline {
811                                tracing::debug!(
812                                    session_id = %session_id,
813                                    idle_grace_ms = idle_grace.as_millis(),
814                                    "accepting idle via bounded idle grace (no busy observed)"
815                                );
816                                let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
817                                return Ok(RunOutcome::with_tracker(output, &token_tracker));
818                            }
819
820                            awaiting_idle_grace_check = true;
821                            tracing::trace!(
822                                session_id = %session_id,
823                                remaining_ms = (deadline - now).as_millis(),
824                                "idle detected before busy; waiting for idle-grace deadline"
825                            );
826                        }
827                        Err(e) => {
828                            // Log but continue - status check failure shouldn't block the loop
829                            tracing::warn!(
830                                session_id = %session_id,
831                                error = %e,
832                                "failed to get session status during poll fallback"
833                            );
834                        }
835                    }
836                }
837
838                () = async {
839                    match idle_grace_deadline {
840                        Some(deadline) => tokio::time::sleep_until(deadline).await,
841                        None => std::future::pending::<()>().await,
842                    }
843                }, if awaiting_idle_grace_check => {
844                    awaiting_idle_grace_check = false;
845
846                    match client.sessions().status_for(&session_id).await {
847                        Ok(SessionStatusInfo::Idle) => {
848                            tracing::debug!(session_id = %session_id, "idle-grace deadline reached; finalizing");
849                            let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
850                            return Ok(RunOutcome::with_tracker(output, &token_tracker));
851                        }
852                        Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
853                            last_activity_time = tokio::time::Instant::now();
854                            observed_busy = true;
855                        }
856                        Err(e) => {
857                            tracing::warn!(
858                                session_id = %session_id,
859                                error = %e,
860                                "status check failed at idle-grace deadline"
861                            );
862                        }
863                    }
864                }
865
866                cmd_result = async {
867                    match command_task.as_mut() {
868                        Some(handle) => Some(handle.await),
869                        None => {
870                            std::future::pending::<
871                                Option<Result<Result<(), String>, tokio::task::JoinError>>,
872                            >()
873                            .await
874                        }
875                    }
876                }, if command_task_active => {
877                    match cmd_result {
878                        Some(Ok(Ok(()))) => {
879                            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
880                            tracing::debug!(
881                                session_id = %session_id,
882                                command = ?command_name_for_logging,
883                                "run: command dispatch completed successfully"
884                            );
885                            command_task = None;
886                        }
887                        Some(Ok(Err(e))) => {
888                            tracing::error!(
889                                session_id = %session_id,
890                                command = ?command_name_for_logging,
891                                error = %e,
892                                "run: command dispatch failed"
893                            );
894                            return Err(ToolError::Internal(format!(
895                                "Failed to execute command '{}': {e}",
896                                command_name_for_logging.as_deref().unwrap_or("unknown")
897                            )));
898                        }
899                        Some(Err(join_err)) => {
900                            tracing::error!(
901                                session_id = %session_id,
902                                command = ?command_name_for_logging,
903                                error = %join_err,
904                                "run: command task panicked"
905                            );
906                            return Err(ToolError::Internal(format!("Command task panicked: {join_err}")));
907                        }
908                        None => {
909                            unreachable!("command_task_active guard should prevent None");
910                        }
911                    }
912                }
913            }
914        }
915    }
916}
917
918impl Tool for OrchestratorRunTool {
919    type Input = OrchestratorRunInput;
920    type Output = OrchestratorRunOutput;
921    const NAME: &'static str = "run";
922    const DESCRIPTION: &'static str = r#"Start or resume an OpenCode session. Optionally run a named command or send a raw prompt.
923
924Returns when:
925- status=completed: Session finished executing. Response contains final assistant output.
926- status=permission_required: Session needs permission approval. Call respond_permission to continue.
927- status=question_required: Session needs question answers. Call respond_question to continue.
928
929Parameters:
930- session_id: Existing session to resume (omit to create new)
931- command: OpenCode command name (e.g., "research", "implement_plan")
932- message: Prompt text or $ARGUMENTS for command template
933
934Examples:
935- New session with prompt: run(message="explain this code")
936- New session with command: run(command="research", message="caching strategies")
937- Resume session: run(session_id="...", message="continue")
938- Check status: run(session_id="...")"#;
939
940    fn call(
941        &self,
942        input: Self::Input,
943        ctx: &ToolContext,
944    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
945        let this = self.clone();
946        let ctx = ctx.clone();
947        Box::pin(async move {
948            let timer = CallTimer::start();
949            match this.run_impl_outcome(input.clone(), &ctx).await {
950                Ok(outcome) => {
951                    log_tool_success(
952                        &timer,
953                        Self::NAME,
954                        &input,
955                        &outcome.output,
956                        outcome.log_meta,
957                        true,
958                    );
959                    Ok(outcome.output)
960                }
961                Err(error) => {
962                    log_tool_error(&timer, Self::NAME, &input, &error);
963                    Err(error)
964                }
965            }
966        })
967    }
968}
969
970// ============================================================================
971// list_sessions
972// ============================================================================
973
/// Tool for listing available `OpenCode` sessions in the current directory.
///
/// The only field is an `Arc`, so clones share the same server cell.
#[derive(Clone)]
pub struct ListSessionsTool {
    /// Shared cell holding the orchestrator server; it is initialized on
    /// first use via `OrchestratorServer::start_lazy`.
    server: Arc<OnceCell<OrchestratorServer>>,
}

impl ListSessionsTool {
    /// Create a new `ListSessionsTool` with the given server cell.
    ///
    /// The cell is stored as-is; nothing is initialized here.
    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
        Self { server }
    }
}
986
987impl Tool for ListSessionsTool {
988    type Input = ListSessionsInput;
989    type Output = ListSessionsOutput;
990    const NAME: &'static str = "list_sessions";
991    const DESCRIPTION: &'static str =
992        "List available OpenCode sessions in the current directory context.";
993
994    fn call(
995        &self,
996        input: Self::Input,
997        _ctx: &ToolContext,
998    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
999        let server_cell = Arc::clone(&self.server);
1000        Box::pin(async move {
1001            let timer = CallTimer::start();
1002            let result: Result<ListSessionsOutput, ToolError> = async {
1003                let server = server_cell
1004                    .get_or_try_init(OrchestratorServer::start_lazy)
1005                    .await
1006                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1007
1008                let sessions =
1009                    // Intentionally keep zero-arg list() so SDK directory context preserves launch-directory scoping.
1010                    server.client().sessions().list().await.map_err(|e| {
1011                        ToolError::Internal(format!("Failed to list sessions: {e}"))
1012                    })?;
1013                let status_map = server.client().sessions().status_map().await.ok();
1014                let spawned = server.spawned_sessions().read().await;
1015
1016                let limit = input.limit.unwrap_or(20);
1017                let summaries: Vec<SessionSummary> = sessions
1018                    .into_iter()
1019                    .take(limit)
1020                    .map(|s| {
1021                        let status =
1022                            status_map
1023                                .as_ref()
1024                                .map(|status_map| match status_map.get(&s.id) {
1025                                    Some(SessionStatusInfo::Busy) => SessionStatusSummary::Busy,
1026                                    Some(SessionStatusInfo::Retry {
1027                                        attempt,
1028                                        message,
1029                                        next,
1030                                    }) => SessionStatusSummary::Retry {
1031                                        attempt: *attempt,
1032                                        message: message.clone(),
1033                                        next: *next,
1034                                    },
1035                                    Some(SessionStatusInfo::Idle) | None => {
1036                                        SessionStatusSummary::Idle
1037                                    }
1038                                });
1039
1040                        let change_stats = s.summary.as_ref().map(|summary| ChangeStats {
1041                            additions: summary.additions,
1042                            deletions: summary.deletions,
1043                            files: summary.files,
1044                        });
1045
1046                        SessionSummary {
1047                            launched_by_you: spawned.contains(&s.id),
1048                            created: s.time.as_ref().map(|t| t.created),
1049                            updated: s.time.as_ref().map(|t| t.updated),
1050                            directory: s.directory,
1051                            path: s.path,
1052                            title: s.title,
1053                            id: s.id,
1054                            status,
1055                            change_stats,
1056                        }
1057                    })
1058                    .collect();
1059
1060                Ok(ListSessionsOutput {
1061                    sessions: summaries,
1062                })
1063            }
1064            .await;
1065
1066            match result {
1067                Ok(output) => {
1068                    log_tool_success(
1069                        &timer,
1070                        Self::NAME,
1071                        &input,
1072                        &output,
1073                        ToolLogMeta::default(),
1074                        false,
1075                    );
1076                    Ok(output)
1077                }
1078                Err(error) => {
1079                    log_tool_error(&timer, Self::NAME, &input, &error);
1080                    Err(error)
1081                }
1082            }
1083        })
1084    }
1085}
1086
1087fn count_pending_messages(messages: &[Message]) -> usize {
1088    let mut pending = 0;
1089
1090    for message in messages.iter().rev() {
1091        if message.role() == "user" {
1092            pending += 1;
1093        } else if message.role() == "assistant" {
1094            break;
1095        }
1096    }
1097
1098    pending
1099}
1100
1101fn get_last_activity_time(messages: &[Message], fallback: Option<i64>) -> Option<i64> {
1102    messages.last().map_or(fallback, |message| {
1103        Some(
1104            message
1105                .info
1106                .time
1107                .completed
1108                .unwrap_or(message.info.time.created),
1109        )
1110    })
1111}
1112
1113fn extract_recent_tool_calls(messages: &[Message], limit: usize) -> Vec<ToolCallSummary> {
1114    let mut tool_calls = Vec::new();
1115
1116    for message in messages.iter().rev() {
1117        for part in message.parts.iter().rev() {
1118            if let Part::Tool {
1119                call_id,
1120                tool,
1121                state,
1122                ..
1123            } = part
1124            {
1125                let (state, started_at, completed_at) = match state {
1126                    Some(ToolState::Running(running)) => {
1127                        (ToolStateSummary::Running, Some(running.time.start), None)
1128                    }
1129                    Some(ToolState::Completed(completed)) => (
1130                        ToolStateSummary::Completed,
1131                        Some(completed.time.start),
1132                        Some(completed.time.end),
1133                    ),
1134                    Some(ToolState::Error(error)) => (
1135                        ToolStateSummary::Error {
1136                            message: error.error.clone(),
1137                        },
1138                        Some(error.time.start),
1139                        Some(error.time.end),
1140                    ),
1141                    _ => (ToolStateSummary::Pending, None, None),
1142                };
1143
1144                tool_calls.push(ToolCallSummary {
1145                    call_id: call_id.clone(),
1146                    tool_name: tool.clone(),
1147                    state,
1148                    started_at,
1149                    completed_at,
1150                });
1151
1152                if tool_calls.len() >= limit {
1153                    return tool_calls;
1154                }
1155            }
1156        }
1157    }
1158
1159    tool_calls
1160}
1161
/// Tool for getting detailed state of a specific `OpenCode` session.
///
/// The only field is an `Arc`, so clones share the same server cell.
#[derive(Clone)]
pub struct GetSessionStateTool {
    /// Shared cell holding the orchestrator server; it is initialized on
    /// first use via `OrchestratorServer::start_lazy`.
    server: Arc<OnceCell<OrchestratorServer>>,
}

impl GetSessionStateTool {
    /// Create a new `GetSessionStateTool` with the given server cell.
    ///
    /// The cell is stored as-is; nothing is initialized here.
    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
        Self { server }
    }
}
1173
1174impl Tool for GetSessionStateTool {
1175    type Input = GetSessionStateInput;
1176    type Output = GetSessionStateOutput;
1177    const NAME: &'static str = "get_session_state";
1178    const DESCRIPTION: &'static str = "Get detailed state of a specific session including status, pending messages, and recent tool calls.";
1179
1180    fn call(
1181        &self,
1182        input: Self::Input,
1183        _ctx: &ToolContext,
1184    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1185        let server_cell = Arc::clone(&self.server);
1186        Box::pin(async move {
1187            let timer = CallTimer::start();
1188            let result: Result<GetSessionStateOutput, ToolError> = async {
1189                let server = server_cell
1190                    .get_or_try_init(OrchestratorServer::start_lazy)
1191                    .await
1192                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1193
1194                let client = server.client();
1195                let session_id = &input.session_id;
1196
1197                let session = client.sessions().get(session_id).await.map_err(|e| {
1198                    if e.is_not_found() {
1199                        ToolError::InvalidInput(format!(
1200                            "Session '{session_id}' not found. Use list_sessions to discover available sessions."
1201                        ))
1202                    } else {
1203                        ToolError::Internal(format!("Failed to get session: {e}"))
1204                    }
1205                })?;
1206
1207                let status = match client.sessions().status_for(session_id).await.map_err(|e| {
1208                    ToolError::Internal(format!("Failed to get session status: {e}"))
1209                })? {
1210                    SessionStatusInfo::Busy => SessionStatusSummary::Busy,
1211                    SessionStatusInfo::Retry {
1212                        attempt,
1213                        message,
1214                        next,
1215                    } => SessionStatusSummary::Retry {
1216                        attempt,
1217                        message,
1218                        next,
1219                    },
1220                    SessionStatusInfo::Idle => SessionStatusSummary::Idle,
1221                };
1222
1223                let messages = client.messages().list(session_id).await.map_err(|e| {
1224                    ToolError::Internal(format!("Failed to list messages: {e}"))
1225                })?;
1226                let pending_message_count = count_pending_messages(&messages);
1227                let last_activity = get_last_activity_time(
1228                    &messages,
1229                    session.time.as_ref().map(|time| time.updated),
1230                );
1231                let recent_tool_calls = extract_recent_tool_calls(&messages, 10);
1232
1233                let spawned = server.spawned_sessions().read().await;
1234                let launched_by_you = spawned.contains(session_id);
1235
1236                Ok(GetSessionStateOutput {
1237                    session_id: session.id,
1238                    title: session.title,
1239                    directory: session.directory,
1240                    path: session.path,
1241                    status,
1242                    launched_by_you,
1243                    pending_message_count,
1244                    last_activity,
1245                    recent_tool_calls,
1246                })
1247            }
1248            .await;
1249
1250            match result {
1251                Ok(output) => {
1252                    log_tool_success(
1253                        &timer,
1254                        Self::NAME,
1255                        &input,
1256                        &output,
1257                        ToolLogMeta::default(),
1258                        false,
1259                    );
1260                    Ok(output)
1261                }
1262                Err(error) => {
1263                    log_tool_error(&timer, Self::NAME, &input, &error);
1264                    Err(error)
1265                }
1266            }
1267        })
1268    }
1269}
1270
1271// ============================================================================
1272// list_commands
1273// ============================================================================
1274
/// Tool for listing available `OpenCode` commands that can be executed.
///
/// The only field is an `Arc`, so clones share the same server cell.
#[derive(Clone)]
pub struct ListCommandsTool {
    /// Shared cell holding the orchestrator server; it is initialized on
    /// first use via `OrchestratorServer::start_lazy`.
    server: Arc<OnceCell<OrchestratorServer>>,
}

impl ListCommandsTool {
    /// Create a new `ListCommandsTool` with the given server cell.
    ///
    /// The cell is stored as-is; nothing is initialized here.
    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
        Self { server }
    }
}
1287
1288impl Tool for ListCommandsTool {
1289    type Input = ListCommandsInput;
1290    type Output = ListCommandsOutput;
1291    const NAME: &'static str = "list_commands";
1292    const DESCRIPTION: &'static str = "List available OpenCode commands that can be used with run.";
1293
1294    fn call(
1295        &self,
1296        input: Self::Input,
1297        _ctx: &ToolContext,
1298    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1299        let server_cell = Arc::clone(&self.server);
1300        Box::pin(async move {
1301            let timer = CallTimer::start();
1302            let result: Result<ListCommandsOutput, ToolError> = async {
1303                let server = server_cell
1304                    .get_or_try_init(OrchestratorServer::start_lazy)
1305                    .await
1306                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1307
1308                let commands =
1309                    server.client().tools().commands().await.map_err(|e| {
1310                        ToolError::Internal(format!("Failed to list commands: {e}"))
1311                    })?;
1312
1313                let command_infos: Vec<CommandInfo> = commands
1314                    .into_iter()
1315                    .map(|c| CommandInfo {
1316                        name: c.name,
1317                        description: c.description,
1318                    })
1319                    .collect();
1320
1321                Ok(ListCommandsOutput {
1322                    commands: command_infos,
1323                })
1324            }
1325            .await;
1326
1327            match result {
1328                Ok(output) => {
1329                    log_tool_success(
1330                        &timer,
1331                        Self::NAME,
1332                        &input,
1333                        &output,
1334                        ToolLogMeta::default(),
1335                        false,
1336                    );
1337                    Ok(output)
1338                }
1339                Err(error) => {
1340                    log_tool_error(&timer, Self::NAME, &input, &error);
1341                    Err(error)
1342                }
1343            }
1344        })
1345    }
1346}
1347
1348// ============================================================================
1349// respond_permission
1350// ============================================================================
1351
/// Tool for responding to permission requests from `OpenCode` sessions.
///
/// After sending the reply, continues monitoring the session and returns
/// when the session completes or another permission is requested.
///
/// The only field is an `Arc`, so clones share the same server cell.
#[derive(Clone)]
pub struct RespondPermissionTool {
    /// Shared cell holding the orchestrator server; it is initialized on
    /// first use via `OrchestratorServer::start_lazy`.
    server: Arc<OnceCell<OrchestratorServer>>,
}

impl RespondPermissionTool {
    /// Create a new `RespondPermissionTool` with the given server cell.
    ///
    /// The cell is stored as-is; nothing is initialized here.
    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
        Self { server }
    }
}
1367
1368impl Tool for RespondPermissionTool {
1369    type Input = RespondPermissionInput;
1370    type Output = RespondPermissionOutput;
1371    const NAME: &'static str = "respond_permission";
1372    const DESCRIPTION: &'static str = r#"Respond to a permission request from an OpenCode session.
1373
1374After responding, continues monitoring the session and returns when complete or when another permission is required.
1375
1376Parameters:
1377- session_id: Session with pending permission
1378- reply: "once" (allow this request), "always" (allow for matching patterns), or "reject" (deny)
1379- message: Optional message to include with reply"#;
1380
1381    fn call(
1382        &self,
1383        input: Self::Input,
1384        ctx: &ToolContext,
1385    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1386        let server_cell = Arc::clone(&self.server);
1387        let ctx = ctx.clone();
1388        Box::pin(async move {
1389            let timer = CallTimer::start();
1390            let request = input.clone();
1391            let result: Result<(RespondPermissionOutput, ToolLogMeta), ToolError> = async {
1392                let server = server_cell
1393                    .get_or_try_init(OrchestratorServer::start_lazy)
1394                    .await
1395                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1396
1397                let client = server.client();
1398
1399                // Find the pending permission for this session
1400                let mut pending =
1401                    client.permissions().list().await.map_err(|e| {
1402                        ToolError::Internal(format!("Failed to list permissions: {e}"))
1403                    })?;
1404
1405                let perm = if let Some(req_id) = input.permission_request_id.as_deref() {
1406                    let idx = pending.iter().position(|p| p.id == req_id).ok_or_else(|| {
1407                        ToolError::InvalidInput(format!(
1408                            "No pending permission found with id '{req_id}'. \
1409                         (session_id='{}')",
1410                            input.session_id
1411                        ))
1412                    })?;
1413
1414                    let perm = pending.remove(idx);
1415
1416                    if perm.session_id != input.session_id {
1417                        return Err(ToolError::InvalidInput(format!(
1418                            "Permission request '{req_id}' belongs to session '{}', not '{}'.",
1419                            perm.session_id, input.session_id
1420                        )));
1421                    }
1422
1423                    perm
1424                } else {
1425                    let mut perms: Vec<_> = pending
1426                        .into_iter()
1427                        .filter(|p| p.session_id == input.session_id)
1428                        .collect();
1429
1430                    match perms.as_slice() {
1431                        [] => {
1432                            return Err(ToolError::InvalidInput(format!(
1433                                "No pending permission found for session '{}'. \
1434                             The permission may have already been responded to.",
1435                                input.session_id
1436                            )));
1437                        }
1438                        [_single] => perms.swap_remove(0),
1439                        multiple => {
1440                            let ids = multiple
1441                                .iter()
1442                                .map(|p| p.id.as_str())
1443                                .collect::<Vec<_>>()
1444                                .join(", ");
1445                            return Err(ToolError::InvalidInput(format!(
1446                                "Multiple pending permissions found for session '{}': {ids}. \
1447                             Please retry with permission_request_id (returned by run).",
1448                                input.session_id
1449                            )));
1450                        }
1451                    }
1452                };
1453
1454                // Track if this is a rejection for post-processing
1455                let is_reject = matches!(input.reply, PermissionReply::Reject);
1456
1457                // Capture permission details for warning message
1458                let permission_type = perm.permission.clone();
1459                let permission_patterns = perm.patterns.clone();
1460
1461                // Capture baseline assistant text BEFORE sending reject
1462                // This lets us detect stale text after rejection
1463                let mut pre_warnings: Vec<String> = Vec::new();
1464                let baseline = if is_reject {
1465                    match client.messages().list(&input.session_id).await {
1466                        Ok(msgs) => OrchestratorServer::extract_assistant_text(&msgs),
1467                        Err(e) => {
1468                            pre_warnings.push(format!("Failed to fetch baseline messages: {e}"));
1469                            None
1470                        }
1471                    }
1472                } else {
1473                    None
1474                };
1475
1476                // Convert our reply type to API type
1477                let api_reply = match input.reply {
1478                    PermissionReply::Once => ApiPermissionReply::Once,
1479                    PermissionReply::Always => ApiPermissionReply::Always,
1480                    PermissionReply::Reject => ApiPermissionReply::Reject,
1481                };
1482
1483                // Send the reply
1484                client
1485                    .permissions()
1486                    .reply(
1487                        &perm.id,
1488                        &PermissionReplyRequest {
1489                            reply: api_reply,
1490                            message: input.message,
1491                        },
1492                    )
1493                    .await
1494                    .map_err(|e| {
1495                        ToolError::Internal(format!("Failed to reply to permission: {e}"))
1496                    })?;
1497
1498                // Now continue monitoring the session using run logic
1499                let run_tool = OrchestratorRunTool::new(Arc::clone(&server_cell));
1500                let wait_for_activity = (!is_reject).then_some(true);
1501                let outcome = run_tool
1502                    .run_impl_outcome(
1503                        OrchestratorRunInput {
1504                            session_id: Some(input.session_id),
1505                            command: None,
1506                            message: None,
1507                            wait_for_activity,
1508                        },
1509                        &ctx,
1510                    )
1511                    .await?;
1512                let mut out = outcome.output;
1513
1514                // Merge pre-warnings
1515                out.warnings.extend(pre_warnings);
1516
1517                // Apply rejection-aware output mutation
1518                if is_reject && matches!(out.status, RunStatus::Completed) {
1519                    let final_resp = out.response.as_deref();
1520                    let baseline_resp = baseline.as_deref();
1521
1522                    // If response unchanged or None, it's stale pre-rejection text
1523                    if final_resp.is_none() || final_resp == baseline_resp {
1524                        out.response = None;
1525                        let patterns_str = if permission_patterns.is_empty() {
1526                            "(none)".to_string()
1527                        } else {
1528                            permission_patterns.join(", ")
1529                        };
1530                        out.warnings.push(format!(
1531                        "Permission rejected for '{permission_type}'. Patterns: {patterns_str}. \
1532                         Session stopped without generating a new assistant response."
1533                    ));
1534                        tracing::debug!(
1535                            permission_type = %permission_type,
1536                            "rejection override applied: response set to None"
1537                        );
1538                    }
1539                }
1540
1541                Ok((out, outcome.log_meta))
1542            }
1543            .await;
1544
1545            match result {
1546                Ok((output, log_meta)) => {
1547                    log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1548                    Ok(output)
1549                }
1550                Err(error) => {
1551                    log_tool_error(&timer, Self::NAME, &request, &error);
1552                    Err(error)
1553                }
1554            }
1555        })
1556    }
1557}
1558
1559// ============================================================================
1560// respond_question
1561// ============================================================================
1562
1563#[derive(Clone)]
1564pub struct RespondQuestionTool {
1565    server: Arc<OnceCell<OrchestratorServer>>,
1566}
1567
1568impl RespondQuestionTool {
1569    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
1570        Self { server }
1571    }
1572}
1573
1574impl Tool for RespondQuestionTool {
1575    type Input = RespondQuestionInput;
1576    type Output = RespondQuestionOutput;
1577    const NAME: &'static str = "respond_question";
1578    const DESCRIPTION: &'static str = r#"Respond to a question request from an OpenCode session.
1579
1580After replying, continues monitoring the session and returns when complete or when another interruption is required.
1581
1582Parameters:
1583- session_id: Session with pending question
1584- action: "reply" or "reject"
1585- answers: Required when action=reply; one list per question"#;
1586
1587    fn call(
1588        &self,
1589        input: Self::Input,
1590        ctx: &ToolContext,
1591    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1592        let server_cell = Arc::clone(&self.server);
1593        let ctx = ctx.clone();
1594        Box::pin(async move {
1595            let timer = CallTimer::start();
1596            let request = input.clone();
1597            let result: Result<(RespondQuestionOutput, ToolLogMeta), ToolError> = async {
1598            let server = server_cell
1599                .get_or_try_init(OrchestratorServer::start_lazy)
1600                .await
1601                .map_err(|e| ToolError::Internal(e.to_string()))?;
1602
1603            let client = server.client();
1604            let mut pending = client
1605                .question()
1606                .list()
1607                .await
1608                .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;
1609
1610            let question = if let Some(req_id) = input.question_request_id.as_deref() {
1611                let idx = pending
1612                    .iter()
1613                    .position(|question| question.id == req_id)
1614                    .ok_or_else(|| {
1615                        ToolError::InvalidInput(format!(
1616                            "No pending question found with id '{req_id}'. (session_id='{}')",
1617                            input.session_id
1618                        ))
1619                    })?;
1620
1621                let question = pending.remove(idx);
1622                if question.session_id != input.session_id {
1623                    return Err(ToolError::InvalidInput(format!(
1624                        "Question request '{req_id}' belongs to session '{}', not '{}'.",
1625                        question.session_id, input.session_id
1626                    )));
1627                }
1628
1629                question
1630            } else {
1631                let mut questions: Vec<_> = pending
1632                    .into_iter()
1633                    .filter(|question| question.session_id == input.session_id)
1634                    .collect();
1635
1636                match questions.as_slice() {
1637                    [] => {
1638                        return Err(ToolError::InvalidInput(format!(
1639                            "No pending question found for session '{}'. The question may have already been responded to.",
1640                            input.session_id
1641                        )));
1642                    }
1643                    [_single] => questions.swap_remove(0),
1644                    multiple => {
1645                        let ids = multiple
1646                            .iter()
1647                            .map(|question| question.id.as_str())
1648                            .collect::<Vec<_>>()
1649                            .join(", ");
1650                        return Err(ToolError::InvalidInput(format!(
1651                            "Multiple pending questions found for session '{}': {ids}. Please retry with question_request_id (returned by run).",
1652                            input.session_id
1653                        )));
1654                    }
1655                }
1656            };
1657
1658            match input.action {
1659                QuestionAction::Reply => {
1660                    if input.answers.is_empty() {
1661                        return Err(ToolError::InvalidInput(
1662                            "answers is required when action=reply".into(),
1663                        ));
1664                    }
1665
1666                    client
1667                        .question()
1668                        .reply(
1669                            &question.id,
1670                            &QuestionReply {
1671                                answers: input.answers,
1672                            },
1673                        )
1674                        .await
1675                        .map_err(|e| {
1676                            ToolError::Internal(format!("Failed to reply to question: {e}"))
1677                        })?;
1678
1679                    let outcome = OrchestratorRunTool::new(Arc::clone(&server_cell))
1680                        .run_impl_outcome(OrchestratorRunInput {
1681                            session_id: Some(input.session_id),
1682                            command: None,
1683                            message: None,
1684                            wait_for_activity: Some(true),
1685                        }, &ctx)
1686                        .await?;
1687                    Ok((outcome.output, outcome.log_meta))
1688                }
1689                QuestionAction::Reject => {
1690                    client.question().reject(&question.id).await.map_err(|e| {
1691                        ToolError::Internal(format!("Failed to reject question: {e}"))
1692                    })?;
1693
1694                    let outcome = OrchestratorRunTool::new(Arc::clone(&server_cell))
1695                        .run_impl_outcome(OrchestratorRunInput {
1696                            session_id: Some(input.session_id),
1697                            command: None,
1698                            message: None,
1699                            wait_for_activity: None,
1700                        }, &ctx)
1701                        .await?;
1702                    Ok((outcome.output, outcome.log_meta))
1703                }
1704            }
1705        }
1706        .await;
1707
1708            match result {
1709                Ok((output, log_meta)) => {
1710                    log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1711                    Ok(output)
1712                }
1713                Err(error) => {
1714                    log_tool_error(&timer, Self::NAME, &request, &error);
1715                    Err(error)
1716                }
1717            }
1718        })
1719    }
1720}
1721
1722// ============================================================================
1723// Registry builder
1724// ============================================================================
1725
1726/// Build the tool registry with all orchestrator tools.
1727///
1728/// The server cell is lazily initialized on first tool call.
1729pub fn build_registry(server: &Arc<OnceCell<OrchestratorServer>>) -> ToolRegistry {
1730    ToolRegistry::builder()
1731        .register::<OrchestratorRunTool, ()>(OrchestratorRunTool::new(Arc::clone(server)))
1732        .register::<ListSessionsTool, ()>(ListSessionsTool::new(Arc::clone(server)))
1733        .register::<GetSessionStateTool, ()>(GetSessionStateTool::new(Arc::clone(server)))
1734        .register::<ListCommandsTool, ()>(ListCommandsTool::new(Arc::clone(server)))
1735        .register::<RespondPermissionTool, ()>(RespondPermissionTool::new(Arc::clone(server)))
1736        .register::<RespondQuestionTool, ()>(RespondQuestionTool::new(Arc::clone(server)))
1737        .finish()
1738}
1739
#[cfg(test)]
mod tests {
    use super::*;
    use agentic_tools_core::Tool;

    /// Each tool exposes its short, unprefixed name.
    #[test]
    fn tool_names_are_short() {
        let cases = [
            (<OrchestratorRunTool as Tool>::NAME, "run"),
            (<ListSessionsTool as Tool>::NAME, "list_sessions"),
            (<GetSessionStateTool as Tool>::NAME, "get_session_state"),
            (<ListCommandsTool as Tool>::NAME, "list_commands"),
            (<RespondPermissionTool as Tool>::NAME, "respond_permission"),
            (<RespondQuestionTool as Tool>::NAME, "respond_question"),
        ];

        for (actual, expected) in cases {
            assert_eq!(actual, expected);
        }
    }

    /// With no messages, the session-level timestamp is returned as the last activity.
    #[test]
    fn last_activity_falls_back_to_session_timestamp_when_messages_are_empty() {
        let last_activity = get_last_activity_time(&[], Some(1_234));
        assert_eq!(last_activity, Some(1_234));
    }
}