Skip to main content

opencode_orchestrator_mcp/
tools.rs

1//! Tool implementations for orchestrator MCP server.
2
3use crate::config;
4use crate::logging;
5use crate::server::OrchestratorServer;
6use crate::token_tracker::TokenTracker;
7use crate::types::ChangeStats;
8use crate::types::CommandInfo;
9use crate::types::GetSessionStateInput;
10use crate::types::GetSessionStateOutput;
11use crate::types::ListCommandsInput;
12use crate::types::ListCommandsOutput;
13use crate::types::ListSessionsInput;
14use crate::types::ListSessionsOutput;
15use crate::types::OrchestratorRunInput;
16use crate::types::OrchestratorRunOutput;
17use crate::types::PermissionReply;
18use crate::types::QuestionAction;
19use crate::types::QuestionInfoView;
20use crate::types::QuestionOptionView;
21use crate::types::RespondPermissionInput;
22use crate::types::RespondPermissionOutput;
23use crate::types::RespondQuestionInput;
24use crate::types::RespondQuestionOutput;
25use crate::types::RunStatus;
26use crate::types::SessionStatusSummary;
27use crate::types::SessionSummary;
28use crate::types::ToolCallSummary;
29use crate::types::ToolStateSummary;
30use agentic_logging::CallTimer;
31use agentic_logging::ToolCallRecord;
32use agentic_tools_core::Tool;
33use agentic_tools_core::ToolContext;
34use agentic_tools_core::ToolError;
35use agentic_tools_core::ToolRegistry;
36use agentic_tools_core::fmt::TextFormat;
37use agentic_tools_core::fmt::TextOptions;
38use futures::future::BoxFuture;
39use opencode_rs::types::event::Event;
40use opencode_rs::types::message::CommandRequest;
41use opencode_rs::types::message::Message;
42use opencode_rs::types::message::Part;
43use opencode_rs::types::message::PromptPart;
44use opencode_rs::types::message::PromptRequest;
45use opencode_rs::types::message::ToolState;
46use opencode_rs::types::permission::PermissionReply as ApiPermissionReply;
47use opencode_rs::types::permission::PermissionReplyRequest;
48use opencode_rs::types::question::QuestionReply;
49use opencode_rs::types::question::QuestionRequest;
50use opencode_rs::types::session::CreateSessionRequest;
51use opencode_rs::types::session::SessionStatusInfo;
52use opencode_rs::types::session::SummarizeRequest;
53use serde::Serialize;
54use std::sync::Arc;
55use std::time::Duration;
56use tokio::sync::OnceCell;
57use tokio::task::JoinHandle;
58
59const SERVER_NAME: &str = "opencode-orchestrator-mcp";
60
61#[derive(Debug, Clone, Default)]
62struct ToolLogMeta {
63    token_usage: Option<agentic_logging::TokenUsage>,
64    token_usage_saturated: bool,
65}
66
67struct RunOutcome {
68    output: OrchestratorRunOutput,
69    log_meta: ToolLogMeta,
70}
71
72impl RunOutcome {
73    fn without_tokens(output: OrchestratorRunOutput) -> Self {
74        Self {
75            output,
76            log_meta: ToolLogMeta::default(),
77        }
78    }
79
80    fn with_tracker(output: OrchestratorRunOutput, token_tracker: &TokenTracker) -> Self {
81        let (token_usage, token_usage_saturated) = token_tracker.to_log_token_usage();
82        Self {
83            output,
84            log_meta: ToolLogMeta {
85                token_usage,
86                token_usage_saturated,
87            },
88        }
89    }
90}
91
92async fn abort_command_task(task: &mut Option<JoinHandle<Result<(), String>>>) {
93    if let Some(handle) = task.take() {
94        handle.abort();
95        let _ = handle.await;
96    }
97}
98
99fn request_json<T: Serialize>(request: &T) -> serde_json::Value {
100    serde_json::to_value(request)
101        .unwrap_or_else(|error| serde_json::json!({"serialization_error": error.to_string()}))
102}
103
104fn log_tool_success<TReq: Serialize, TOut: TextFormat>(
105    timer: &CallTimer,
106    tool: &str,
107    request: &TReq,
108    output: &TOut,
109    log_meta: ToolLogMeta,
110    write_markdown: bool,
111) {
112    let (completed_at, duration_ms) = timer.finish();
113    let rendered = output.fmt_text(&TextOptions::default());
114    let response_file = write_markdown
115        .then(|| logging::write_markdown_best_effort(completed_at, &timer.call_id, &rendered))
116        .flatten();
117
118    let record = ToolCallRecord {
119        call_id: timer.call_id.clone(),
120        server: SERVER_NAME.into(),
121        tool: tool.into(),
122        started_at: timer.started_at,
123        completed_at,
124        duration_ms,
125        request: request_json(request),
126        response_file,
127        success: true,
128        error: None,
129        model: None,
130        token_usage: log_meta.token_usage,
131        summary: log_meta
132            .token_usage_saturated
133            .then(|| serde_json::json!({"token_usage_saturated": true})),
134    };
135
136    logging::append_record_best_effort(&record);
137}
138
139fn log_tool_error<TReq: Serialize>(
140    timer: &CallTimer,
141    tool: &str,
142    request: &TReq,
143    error: &ToolError,
144) {
145    let (completed_at, duration_ms) = timer.finish();
146    let record = ToolCallRecord {
147        call_id: timer.call_id.clone(),
148        server: SERVER_NAME.into(),
149        tool: tool.into(),
150        started_at: timer.started_at,
151        completed_at,
152        duration_ms,
153        request: request_json(request),
154        response_file: None,
155        success: false,
156        error: Some(error.to_string()),
157        model: None,
158        token_usage: None,
159        summary: None,
160    };
161
162    logging::append_record_best_effort(&record);
163}
164
165// ============================================================================
166// run
167// ============================================================================
168
169/// Tool for starting or resuming `OpenCode` sessions.
170///
171/// Handles session creation, prompt/command execution, SSE event monitoring,
172/// and permission request detection. Returns when the session completes or
173/// when a permission is requested.
174#[derive(Clone)]
175pub struct OrchestratorRunTool {
176    server: Arc<OnceCell<OrchestratorServer>>,
177}
178
179impl OrchestratorRunTool {
180    /// Create a new `OrchestratorRunTool` with the given server cell.
181    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
182        Self { server }
183    }
184
185    /// Finalize a completed session by fetching messages and optionally triggering summarization.
186    ///
187    /// This is called when we detect the session is idle, either via SSE `SessionIdle` event
188    /// or via polling `sessions().status()`.
189    ///
190    /// Uses bounded retry with backoff (0/50/100/200/400ms) if assistant text is not immediately
191    /// available, handling the race condition where the session becomes idle before messages
192    /// are fully persisted.
193    async fn finalize_completed(
194        client: &opencode_rs::Client,
195        session_id: String,
196        token_tracker: &TokenTracker,
197        mut warnings: Vec<String>,
198    ) -> Result<OrchestratorRunOutput, ToolError> {
199        // Bounded backoff delays for message extraction retry (~750ms total budget)
200        const BACKOFFS_MS: &[u64] = &[0, 50, 100, 200, 400];
201
202        let mut response: Option<String> = None;
203
204        for (attempt, &delay_ms) in BACKOFFS_MS.iter().enumerate() {
205            if delay_ms > 0 {
206                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
207            }
208
209            let messages = client
210                .messages()
211                .list(&session_id)
212                .await
213                .map_err(|e| ToolError::Internal(format!("Failed to list messages: {e}")))?;
214
215            response = OrchestratorServer::extract_assistant_text(&messages);
216
217            if response.is_some() {
218                if attempt > 0 {
219                    tracing::debug!(
220                        session_id = %session_id,
221                        attempt,
222                        "assistant response became available after retry"
223                    );
224                }
225                break;
226            }
227        }
228
229        if response.is_none() {
230            tracing::debug!(
231                session_id = %session_id,
232                "no assistant response found after bounded retry"
233            );
234        }
235
236        // Handle context limit summarization if needed
237        if token_tracker.compaction_needed
238            && let (Some(pid), Some(mid)) = (&token_tracker.provider_id, &token_tracker.model_id)
239        {
240            let summarize_req = SummarizeRequest {
241                provider_id: pid.clone(),
242                model_id: mid.clone(),
243                auto: None,
244            };
245
246            match client
247                .sessions()
248                .summarize(&session_id, &summarize_req)
249                .await
250            {
251                Ok(_) => {
252                    tracing::info!(session_id = %session_id, "context summarization triggered");
253                    warnings.push("Context limit reached; summarization triggered".into());
254                }
255                Err(e) => {
256                    tracing::warn!(session_id = %session_id, error = %e, "summarization failed");
257                    warnings.push(format!("Summarization failed: {e}"));
258                }
259            }
260        }
261
262        Ok(OrchestratorRunOutput {
263            session_id,
264            status: RunStatus::Completed,
265            response,
266            partial_response: None,
267            permission_request_id: None,
268            permission_type: None,
269            permission_patterns: vec![],
270            question_request_id: None,
271            questions: vec![],
272            warnings,
273        })
274    }
275
276    fn map_questions(req: &QuestionRequest) -> Vec<QuestionInfoView> {
277        req.questions
278            .iter()
279            .map(|question| QuestionInfoView {
280                question: question.question.clone(),
281                header: question.header.clone(),
282                options: question
283                    .options
284                    .iter()
285                    .map(|option| QuestionOptionView {
286                        label: option.label.clone(),
287                        description: option.description.clone(),
288                    })
289                    .collect(),
290                multiple: question.multiple,
291                custom: question.custom,
292            })
293            .collect()
294    }
295
296    fn question_required_output(
297        session_id: String,
298        partial_response: Option<String>,
299        request: &QuestionRequest,
300        warnings: Vec<String>,
301    ) -> OrchestratorRunOutput {
302        OrchestratorRunOutput {
303            session_id,
304            status: RunStatus::QuestionRequired,
305            response: None,
306            partial_response,
307            permission_request_id: None,
308            permission_type: None,
309            permission_patterns: vec![],
310            question_request_id: Some(request.id.clone()),
311            questions: Self::map_questions(request),
312            warnings,
313        }
314    }
315
316    async fn run_impl_outcome(
317        &self,
318        input: OrchestratorRunInput,
319        ctx: &ToolContext,
320    ) -> Result<RunOutcome, ToolError> {
321        // Input validation
322        if input.session_id.is_none() && input.message.is_none() && input.command.is_none() {
323            return Err(ToolError::InvalidInput(
324                "Either session_id (to resume/check status) or message/command (to start work) is required"
325                    .into(),
326            ));
327        }
328
329        if input.command.is_some() && input.message.is_none() {
330            return Err(ToolError::InvalidInput(
331                "message is required when command is specified (becomes $ARGUMENTS for template expansion)"
332                    .into(),
333            ));
334        }
335
336        // Trim and validate message content
337        let message = input.message.map(|m| m.trim().to_string());
338        if let Some(ref m) = message
339            && m.is_empty()
340        {
341            return Err(ToolError::InvalidInput(
342                "message cannot be empty or whitespace-only".into(),
343            ));
344        }
345
346        let wait_for_activity = input.wait_for_activity.unwrap_or(false);
347
348        // Lazy initialization: spawn server on first tool call
349        let server = self
350            .server
351            .get_or_try_init(OrchestratorServer::start_lazy)
352            .await
353            .map_err(|e| ToolError::Internal(e.to_string()))?;
354
355        let client = server.client();
356
357        tracing::debug!(
358            command = ?input.command,
359            has_message = message.is_some(),
360            message_len = message.as_ref().map(String::len),
361            session_id = ?input.session_id,
362            "run: starting"
363        );
364
365        // 1. Resolve session: validate existing or create new
366        let session_id = if let Some(sid) = input.session_id {
367            // Validate session exists
368            client.sessions().get(&sid).await.map_err(|e| {
369                if e.is_not_found() {
370                    ToolError::InvalidInput(format!(
371                        "Session '{sid}' not found. Use list_sessions to discover sessions, \
372                         or omit session_id to create a new session."
373                    ))
374                } else {
375                    ToolError::Internal(format!("Failed to get session: {e}"))
376                }
377            })?;
378            sid
379        } else {
380            // Create new session
381            let session = client
382                .sessions()
383                .create(&CreateSessionRequest::default())
384                .await
385                .map_err(|e| ToolError::Internal(format!("Failed to create session: {e}")))?;
386
387            {
388                let mut spawned = server.spawned_sessions().write().await;
389                spawned.insert(session.id.clone());
390            }
391
392            session.id
393        };
394
395        tracing::info!(session_id = %session_id, "run: session resolved");
396
397        // 2. Check if session is already idle (for resume-only case)
398        let status = client
399            .sessions()
400            .status_for(&session_id)
401            .await
402            .map_err(|e| ToolError::Internal(format!("Failed to get session status: {e}")))?;
403
404        let is_idle = matches!(status, SessionStatusInfo::Idle);
405
406        // 3. Check for pending permissions before doing anything else
407        let pending_permissions = client
408            .permissions()
409            .list()
410            .await
411            .map_err(|e| ToolError::Internal(format!("Failed to list permissions: {e}")))?;
412
413        let my_permission = pending_permissions
414            .into_iter()
415            .find(|p| p.session_id == session_id);
416
417        if let Some(perm) = my_permission {
418            tracing::info!(
419                session_id = %session_id,
420                permission_type = %perm.permission,
421                "run: pending permission found"
422            );
423            return Ok(RunOutcome::without_tokens(OrchestratorRunOutput {
424                session_id,
425                status: RunStatus::PermissionRequired,
426                response: None,
427                partial_response: None,
428                permission_request_id: Some(perm.id),
429                permission_type: Some(perm.permission),
430                permission_patterns: perm.patterns,
431                question_request_id: None,
432                questions: vec![],
433                warnings: vec![],
434            }));
435        }
436
437        let pending_questions = client
438            .question()
439            .list()
440            .await
441            .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;
442
443        if let Some(question) = pending_questions
444            .into_iter()
445            .find(|question| question.session_id == session_id)
446        {
447            tracing::info!(session_id = %session_id, question_id = %question.id, "run: pending question found");
448            return Ok(RunOutcome::without_tokens(Self::question_required_output(
449                session_id,
450                None,
451                &question,
452                vec![],
453            )));
454        }
455
456        // 4. If no message/command and session is idle, just return current state
457        // Uses finalize_completed to get retry logic for message extraction
458        if message.is_none() && input.command.is_none() && is_idle && !wait_for_activity {
459            let token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
460            let output =
461                Self::finalize_completed(client, session_id, &token_tracker, vec![]).await?;
462            return Ok(RunOutcome::with_tracker(output, &token_tracker));
463        }
464
465        // 5. Subscribe to SSE BEFORE sending prompt/command
466        let mut subscription = client
467            .subscribe_session(&session_id)
468            .map_err(|e| ToolError::Internal(format!("Failed to subscribe to session: {e}")))?;
469
470        // Track whether this call is dispatching new work (command or message)
471        // vs just resuming/monitoring an existing session.
472        let dispatched_new_work = input.command.is_some() || message.is_some() || wait_for_activity;
473        let idle_grace = config::idle_grace();
474        let mut idle_grace_deadline: Option<tokio::time::Instant> = None;
475        let mut awaiting_idle_grace_check = false;
476
477        if wait_for_activity && input.command.is_none() && message.is_none() {
478            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
479        }
480
481        // 6. Kick off the work
482        let mut command_task: Option<JoinHandle<Result<(), String>>> = None;
483        let mut command_name_for_logging: Option<String> = None;
484
485        if let Some(command) = &input.command {
486            command_name_for_logging = Some(command.clone());
487
488            let cmd_client = client.clone();
489            let cmd_session_id = session_id.clone();
490            let cmd_name = command.clone();
491            let cmd_arguments = message.clone().unwrap_or_default();
492
493            command_task = Some(tokio::spawn(async move {
494                let req = CommandRequest {
495                    command: cmd_name,
496                    arguments: cmd_arguments,
497                    message_id: None,
498                };
499
500                cmd_client
501                    .messages()
502                    .command(&cmd_session_id, &req)
503                    .await
504                    .map(|_| ())
505                    .map_err(|e| e.to_string())
506            }));
507        } else if let Some(msg) = &message {
508            // Send prompt asynchronously
509            let req = PromptRequest {
510                parts: vec![PromptPart::Text {
511                    text: msg.clone(),
512                    synthetic: None,
513                    ignored: None,
514                    metadata: None,
515                }],
516                message_id: None,
517                model: None,
518                agent: None,
519                no_reply: None,
520                system: None,
521                variant: None,
522            };
523
524            client
525                .messages()
526                .prompt_async(&session_id, &req)
527                .await
528                .map_err(|e| ToolError::Internal(format!("Failed to send prompt: {e}")))?;
529
530            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
531        }
532
533        // 7. Event loop: wait for completion or permission
534        // Overall timeout to prevent infinite hangs (configurable, default 1 hour)
535        let deadline = tokio::time::Instant::now() + server.session_deadline();
536        let inactivity_timeout = server.inactivity_timeout();
537        let mut last_activity_time = tokio::time::Instant::now();
538
539        tracing::debug!(session_id = %session_id, "run: entering event loop");
540        let mut token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
541        let mut partial_response = String::new();
542        let warnings = Vec::new();
543
544        let mut poll_interval = tokio::time::interval(Duration::from_secs(1));
545        poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
546
547        // Track whether we've observed the session as busy at least once.
548        // This prevents completing immediately if we call run_impl on an already-idle
549        // session before our new work has started processing.
550        let mut observed_busy = false;
551
552        // Track whether SSE is still active. If the stream closes, we fall back
553        // to polling-only mode rather than returning an error.
554        let mut sse_active = true;
555
556        // === Post-subscribe status re-check (latency optimization) ===
557        // If we're just monitoring (no new work dispatched), check if session is already idle.
558        // This handles the race where session completed between our initial status check
559        // and SSE subscription becoming ready.
560        if !dispatched_new_work
561            && let Ok(status) = client.sessions().status_for(&session_id).await
562            && matches!(status, SessionStatusInfo::Idle)
563        {
564            tracing::debug!(
565                session_id = %session_id,
566                "session already idle on post-subscribe check"
567            );
568            let output =
569                Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
570            return Ok(RunOutcome::with_tracker(output, &token_tracker));
571        }
572        // If check fails or session is busy, continue to event loop
573
574        loop {
575            // Check timeout before processing
576            let now = tokio::time::Instant::now();
577
578            if now.duration_since(last_activity_time) >= inactivity_timeout {
579                return Err(ToolError::Internal(format!(
580                    "Session idle timeout: no activity for 5 minutes (session_id={session_id}). \
581                     The session may still be running; use run(session_id=...) to check status."
582                )));
583            }
584
585            if now >= deadline {
586                return Err(ToolError::Internal(
587                    "Session execution timed out after 1 hour. \
588                     The session may still be running; use run with the session_id to check status."
589                        .into(),
590                ));
591            }
592
593            let command_task_active = command_task.is_some();
594
595            tokio::select! {
596                () = ctx.cancelled() => {
597                    abort_command_task(&mut command_task).await;
598                    return Err(ToolError::cancelled(None));
599                }
600
601                maybe_event = subscription.recv(), if sse_active => {
602                    let Some(event) = maybe_event else {
603                        // SSE stream closed - this can happen due to network issues,
604                        // server restarts, or connection timeouts. Fall back to polling
605                        // rather than failing immediately.
606                        tracing::warn!(
607                            session_id = %session_id,
608                            "SSE stream closed unexpectedly; falling back to polling-only mode"
609                        );
610                        sse_active = false;
611                        continue; // The poll_interval branch will now drive completion detection
612                    };
613
614                    // Track tokens (server is already initialized at this point)
615                    token_tracker.observe_event(&event, |pid, mid| {
616                        server.context_limit(pid, mid)
617                    });
618
619                    match event {
620                        Event::PermissionAsked { properties } => {
621                            tracing::info!(
622                                session_id = %session_id,
623                                permission_type = %properties.request.permission,
624                                "run: permission requested"
625                            );
626                            return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
627                                session_id,
628                                status: RunStatus::PermissionRequired,
629                                response: None,
630                                partial_response: if partial_response.is_empty() {
631                                    None
632                                } else {
633                                    Some(partial_response)
634                                },
635                                permission_request_id: Some(properties.request.id),
636                                permission_type: Some(properties.request.permission),
637                                permission_patterns: properties.request.patterns,
638                                question_request_id: None,
639                                questions: vec![],
640                                warnings,
641                            }, &token_tracker));
642                        }
643
644                        Event::QuestionAsked { properties } => {
645                            return Ok(RunOutcome::with_tracker(Self::question_required_output(
646                                session_id,
647                                if partial_response.is_empty() {
648                                    None
649                                } else {
650                                    Some(partial_response)
651                                },
652                                &properties.request,
653                                warnings,
654                            ), &token_tracker));
655                        }
656
657                        Event::MessagePartDelta { properties } => {
658                            last_activity_time = tokio::time::Instant::now();
659                            // Message streaming means session is actively processing
660                            observed_busy = true;
661                            awaiting_idle_grace_check = false;
662                            // Collect streaming text from field-level delta events.
663                            if let Some(delta) = &properties.delta {
664                                partial_response.push_str(delta);
665                            }
666                        }
667
668                        Event::MessagePartUpdated { .. } | Event::MessageUpdated { .. } => {
669                            last_activity_time = tokio::time::Instant::now();
670                            observed_busy = true;
671                            awaiting_idle_grace_check = false;
672                        }
673
674                        Event::SessionError { properties } => {
675                            let error_msg = properties
676                                .error
677                                .map_or_else(|| "Unknown error".to_string(), |e| format!("{e:?}"));
678                            tracing::error!(
679                                session_id = %session_id,
680                                error = %error_msg,
681                                "run: session error"
682                            );
683                            return Err(ToolError::Internal(format!("Session error: {error_msg}")));
684                        }
685
686                        Event::SessionIdle { .. } => {
687                            tracing::debug!(session_id = %session_id, "received SessionIdle event");
688                            let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
689                            return Ok(RunOutcome::with_tracker(output, &token_tracker));
690                        }
691
692                        _ => {
693                            // Other events - continue
694                        }
695                    }
696                }
697
698                _ = poll_interval.tick() => {
699                    // === 1. Permission fallback (check first, permissions take priority) ===
700                    let pending = match client.permissions().list().await {
701                        Ok(p) => p,
702                        Err(e) => {
703                            // Log but continue - permission list failure shouldn't block completion detection
704                            tracing::warn!(
705                                session_id = %session_id,
706                                error = %e,
707                                "failed to list permissions during poll fallback"
708                            );
709                            vec![]
710                        }
711                    };
712
713                    if let Some(perm) = pending.into_iter().find(|p| p.session_id == session_id) {
714                        tracing::debug!(
715                            session_id = %session_id,
716                            permission_id = %perm.id,
717                            "detected pending permission via polling fallback"
718                        );
719                        return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
720                            session_id,
721                            status: RunStatus::PermissionRequired,
722                            response: None,
723                            partial_response: if partial_response.is_empty() {
724                                None
725                            } else {
726                                Some(partial_response)
727                                },
728                                permission_request_id: Some(perm.id),
729                                permission_type: Some(perm.permission),
730                            permission_patterns: perm.patterns,
731                            question_request_id: None,
732                            questions: vec![],
733                            warnings,
734                        }, &token_tracker));
735                    }
736
737                    let pending_questions = match client.question().list().await {
738                        Ok(questions) => questions,
739                        Err(e) => {
740                            tracing::warn!(
741                                session_id = %session_id,
742                                error = %e,
743                                "failed to list questions during poll fallback"
744                            );
745                            vec![]
746                        }
747                    };
748
749                    if let Some(question) = pending_questions
750                        .into_iter()
751                        .find(|question| question.session_id == session_id)
752                    {
753                        tracing::debug!(
754                            session_id = %session_id,
755                            question_id = %question.id,
756                            "detected pending question via polling fallback"
757                        );
758                        return Ok(RunOutcome::with_tracker(Self::question_required_output(
759                            session_id,
760                            if partial_response.is_empty() {
761                                None
762                            } else {
763                                Some(partial_response)
764                            },
765                            &question,
766                            warnings,
767                        ), &token_tracker));
768                    }
769
770                    // === 2. Session idle detection fallback (NEW) ===
771                    // This is the key fix for race conditions. If SSE missed SessionIdle,
772                    // we detect completion via polling sessions().status_for(session_id).
773                    match client.sessions().status_for(&session_id).await {
774                        Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
775                            last_activity_time = tokio::time::Instant::now();
776                            observed_busy = true;
777                            awaiting_idle_grace_check = false;
778                            tracing::trace!(
779                                session_id = %session_id,
780                                "our session is busy/retry, waiting"
781                            );
782                        }
783                        Ok(SessionStatusInfo::Idle) => {
784                            if !dispatched_new_work || observed_busy {
785                                // Session is idle AND either:
786                                // - We didn't dispatch new work (just monitoring), OR
787                                // - We did dispatch work and have seen it become busy at least once
788                                //
789                                // This guards against completing before our work starts processing.
790                                tracing::debug!(
791                                    session_id = %session_id,
792                                    dispatched_new_work = dispatched_new_work,
793                                    observed_busy = observed_busy,
794                                    "detected session idle via polling fallback"
795                                );
796                                let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
797                                return Ok(RunOutcome::with_tracker(output, &token_tracker));
798                            }
799
800                            let Some(deadline) = idle_grace_deadline else {
801                                tracing::trace!(
802                                    session_id = %session_id,
803                                    command_task_active = command_task_active,
804                                    "idle seen before dispatch confirmed; waiting"
805                                );
806                                continue;
807                            };
808
809                            let now = tokio::time::Instant::now();
810                            if now >= deadline {
811                                tracing::debug!(
812                                    session_id = %session_id,
813                                    idle_grace_ms = idle_grace.as_millis(),
814                                    "accepting idle via bounded idle grace (no busy observed)"
815                                );
816                                let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
817                                return Ok(RunOutcome::with_tracker(output, &token_tracker));
818                            }
819
820                            awaiting_idle_grace_check = true;
821                            tracing::trace!(
822                                session_id = %session_id,
823                                remaining_ms = (deadline - now).as_millis(),
824                                "idle detected before busy; waiting for idle-grace deadline"
825                            );
826                        }
827                        Err(e) => {
828                            // Log but continue - status check failure shouldn't block the loop
829                            tracing::warn!(
830                                session_id = %session_id,
831                                error = %e,
832                                "failed to get session status during poll fallback"
833                            );
834                        }
835                    }
836                }
837
838                () = async {
839                    match idle_grace_deadline {
840                        Some(deadline) => tokio::time::sleep_until(deadline).await,
841                        None => std::future::pending::<()>().await,
842                    }
843                }, if awaiting_idle_grace_check => {
844                    awaiting_idle_grace_check = false;
845
846                    match client.sessions().status_for(&session_id).await {
847                        Ok(SessionStatusInfo::Idle) => {
848                            tracing::debug!(session_id = %session_id, "idle-grace deadline reached; finalizing");
849                            let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
850                            return Ok(RunOutcome::with_tracker(output, &token_tracker));
851                        }
852                        Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
853                            last_activity_time = tokio::time::Instant::now();
854                            observed_busy = true;
855                        }
856                        Err(e) => {
857                            tracing::warn!(
858                                session_id = %session_id,
859                                error = %e,
860                                "status check failed at idle-grace deadline"
861                            );
862                        }
863                    }
864                }
865
866                cmd_result = async {
867                    match command_task.as_mut() {
868                        Some(handle) => Some(handle.await),
869                        None => {
870                            std::future::pending::<
871                                Option<Result<Result<(), String>, tokio::task::JoinError>>,
872                            >()
873                            .await
874                        }
875                    }
876                }, if command_task_active => {
877                    match cmd_result {
878                        Some(Ok(Ok(()))) => {
879                            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
880                            tracing::debug!(
881                                session_id = %session_id,
882                                command = ?command_name_for_logging,
883                                "run: command dispatch completed successfully"
884                            );
885                            command_task = None;
886                        }
887                        Some(Ok(Err(e))) => {
888                            tracing::error!(
889                                session_id = %session_id,
890                                command = ?command_name_for_logging,
891                                error = %e,
892                                "run: command dispatch failed"
893                            );
894                            return Err(ToolError::Internal(format!(
895                                "Failed to execute command '{}': {e}",
896                                command_name_for_logging.as_deref().unwrap_or("unknown")
897                            )));
898                        }
899                        Some(Err(join_err)) => {
900                            tracing::error!(
901                                session_id = %session_id,
902                                command = ?command_name_for_logging,
903                                error = %join_err,
904                                "run: command task panicked"
905                            );
906                            return Err(ToolError::Internal(format!("Command task panicked: {join_err}")));
907                        }
908                        None => {
909                            unreachable!("command_task_active guard should prevent None");
910                        }
911                    }
912                }
913            }
914        }
915    }
916}
917
918impl Tool for OrchestratorRunTool {
919    type Input = OrchestratorRunInput;
920    type Output = OrchestratorRunOutput;
921    const NAME: &'static str = "run";
922    const DESCRIPTION: &'static str = r#"Start or resume an OpenCode session. Optionally run a named command or send a raw prompt.
923
924Returns when:
925- status=completed: Session finished executing. Response contains final assistant output.
926- status=permission_required: Session needs permission approval. Call respond_permission to continue.
927- status=question_required: Session needs question answers. Call respond_question to continue.
928
929Parameters:
930- session_id: Existing session to resume (omit to create new)
931- command: OpenCode command name (e.g., "research", "implement_plan")
932- message: Prompt text or $ARGUMENTS for command template
933
934Examples:
935- New session with prompt: run(message="explain this code")
936- New session with command: run(command="research", message="caching strategies")
937- Resume session: run(session_id="...", message="continue")
938- Check status: run(session_id="...")"#;
939
940    fn call(
941        &self,
942        input: Self::Input,
943        ctx: &ToolContext,
944    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
945        let this = self.clone();
946        let ctx = ctx.clone();
947        Box::pin(async move {
948            let timer = CallTimer::start();
949            match this.run_impl_outcome(input.clone(), &ctx).await {
950                Ok(outcome) => {
951                    log_tool_success(
952                        &timer,
953                        Self::NAME,
954                        &input,
955                        &outcome.output,
956                        outcome.log_meta,
957                        true,
958                    );
959                    Ok(outcome.output)
960                }
961                Err(error) => {
962                    log_tool_error(&timer, Self::NAME, &input, &error);
963                    Err(error)
964                }
965            }
966        })
967    }
968}
969
970// ============================================================================
971// list_sessions
972// ============================================================================
973
974/// Tool for listing available `OpenCode` sessions in the current directory.
975#[derive(Clone)]
976pub struct ListSessionsTool {
977    server: Arc<OnceCell<OrchestratorServer>>,
978}
979
980impl ListSessionsTool {
981    /// Create a new `ListSessionsTool` with the given server cell.
982    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
983        Self { server }
984    }
985}
986
987impl Tool for ListSessionsTool {
988    type Input = ListSessionsInput;
989    type Output = ListSessionsOutput;
990    const NAME: &'static str = "list_sessions";
991    const DESCRIPTION: &'static str =
992        "List available OpenCode sessions in the current directory context.";
993
994    fn call(
995        &self,
996        input: Self::Input,
997        _ctx: &ToolContext,
998    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
999        let server_cell = Arc::clone(&self.server);
1000        Box::pin(async move {
1001            let timer = CallTimer::start();
1002            let result: Result<ListSessionsOutput, ToolError> = async {
1003                let server = server_cell
1004                    .get_or_try_init(OrchestratorServer::start_lazy)
1005                    .await
1006                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1007
1008                let sessions =
1009                    server.client().sessions().list().await.map_err(|e| {
1010                        ToolError::Internal(format!("Failed to list sessions: {e}"))
1011                    })?;
1012                let status_map = server.client().sessions().status_map().await.ok();
1013                let spawned = server.spawned_sessions().read().await;
1014
1015                let limit = input.limit.unwrap_or(20);
1016                let summaries: Vec<SessionSummary> = sessions
1017                    .into_iter()
1018                    .take(limit)
1019                    .map(|s| {
1020                        let status =
1021                            status_map
1022                                .as_ref()
1023                                .map(|status_map| match status_map.get(&s.id) {
1024                                    Some(SessionStatusInfo::Busy) => SessionStatusSummary::Busy,
1025                                    Some(SessionStatusInfo::Retry {
1026                                        attempt,
1027                                        message,
1028                                        next,
1029                                    }) => SessionStatusSummary::Retry {
1030                                        attempt: *attempt,
1031                                        message: message.clone(),
1032                                        next: *next,
1033                                    },
1034                                    Some(SessionStatusInfo::Idle) | None => {
1035                                        SessionStatusSummary::Idle
1036                                    }
1037                                });
1038
1039                        let change_stats = s.summary.as_ref().map(|summary| ChangeStats {
1040                            additions: summary.additions,
1041                            deletions: summary.deletions,
1042                            files: summary.files,
1043                        });
1044
1045                        SessionSummary {
1046                            launched_by_you: spawned.contains(&s.id),
1047                            created: s.time.as_ref().map(|t| t.created),
1048                            updated: s.time.as_ref().map(|t| t.updated),
1049                            directory: s.directory,
1050                            title: s.title,
1051                            id: s.id,
1052                            status,
1053                            change_stats,
1054                        }
1055                    })
1056                    .collect();
1057
1058                Ok(ListSessionsOutput {
1059                    sessions: summaries,
1060                })
1061            }
1062            .await;
1063
1064            match result {
1065                Ok(output) => {
1066                    log_tool_success(
1067                        &timer,
1068                        Self::NAME,
1069                        &input,
1070                        &output,
1071                        ToolLogMeta::default(),
1072                        false,
1073                    );
1074                    Ok(output)
1075                }
1076                Err(error) => {
1077                    log_tool_error(&timer, Self::NAME, &input, &error);
1078                    Err(error)
1079                }
1080            }
1081        })
1082    }
1083}
1084
1085fn count_pending_messages(messages: &[Message]) -> usize {
1086    let mut pending = 0;
1087
1088    for message in messages.iter().rev() {
1089        if message.role() == "user" {
1090            pending += 1;
1091        } else if message.role() == "assistant" {
1092            break;
1093        }
1094    }
1095
1096    pending
1097}
1098
1099fn get_last_activity_time(messages: &[Message], fallback: Option<i64>) -> Option<i64> {
1100    messages.last().map_or(fallback, |message| {
1101        Some(
1102            message
1103                .info
1104                .time
1105                .completed
1106                .unwrap_or(message.info.time.created),
1107        )
1108    })
1109}
1110
1111fn extract_recent_tool_calls(messages: &[Message], limit: usize) -> Vec<ToolCallSummary> {
1112    let mut tool_calls = Vec::new();
1113
1114    for message in messages.iter().rev() {
1115        for part in message.parts.iter().rev() {
1116            if let Part::Tool {
1117                call_id,
1118                tool,
1119                state,
1120                ..
1121            } = part
1122            {
1123                let (state, started_at, completed_at) = match state {
1124                    Some(ToolState::Running(running)) => {
1125                        (ToolStateSummary::Running, Some(running.time.start), None)
1126                    }
1127                    Some(ToolState::Completed(completed)) => (
1128                        ToolStateSummary::Completed,
1129                        Some(completed.time.start),
1130                        Some(completed.time.end),
1131                    ),
1132                    Some(ToolState::Error(error)) => (
1133                        ToolStateSummary::Error {
1134                            message: error.error.clone(),
1135                        },
1136                        Some(error.time.start),
1137                        Some(error.time.end),
1138                    ),
1139                    _ => (ToolStateSummary::Pending, None, None),
1140                };
1141
1142                tool_calls.push(ToolCallSummary {
1143                    call_id: call_id.clone(),
1144                    tool_name: tool.clone(),
1145                    state,
1146                    started_at,
1147                    completed_at,
1148                });
1149
1150                if tool_calls.len() >= limit {
1151                    return tool_calls;
1152                }
1153            }
1154        }
1155    }
1156
1157    tool_calls
1158}
1159
1160/// Tool for getting detailed state of a specific `OpenCode` session.
1161#[derive(Clone)]
1162pub struct GetSessionStateTool {
1163    server: Arc<OnceCell<OrchestratorServer>>,
1164}
1165
1166impl GetSessionStateTool {
1167    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
1168        Self { server }
1169    }
1170}
1171
1172impl Tool for GetSessionStateTool {
1173    type Input = GetSessionStateInput;
1174    type Output = GetSessionStateOutput;
1175    const NAME: &'static str = "get_session_state";
1176    const DESCRIPTION: &'static str = "Get detailed state of a specific session including status, pending messages, and recent tool calls.";
1177
1178    fn call(
1179        &self,
1180        input: Self::Input,
1181        _ctx: &ToolContext,
1182    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1183        let server_cell = Arc::clone(&self.server);
1184        Box::pin(async move {
1185            let timer = CallTimer::start();
1186            let result: Result<GetSessionStateOutput, ToolError> = async {
1187                let server = server_cell
1188                    .get_or_try_init(OrchestratorServer::start_lazy)
1189                    .await
1190                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1191
1192                let client = server.client();
1193                let session_id = &input.session_id;
1194
1195                let session = client.sessions().get(session_id).await.map_err(|e| {
1196                    if e.is_not_found() {
1197                        ToolError::InvalidInput(format!(
1198                            "Session '{session_id}' not found. Use list_sessions to discover available sessions."
1199                        ))
1200                    } else {
1201                        ToolError::Internal(format!("Failed to get session: {e}"))
1202                    }
1203                })?;
1204
1205                let status = match client.sessions().status_for(session_id).await.map_err(|e| {
1206                    ToolError::Internal(format!("Failed to get session status: {e}"))
1207                })? {
1208                    SessionStatusInfo::Busy => SessionStatusSummary::Busy,
1209                    SessionStatusInfo::Retry {
1210                        attempt,
1211                        message,
1212                        next,
1213                    } => SessionStatusSummary::Retry {
1214                        attempt,
1215                        message,
1216                        next,
1217                    },
1218                    SessionStatusInfo::Idle => SessionStatusSummary::Idle,
1219                };
1220
1221                let messages = client.messages().list(session_id).await.map_err(|e| {
1222                    ToolError::Internal(format!("Failed to list messages: {e}"))
1223                })?;
1224                let pending_message_count = count_pending_messages(&messages);
1225                let last_activity = get_last_activity_time(
1226                    &messages,
1227                    session.time.as_ref().map(|time| time.updated),
1228                );
1229                let recent_tool_calls = extract_recent_tool_calls(&messages, 10);
1230
1231                let spawned = server.spawned_sessions().read().await;
1232                let launched_by_you = spawned.contains(session_id);
1233
1234                Ok(GetSessionStateOutput {
1235                    session_id: session.id,
1236                    title: session.title,
1237                    directory: session.directory,
1238                    status,
1239                    launched_by_you,
1240                    pending_message_count,
1241                    last_activity,
1242                    recent_tool_calls,
1243                })
1244            }
1245            .await;
1246
1247            match result {
1248                Ok(output) => {
1249                    log_tool_success(
1250                        &timer,
1251                        Self::NAME,
1252                        &input,
1253                        &output,
1254                        ToolLogMeta::default(),
1255                        false,
1256                    );
1257                    Ok(output)
1258                }
1259                Err(error) => {
1260                    log_tool_error(&timer, Self::NAME, &input, &error);
1261                    Err(error)
1262                }
1263            }
1264        })
1265    }
1266}
1267
1268// ============================================================================
1269// list_commands
1270// ============================================================================
1271
1272/// Tool for listing available `OpenCode` commands that can be executed.
1273#[derive(Clone)]
1274pub struct ListCommandsTool {
1275    server: Arc<OnceCell<OrchestratorServer>>,
1276}
1277
1278impl ListCommandsTool {
1279    /// Create a new `ListCommandsTool` with the given server cell.
1280    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
1281        Self { server }
1282    }
1283}
1284
1285impl Tool for ListCommandsTool {
1286    type Input = ListCommandsInput;
1287    type Output = ListCommandsOutput;
1288    const NAME: &'static str = "list_commands";
1289    const DESCRIPTION: &'static str = "List available OpenCode commands that can be used with run.";
1290
1291    fn call(
1292        &self,
1293        input: Self::Input,
1294        _ctx: &ToolContext,
1295    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1296        let server_cell = Arc::clone(&self.server);
1297        Box::pin(async move {
1298            let timer = CallTimer::start();
1299            let result: Result<ListCommandsOutput, ToolError> = async {
1300                let server = server_cell
1301                    .get_or_try_init(OrchestratorServer::start_lazy)
1302                    .await
1303                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1304
1305                let commands =
1306                    server.client().tools().commands().await.map_err(|e| {
1307                        ToolError::Internal(format!("Failed to list commands: {e}"))
1308                    })?;
1309
1310                let command_infos: Vec<CommandInfo> = commands
1311                    .into_iter()
1312                    .map(|c| CommandInfo {
1313                        name: c.name,
1314                        description: c.description,
1315                    })
1316                    .collect();
1317
1318                Ok(ListCommandsOutput {
1319                    commands: command_infos,
1320                })
1321            }
1322            .await;
1323
1324            match result {
1325                Ok(output) => {
1326                    log_tool_success(
1327                        &timer,
1328                        Self::NAME,
1329                        &input,
1330                        &output,
1331                        ToolLogMeta::default(),
1332                        false,
1333                    );
1334                    Ok(output)
1335                }
1336                Err(error) => {
1337                    log_tool_error(&timer, Self::NAME, &input, &error);
1338                    Err(error)
1339                }
1340            }
1341        })
1342    }
1343}
1344
1345// ============================================================================
1346// respond_permission
1347// ============================================================================
1348
1349/// Tool for responding to permission requests from `OpenCode` sessions.
1350///
1351/// After sending the reply, continues monitoring the session and returns
1352/// when the session completes or another permission is requested.
1353#[derive(Clone)]
1354pub struct RespondPermissionTool {
1355    server: Arc<OnceCell<OrchestratorServer>>,
1356}
1357
1358impl RespondPermissionTool {
1359    /// Create a new `RespondPermissionTool` with the given server cell.
1360    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
1361        Self { server }
1362    }
1363}
1364
1365impl Tool for RespondPermissionTool {
1366    type Input = RespondPermissionInput;
1367    type Output = RespondPermissionOutput;
1368    const NAME: &'static str = "respond_permission";
1369    const DESCRIPTION: &'static str = r#"Respond to a permission request from an OpenCode session.
1370
1371After responding, continues monitoring the session and returns when complete or when another permission is required.
1372
1373Parameters:
1374- session_id: Session with pending permission
1375- reply: "once" (allow this request), "always" (allow for matching patterns), or "reject" (deny)
1376- message: Optional message to include with reply"#;
1377
1378    fn call(
1379        &self,
1380        input: Self::Input,
1381        ctx: &ToolContext,
1382    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1383        let server_cell = Arc::clone(&self.server);
1384        let ctx = ctx.clone();
1385        Box::pin(async move {
1386            let timer = CallTimer::start();
1387            let request = input.clone();
1388            let result: Result<(RespondPermissionOutput, ToolLogMeta), ToolError> = async {
1389                let server = server_cell
1390                    .get_or_try_init(OrchestratorServer::start_lazy)
1391                    .await
1392                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1393
1394                let client = server.client();
1395
1396                // Find the pending permission for this session
1397                let mut pending =
1398                    client.permissions().list().await.map_err(|e| {
1399                        ToolError::Internal(format!("Failed to list permissions: {e}"))
1400                    })?;
1401
1402                let perm = if let Some(req_id) = input.permission_request_id.as_deref() {
1403                    let idx = pending.iter().position(|p| p.id == req_id).ok_or_else(|| {
1404                        ToolError::InvalidInput(format!(
1405                            "No pending permission found with id '{req_id}'. \
1406                         (session_id='{}')",
1407                            input.session_id
1408                        ))
1409                    })?;
1410
1411                    let perm = pending.remove(idx);
1412
1413                    if perm.session_id != input.session_id {
1414                        return Err(ToolError::InvalidInput(format!(
1415                            "Permission request '{req_id}' belongs to session '{}', not '{}'.",
1416                            perm.session_id, input.session_id
1417                        )));
1418                    }
1419
1420                    perm
1421                } else {
1422                    let mut perms: Vec<_> = pending
1423                        .into_iter()
1424                        .filter(|p| p.session_id == input.session_id)
1425                        .collect();
1426
1427                    match perms.as_slice() {
1428                        [] => {
1429                            return Err(ToolError::InvalidInput(format!(
1430                                "No pending permission found for session '{}'. \
1431                             The permission may have already been responded to.",
1432                                input.session_id
1433                            )));
1434                        }
1435                        [_single] => perms.swap_remove(0),
1436                        multiple => {
1437                            let ids = multiple
1438                                .iter()
1439                                .map(|p| p.id.as_str())
1440                                .collect::<Vec<_>>()
1441                                .join(", ");
1442                            return Err(ToolError::InvalidInput(format!(
1443                                "Multiple pending permissions found for session '{}': {ids}. \
1444                             Please retry with permission_request_id (returned by run).",
1445                                input.session_id
1446                            )));
1447                        }
1448                    }
1449                };
1450
1451                // Track if this is a rejection for post-processing
1452                let is_reject = matches!(input.reply, PermissionReply::Reject);
1453
1454                // Capture permission details for warning message
1455                let permission_type = perm.permission.clone();
1456                let permission_patterns = perm.patterns.clone();
1457
1458                // Capture baseline assistant text BEFORE sending reject
1459                // This lets us detect stale text after rejection
1460                let mut pre_warnings: Vec<String> = Vec::new();
1461                let baseline = if is_reject {
1462                    match client.messages().list(&input.session_id).await {
1463                        Ok(msgs) => OrchestratorServer::extract_assistant_text(&msgs),
1464                        Err(e) => {
1465                            pre_warnings.push(format!("Failed to fetch baseline messages: {e}"));
1466                            None
1467                        }
1468                    }
1469                } else {
1470                    None
1471                };
1472
1473                // Convert our reply type to API type
1474                let api_reply = match input.reply {
1475                    PermissionReply::Once => ApiPermissionReply::Once,
1476                    PermissionReply::Always => ApiPermissionReply::Always,
1477                    PermissionReply::Reject => ApiPermissionReply::Reject,
1478                };
1479
1480                // Send the reply
1481                client
1482                    .permissions()
1483                    .reply(
1484                        &perm.id,
1485                        &PermissionReplyRequest {
1486                            reply: api_reply,
1487                            message: input.message,
1488                        },
1489                    )
1490                    .await
1491                    .map_err(|e| {
1492                        ToolError::Internal(format!("Failed to reply to permission: {e}"))
1493                    })?;
1494
1495                // Now continue monitoring the session using run logic
1496                let run_tool = OrchestratorRunTool::new(Arc::clone(&server_cell));
1497                let wait_for_activity = (!is_reject).then_some(true);
1498                let outcome = run_tool
1499                    .run_impl_outcome(
1500                        OrchestratorRunInput {
1501                            session_id: Some(input.session_id),
1502                            command: None,
1503                            message: None,
1504                            wait_for_activity,
1505                        },
1506                        &ctx,
1507                    )
1508                    .await?;
1509                let mut out = outcome.output;
1510
1511                // Merge pre-warnings
1512                out.warnings.extend(pre_warnings);
1513
1514                // Apply rejection-aware output mutation
1515                if is_reject && matches!(out.status, RunStatus::Completed) {
1516                    let final_resp = out.response.as_deref();
1517                    let baseline_resp = baseline.as_deref();
1518
1519                    // If response unchanged or None, it's stale pre-rejection text
1520                    if final_resp.is_none() || final_resp == baseline_resp {
1521                        out.response = None;
1522                        let patterns_str = if permission_patterns.is_empty() {
1523                            "(none)".to_string()
1524                        } else {
1525                            permission_patterns.join(", ")
1526                        };
1527                        out.warnings.push(format!(
1528                        "Permission rejected for '{permission_type}'. Patterns: {patterns_str}. \
1529                         Session stopped without generating a new assistant response."
1530                    ));
1531                        tracing::debug!(
1532                            permission_type = %permission_type,
1533                            "rejection override applied: response set to None"
1534                        );
1535                    }
1536                }
1537
1538                Ok((out, outcome.log_meta))
1539            }
1540            .await;
1541
1542            match result {
1543                Ok((output, log_meta)) => {
1544                    log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1545                    Ok(output)
1546                }
1547                Err(error) => {
1548                    log_tool_error(&timer, Self::NAME, &request, &error);
1549                    Err(error)
1550                }
1551            }
1552        })
1553    }
1554}
1555
1556// ============================================================================
1557// respond_question
1558// ============================================================================
1559
1560#[derive(Clone)]
1561pub struct RespondQuestionTool {
1562    server: Arc<OnceCell<OrchestratorServer>>,
1563}
1564
1565impl RespondQuestionTool {
1566    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
1567        Self { server }
1568    }
1569}
1570
1571impl Tool for RespondQuestionTool {
1572    type Input = RespondQuestionInput;
1573    type Output = RespondQuestionOutput;
1574    const NAME: &'static str = "respond_question";
1575    const DESCRIPTION: &'static str = r#"Respond to a question request from an OpenCode session.
1576
1577After replying, continues monitoring the session and returns when complete or when another interruption is required.
1578
1579Parameters:
1580- session_id: Session with pending question
1581- action: "reply" or "reject"
1582- answers: Required when action=reply; one list per question"#;
1583
1584    fn call(
1585        &self,
1586        input: Self::Input,
1587        ctx: &ToolContext,
1588    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1589        let server_cell = Arc::clone(&self.server);
1590        let ctx = ctx.clone();
1591        Box::pin(async move {
1592            let timer = CallTimer::start();
1593            let request = input.clone();
1594            let result: Result<(RespondQuestionOutput, ToolLogMeta), ToolError> = async {
1595            let server = server_cell
1596                .get_or_try_init(OrchestratorServer::start_lazy)
1597                .await
1598                .map_err(|e| ToolError::Internal(e.to_string()))?;
1599
1600            let client = server.client();
1601            let mut pending = client
1602                .question()
1603                .list()
1604                .await
1605                .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;
1606
1607            let question = if let Some(req_id) = input.question_request_id.as_deref() {
1608                let idx = pending
1609                    .iter()
1610                    .position(|question| question.id == req_id)
1611                    .ok_or_else(|| {
1612                        ToolError::InvalidInput(format!(
1613                            "No pending question found with id '{req_id}'. (session_id='{}')",
1614                            input.session_id
1615                        ))
1616                    })?;
1617
1618                let question = pending.remove(idx);
1619                if question.session_id != input.session_id {
1620                    return Err(ToolError::InvalidInput(format!(
1621                        "Question request '{req_id}' belongs to session '{}', not '{}'.",
1622                        question.session_id, input.session_id
1623                    )));
1624                }
1625
1626                question
1627            } else {
1628                let mut questions: Vec<_> = pending
1629                    .into_iter()
1630                    .filter(|question| question.session_id == input.session_id)
1631                    .collect();
1632
1633                match questions.as_slice() {
1634                    [] => {
1635                        return Err(ToolError::InvalidInput(format!(
1636                            "No pending question found for session '{}'. The question may have already been responded to.",
1637                            input.session_id
1638                        )));
1639                    }
1640                    [_single] => questions.swap_remove(0),
1641                    multiple => {
1642                        let ids = multiple
1643                            .iter()
1644                            .map(|question| question.id.as_str())
1645                            .collect::<Vec<_>>()
1646                            .join(", ");
1647                        return Err(ToolError::InvalidInput(format!(
1648                            "Multiple pending questions found for session '{}': {ids}. Please retry with question_request_id (returned by run).",
1649                            input.session_id
1650                        )));
1651                    }
1652                }
1653            };
1654
1655            match input.action {
1656                QuestionAction::Reply => {
1657                    if input.answers.is_empty() {
1658                        return Err(ToolError::InvalidInput(
1659                            "answers is required when action=reply".into(),
1660                        ));
1661                    }
1662
1663                    client
1664                        .question()
1665                        .reply(
1666                            &question.id,
1667                            &QuestionReply {
1668                                answers: input.answers,
1669                            },
1670                        )
1671                        .await
1672                        .map_err(|e| {
1673                            ToolError::Internal(format!("Failed to reply to question: {e}"))
1674                        })?;
1675
1676                    let outcome = OrchestratorRunTool::new(Arc::clone(&server_cell))
1677                        .run_impl_outcome(OrchestratorRunInput {
1678                            session_id: Some(input.session_id),
1679                            command: None,
1680                            message: None,
1681                            wait_for_activity: Some(true),
1682                        }, &ctx)
1683                        .await?;
1684                    Ok((outcome.output, outcome.log_meta))
1685                }
1686                QuestionAction::Reject => {
1687                    client.question().reject(&question.id).await.map_err(|e| {
1688                        ToolError::Internal(format!("Failed to reject question: {e}"))
1689                    })?;
1690
1691                    let outcome = OrchestratorRunTool::new(Arc::clone(&server_cell))
1692                        .run_impl_outcome(OrchestratorRunInput {
1693                            session_id: Some(input.session_id),
1694                            command: None,
1695                            message: None,
1696                            wait_for_activity: None,
1697                        }, &ctx)
1698                        .await?;
1699                    Ok((outcome.output, outcome.log_meta))
1700                }
1701            }
1702        }
1703        .await;
1704
1705            match result {
1706                Ok((output, log_meta)) => {
1707                    log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1708                    Ok(output)
1709                }
1710                Err(error) => {
1711                    log_tool_error(&timer, Self::NAME, &request, &error);
1712                    Err(error)
1713                }
1714            }
1715        })
1716    }
1717}
1718
1719// ============================================================================
1720// Registry builder
1721// ============================================================================
1722
1723/// Build the tool registry with all orchestrator tools.
1724///
1725/// The server cell is lazily initialized on first tool call.
1726pub fn build_registry(server: &Arc<OnceCell<OrchestratorServer>>) -> ToolRegistry {
1727    ToolRegistry::builder()
1728        .register::<OrchestratorRunTool, ()>(OrchestratorRunTool::new(Arc::clone(server)))
1729        .register::<ListSessionsTool, ()>(ListSessionsTool::new(Arc::clone(server)))
1730        .register::<GetSessionStateTool, ()>(GetSessionStateTool::new(Arc::clone(server)))
1731        .register::<ListCommandsTool, ()>(ListCommandsTool::new(Arc::clone(server)))
1732        .register::<RespondPermissionTool, ()>(RespondPermissionTool::new(Arc::clone(server)))
1733        .register::<RespondQuestionTool, ()>(RespondQuestionTool::new(Arc::clone(server)))
1734        .finish()
1735}
1736
1737#[cfg(test)]
1738mod tests {
1739    use super::*;
1740    use agentic_tools_core::Tool;
1741
1742    #[test]
1743    fn tool_names_are_short() {
1744        assert_eq!(<OrchestratorRunTool as Tool>::NAME, "run");
1745        assert_eq!(<ListSessionsTool as Tool>::NAME, "list_sessions");
1746        assert_eq!(<GetSessionStateTool as Tool>::NAME, "get_session_state");
1747        assert_eq!(<ListCommandsTool as Tool>::NAME, "list_commands");
1748        assert_eq!(<RespondPermissionTool as Tool>::NAME, "respond_permission");
1749        assert_eq!(<RespondQuestionTool as Tool>::NAME, "respond_question");
1750    }
1751
1752    #[test]
1753    fn last_activity_falls_back_to_session_timestamp_when_messages_are_empty() {
1754        assert_eq!(get_last_activity_time(&[], Some(1_234)), Some(1_234));
1755    }
1756}