Skip to main content

opencode_orchestrator_mcp/
tools.rs

1//! Tool implementations for orchestrator MCP server.
2
3use crate::config;
4use crate::logging;
5use crate::server::CommandPolicyDecision;
6use crate::server::OrchestratorServer;
7use crate::server::OrchestratorServerHandle;
8use crate::token_tracker::TokenTracker;
9use crate::types::ChangeStats;
10use crate::types::CommandInfo;
11use crate::types::GetSessionStateInput;
12use crate::types::GetSessionStateOutput;
13use crate::types::ListCommandsInput;
14use crate::types::ListCommandsOutput;
15use crate::types::ListSessionsInput;
16use crate::types::ListSessionsOutput;
17use crate::types::OrchestratorRunInput;
18use crate::types::OrchestratorRunOutput;
19use crate::types::PermissionReply;
20use crate::types::QuestionAction;
21use crate::types::QuestionInfoView;
22use crate::types::QuestionOptionView;
23use crate::types::RespondPermissionInput;
24use crate::types::RespondPermissionOutput;
25use crate::types::RespondQuestionInput;
26use crate::types::RespondQuestionOutput;
27use crate::types::RunStatus;
28use crate::types::SessionStatusSummary;
29use crate::types::SessionSummary;
30use crate::types::ToolCallSummary;
31use crate::types::ToolStateSummary;
32use agentic_logging::CallTimer;
33use agentic_logging::ToolCallRecord;
34use agentic_tools_core::Tool;
35use agentic_tools_core::ToolContext;
36use agentic_tools_core::ToolError;
37use agentic_tools_core::ToolRegistry;
38use agentic_tools_core::fmt::TextFormat;
39use agentic_tools_core::fmt::TextOptions;
40use futures::future::BoxFuture;
41use opencode_rs::types::event::Event;
42use opencode_rs::types::message::CommandRequest;
43use opencode_rs::types::message::Message;
44use opencode_rs::types::message::Part;
45use opencode_rs::types::message::PromptPart;
46use opencode_rs::types::message::PromptRequest;
47use opencode_rs::types::message::ToolState;
48use opencode_rs::types::permission::PermissionReply as ApiPermissionReply;
49use opencode_rs::types::permission::PermissionReplyRequest;
50use opencode_rs::types::question::QuestionReply;
51use opencode_rs::types::question::QuestionRequest;
52use opencode_rs::types::session::CreateSessionRequest;
53use opencode_rs::types::session::SessionStatusInfo;
54use opencode_rs::types::session::SummarizeRequest;
55use serde::Serialize;
56use std::sync::Arc;
57use std::time::Duration;
58use tokio::task::JoinHandle;
59
60const SERVER_NAME: &str = "opencode-orchestrator-mcp";
61
62#[derive(Debug, Clone, Default)]
63struct ToolLogMeta {
64    token_usage: Option<agentic_logging::TokenUsage>,
65    token_usage_saturated: bool,
66}
67
68struct RunOutcome {
69    output: OrchestratorRunOutput,
70    log_meta: ToolLogMeta,
71}
72
73fn blocked_command_error(command: &str, decision: CommandPolicyDecision) -> ToolError {
74    let reason = match decision {
75        CommandPolicyDecision::Allowed => {
76            return ToolError::Internal("command unexpectedly allowed".into());
77        }
78        CommandPolicyDecision::DeniedByAllowlist => {
79            "it is not present in orchestrator.commands.allow"
80        }
81        CommandPolicyDecision::DeniedByDenylist => "it is blocked by orchestrator.commands.deny",
82    };
83
84    ToolError::InvalidInput(format!(
85        "Command '{command}' cannot be run because {reason}."
86    ))
87}
88
89impl RunOutcome {
90    fn without_tokens(output: OrchestratorRunOutput) -> Self {
91        Self {
92            output,
93            log_meta: ToolLogMeta::default(),
94        }
95    }
96
97    fn with_tracker(output: OrchestratorRunOutput, token_tracker: &TokenTracker) -> Self {
98        let (token_usage, token_usage_saturated) = token_tracker.to_log_token_usage();
99        Self {
100            output,
101            log_meta: ToolLogMeta {
102                token_usage,
103                token_usage_saturated,
104            },
105        }
106    }
107}
108
109async fn abort_command_task(task: &mut Option<JoinHandle<Result<(), String>>>) {
110    if let Some(handle) = task.take() {
111        handle.abort();
112        let _ = handle.await;
113    }
114}
115
116fn request_json<T: Serialize>(request: &T) -> serde_json::Value {
117    serde_json::to_value(request)
118        .unwrap_or_else(|error| serde_json::json!({"serialization_error": error.to_string()}))
119}
120
121fn log_tool_success<TReq: Serialize, TOut: TextFormat>(
122    timer: &CallTimer,
123    tool: &str,
124    request: &TReq,
125    output: &TOut,
126    log_meta: ToolLogMeta,
127    write_markdown: bool,
128) {
129    let (completed_at, duration_ms) = timer.finish();
130    let rendered = output.fmt_text(&TextOptions::default());
131    let response_file = write_markdown
132        .then(|| logging::write_markdown_best_effort(completed_at, &timer.call_id, &rendered))
133        .flatten();
134
135    let record = ToolCallRecord {
136        call_id: timer.call_id.clone(),
137        server: SERVER_NAME.into(),
138        tool: tool.into(),
139        started_at: timer.started_at,
140        completed_at,
141        duration_ms,
142        request: request_json(request),
143        response_file,
144        success: true,
145        error: None,
146        failure_kind: None,
147        model: None,
148        token_usage: log_meta.token_usage,
149        summary: log_meta
150            .token_usage_saturated
151            .then(|| serde_json::json!({"token_usage_saturated": true})),
152    };
153
154    logging::append_record_best_effort(&record);
155}
156
157fn log_tool_error<TReq: Serialize>(
158    timer: &CallTimer,
159    tool: &str,
160    request: &TReq,
161    error: &ToolError,
162) {
163    let (completed_at, duration_ms) = timer.finish();
164    let error = error.to_string();
165    let record = ToolCallRecord {
166        call_id: timer.call_id.clone(),
167        server: SERVER_NAME.into(),
168        tool: tool.into(),
169        started_at: timer.started_at,
170        completed_at,
171        duration_ms,
172        request: request_json(request),
173        response_file: None,
174        success: false,
175        error: Some(error.clone()),
176        failure_kind: agentic_logging::classify_failure_kind(false, Some(&error)),
177        model: None,
178        token_usage: None,
179        summary: None,
180    };
181
182    logging::append_record_best_effort(&record);
183}
184
185// ============================================================================
186// run
187// ============================================================================
188
189/// Tool for starting or resuming `OpenCode` sessions.
190///
191/// Handles session creation, prompt/command execution, SSE event monitoring,
192/// and permission request detection. Returns when the session completes or
193/// when a permission is requested.
194#[derive(Clone)]
195pub struct OrchestratorRunTool {
196    server: Arc<OrchestratorServerHandle>,
197}
198
199impl OrchestratorRunTool {
200    /// Create a new `OrchestratorRunTool` with the shared server handle.
201    pub fn new(server: Arc<OrchestratorServerHandle>) -> Self {
202        Self { server }
203    }
204
205    /// Finalize a completed session by fetching messages and optionally triggering summarization.
206    ///
207    /// This is called when we detect the session is idle, either via SSE `SessionIdle` event
208    /// or via polling `sessions().status()`.
209    ///
210    /// Uses bounded retry with backoff (0/50/100/200/400ms) if assistant text is not immediately
211    /// available, handling the race condition where the session becomes idle before messages
212    /// are fully persisted.
213    async fn finalize_completed(
214        client: &opencode_rs::Client,
215        session_id: String,
216        token_tracker: &TokenTracker,
217        mut warnings: Vec<String>,
218    ) -> Result<OrchestratorRunOutput, ToolError> {
219        // Bounded backoff delays for message extraction retry (~750ms total budget)
220        const BACKOFFS_MS: &[u64] = &[0, 50, 100, 200, 400];
221
222        let mut response: Option<String> = None;
223
224        for (attempt, &delay_ms) in BACKOFFS_MS.iter().enumerate() {
225            if delay_ms > 0 {
226                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
227            }
228
229            let messages = client
230                .messages()
231                .list(&session_id)
232                .await
233                .map_err(|e| ToolError::Internal(format!("Failed to list messages: {e}")))?;
234
235            response = OrchestratorServer::extract_assistant_text(&messages);
236
237            if response.is_some() {
238                if attempt > 0 {
239                    tracing::debug!(
240                        session_id = %session_id,
241                        attempt,
242                        "assistant response became available after retry"
243                    );
244                }
245                break;
246            }
247        }
248
249        if response.is_none() {
250            tracing::debug!(
251                session_id = %session_id,
252                "no assistant response found after bounded retry"
253            );
254        }
255
256        // Handle context limit summarization if needed
257        if token_tracker.compaction_needed
258            && let (Some(pid), Some(mid)) = (&token_tracker.provider_id, &token_tracker.model_id)
259        {
260            let summarize_req = SummarizeRequest {
261                provider_id: pid.clone(),
262                model_id: mid.clone(),
263                auto: None,
264            };
265
266            match client
267                .sessions()
268                .summarize(&session_id, &summarize_req)
269                .await
270            {
271                Ok(_) => {
272                    tracing::info!(session_id = %session_id, "context summarization triggered");
273                    warnings.push("Context limit reached; summarization triggered".into());
274                }
275                Err(e) => {
276                    tracing::warn!(session_id = %session_id, error = %e, "summarization failed");
277                    warnings.push(format!("Summarization failed: {e}"));
278                }
279            }
280        }
281
282        Ok(OrchestratorRunOutput {
283            session_id,
284            status: RunStatus::Completed,
285            response,
286            partial_response: None,
287            permission_request_id: None,
288            permission_type: None,
289            permission_patterns: vec![],
290            question_request_id: None,
291            questions: vec![],
292            warnings,
293        })
294    }
295
296    fn map_questions(req: &QuestionRequest) -> Vec<QuestionInfoView> {
297        req.questions
298            .iter()
299            .map(|question| QuestionInfoView {
300                question: question.question.clone(),
301                header: question.header.clone(),
302                options: question
303                    .options
304                    .iter()
305                    .map(|option| QuestionOptionView {
306                        label: option.label.clone(),
307                        description: option.description.clone(),
308                    })
309                    .collect(),
310                multiple: question.multiple,
311                custom: question.custom,
312            })
313            .collect()
314    }
315
316    fn question_required_output(
317        session_id: String,
318        partial_response: Option<String>,
319        request: &QuestionRequest,
320        warnings: Vec<String>,
321    ) -> OrchestratorRunOutput {
322        OrchestratorRunOutput {
323            session_id,
324            status: RunStatus::QuestionRequired,
325            response: None,
326            partial_response,
327            permission_request_id: None,
328            permission_type: None,
329            permission_patterns: vec![],
330            question_request_id: Some(request.id.clone()),
331            questions: Self::map_questions(request),
332            warnings,
333        }
334    }
335
336    async fn run_impl_outcome(
337        &self,
338        input: OrchestratorRunInput,
339        ctx: &ToolContext,
340    ) -> Result<RunOutcome, ToolError> {
341        // Input validation
342        if input.session_id.is_none() && input.message.is_none() && input.command.is_none() {
343            return Err(ToolError::InvalidInput(
344                "Either session_id (to resume/check status) or message/command (to start work) is required"
345                    .into(),
346            ));
347        }
348
349        if input.command.is_some() && input.message.is_none() {
350            return Err(ToolError::InvalidInput(
351                "message is required when command is specified (becomes $ARGUMENTS for template expansion)"
352                    .into(),
353            ));
354        }
355
356        // Trim and validate message content
357        let message = input.message.map(|m| m.trim().to_string());
358        if let Some(ref m) = message
359            && m.is_empty()
360        {
361            return Err(ToolError::InvalidInput(
362                "message cannot be empty or whitespace-only".into(),
363            ));
364        }
365
366        let wait_for_activity = input.wait_for_activity.unwrap_or(false);
367
368        // Lazy initialization: spawn server on first tool call
369        let server = self
370            .server
371            .acquire()
372            .await
373            .map_err(|e| ToolError::Internal(e.to_string()))?;
374
375        if let Some(command) = input.command.as_deref() {
376            let decision = server.command_policy_decision(command);
377            if !decision.is_allowed() {
378                return Err(blocked_command_error(command, decision));
379            }
380        }
381
382        let client = server.client();
383
384        tracing::debug!(
385            command = ?input.command,
386            has_message = message.is_some(),
387            message_len = message.as_ref().map(String::len),
388            session_id = ?input.session_id,
389            "run: starting"
390        );
391
392        // 1. Resolve session: validate existing or create new
393        let session_id = if let Some(sid) = input.session_id {
394            // Validate session exists
395            client.sessions().get(&sid).await.map_err(|e| {
396                if e.is_not_found() {
397                    ToolError::InvalidInput(format!(
398                        "Session '{sid}' not found. Use list_sessions to discover sessions, \
399                         or omit session_id to create a new session."
400                    ))
401                } else {
402                    ToolError::Internal(format!("Failed to get session: {e}"))
403                }
404            })?;
405            sid
406        } else {
407            // Create new session
408            let session = client
409                .sessions()
410                .create(&CreateSessionRequest::default())
411                .await
412                .map_err(|e| ToolError::Internal(format!("Failed to create session: {e}")))?;
413
414            {
415                let mut spawned = server.spawned_sessions().write().await;
416                spawned.insert(session.id.clone());
417            }
418
419            session.id
420        };
421
422        tracing::info!(session_id = %session_id, "run: session resolved");
423
424        // 2. Check if session is already idle (for resume-only case)
425        let status = client
426            .sessions()
427            .status_for(&session_id)
428            .await
429            .map_err(|e| ToolError::Internal(format!("Failed to get session status: {e}")))?;
430
431        let is_idle = matches!(status, SessionStatusInfo::Idle);
432
433        // 3. Check for pending permissions before doing anything else
434        let pending_permissions = client
435            .permissions()
436            .list()
437            .await
438            .map_err(|e| ToolError::Internal(format!("Failed to list permissions: {e}")))?;
439
440        let my_permission = pending_permissions
441            .into_iter()
442            .find(|p| p.session_id == session_id);
443
444        if let Some(perm) = my_permission {
445            tracing::info!(
446                session_id = %session_id,
447                permission_type = %perm.permission,
448                "run: pending permission found"
449            );
450            return Ok(RunOutcome::without_tokens(OrchestratorRunOutput {
451                session_id,
452                status: RunStatus::PermissionRequired,
453                response: None,
454                partial_response: None,
455                permission_request_id: Some(perm.id),
456                permission_type: Some(perm.permission),
457                permission_patterns: perm.patterns,
458                question_request_id: None,
459                questions: vec![],
460                warnings: vec![],
461            }));
462        }
463
464        let pending_questions = client
465            .question()
466            .list()
467            .await
468            .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;
469
470        if let Some(question) = pending_questions
471            .into_iter()
472            .find(|question| question.session_id == session_id)
473        {
474            tracing::info!(session_id = %session_id, question_id = %question.id, "run: pending question found");
475            return Ok(RunOutcome::without_tokens(Self::question_required_output(
476                session_id,
477                None,
478                &question,
479                vec![],
480            )));
481        }
482
483        // 4. If no message/command and session is idle, just return current state
484        // Uses finalize_completed to get retry logic for message extraction
485        if message.is_none() && input.command.is_none() && is_idle && !wait_for_activity {
486            let token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
487            let output =
488                Self::finalize_completed(client, session_id, &token_tracker, vec![]).await?;
489            return Ok(RunOutcome::with_tracker(output, &token_tracker));
490        }
491
492        // 5. Subscribe to SSE BEFORE sending prompt/command
493        let mut subscription = client
494            .subscribe_session(&session_id)
495            .map_err(|e| ToolError::Internal(format!("Failed to subscribe to session: {e}")))?;
496
497        // Track whether this call is dispatching new work (command or message)
498        // vs just resuming/monitoring an existing session.
499        let dispatched_new_work = input.command.is_some() || message.is_some() || wait_for_activity;
500        let idle_grace = config::idle_grace();
501        let mut idle_grace_deadline: Option<tokio::time::Instant> = None;
502        let mut awaiting_idle_grace_check = false;
503
504        if wait_for_activity && input.command.is_none() && message.is_none() {
505            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
506        }
507
508        // 6. Kick off the work
509        let mut command_task: Option<JoinHandle<Result<(), String>>> = None;
510        let mut command_name_for_logging: Option<String> = None;
511
512        if let Some(command) = &input.command {
513            command_name_for_logging = Some(command.clone());
514
515            let cmd_client = client.clone();
516            let cmd_session_id = session_id.clone();
517            let cmd_name = command.clone();
518            let cmd_arguments = message.clone().unwrap_or_default();
519
520            command_task = Some(tokio::spawn(async move {
521                let req = CommandRequest {
522                    command: cmd_name,
523                    arguments: cmd_arguments,
524                    message_id: None,
525                };
526
527                cmd_client
528                    .messages()
529                    .command(&cmd_session_id, &req)
530                    .await
531                    .map(|_| ())
532                    .map_err(|e| e.to_string())
533            }));
534        } else if let Some(msg) = &message {
535            // Send prompt asynchronously
536            let req = PromptRequest {
537                parts: vec![PromptPart::Text {
538                    text: msg.clone(),
539                    synthetic: None,
540                    ignored: None,
541                    metadata: None,
542                }],
543                message_id: None,
544                model: None,
545                agent: None,
546                no_reply: None,
547                system: None,
548                variant: None,
549            };
550
551            client
552                .messages()
553                .prompt_async(&session_id, &req)
554                .await
555                .map_err(|e| ToolError::Internal(format!("Failed to send prompt: {e}")))?;
556
557            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
558        }
559
560        // 7. Event loop: wait for completion or permission
561        // Overall timeout to prevent infinite hangs (configurable, default 1 hour)
562        let deadline = tokio::time::Instant::now() + server.session_deadline();
563        let inactivity_timeout = server.inactivity_timeout();
564        let mut last_activity_time = tokio::time::Instant::now();
565
566        tracing::debug!(session_id = %session_id, "run: entering event loop");
567        let mut token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
568        let mut partial_response = String::new();
569        let warnings = Vec::new();
570
571        let mut poll_interval = tokio::time::interval(Duration::from_secs(1));
572        poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
573
574        // Track whether we've observed the session as busy at least once.
575        // This prevents completing immediately if we call run_impl on an already-idle
576        // session before our new work has started processing.
577        let mut observed_busy = false;
578
579        // Track whether SSE is still active. If the stream closes, we fall back
580        // to polling-only mode rather than returning an error.
581        let mut sse_active = true;
582
583        // === Post-subscribe status re-check (latency optimization) ===
584        // If we're just monitoring (no new work dispatched), check if session is already idle.
585        // This handles the race where session completed between our initial status check
586        // and SSE subscription becoming ready.
587        if !dispatched_new_work
588            && let Ok(status) = client.sessions().status_for(&session_id).await
589            && matches!(status, SessionStatusInfo::Idle)
590        {
591            tracing::debug!(
592                session_id = %session_id,
593                "session already idle on post-subscribe check"
594            );
595            let output =
596                Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
597            return Ok(RunOutcome::with_tracker(output, &token_tracker));
598        }
599        // If check fails or session is busy, continue to event loop
600
601        loop {
602            // Check timeout before processing
603            let now = tokio::time::Instant::now();
604
605            if now.duration_since(last_activity_time) >= inactivity_timeout {
606                return Err(ToolError::Internal(format!(
607                    "Session idle timeout: no activity for 5 minutes (session_id={session_id}). \
608                     The session may still be running; use run(session_id=...) to check status."
609                )));
610            }
611
612            if now >= deadline {
613                return Err(ToolError::Internal(
614                    "Session execution timed out after 1 hour. \
615                     The session may still be running; use run with the session_id to check status."
616                        .into(),
617                ));
618            }
619
620            let command_task_active = command_task.is_some();
621
622            tokio::select! {
623                () = ctx.cancelled() => {
624                    abort_command_task(&mut command_task).await;
625                    return Err(ToolError::cancelled(None));
626                }
627
628                maybe_event = subscription.recv(), if sse_active => {
629                    let Some(event) = maybe_event else {
630                        // SSE stream closed - this can happen due to network issues,
631                        // server restarts, or connection timeouts. Fall back to polling
632                        // rather than failing immediately.
633                        tracing::warn!(
634                            session_id = %session_id,
635                            "SSE stream closed unexpectedly; falling back to polling-only mode"
636                        );
637                        sse_active = false;
638                        continue; // The poll_interval branch will now drive completion detection
639                    };
640
641                    // Track tokens (server is already initialized at this point)
642                    token_tracker.observe_event(&event, |pid, mid| {
643                        server.context_limit(pid, mid)
644                    });
645
646                    match event {
647                        Event::PermissionAsked { properties } => {
648                            tracing::info!(
649                                session_id = %session_id,
650                                permission_type = %properties.request.permission,
651                                "run: permission requested"
652                            );
653                            return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
654                                session_id,
655                                status: RunStatus::PermissionRequired,
656                                response: None,
657                                partial_response: if partial_response.is_empty() {
658                                    None
659                                } else {
660                                    Some(partial_response)
661                                },
662                                permission_request_id: Some(properties.request.id),
663                                permission_type: Some(properties.request.permission),
664                                permission_patterns: properties.request.patterns,
665                                question_request_id: None,
666                                questions: vec![],
667                                warnings,
668                            }, &token_tracker));
669                        }
670
671                        Event::QuestionAsked { properties } => {
672                            return Ok(RunOutcome::with_tracker(Self::question_required_output(
673                                session_id,
674                                if partial_response.is_empty() {
675                                    None
676                                } else {
677                                    Some(partial_response)
678                                },
679                                &properties.request,
680                                warnings,
681                            ), &token_tracker));
682                        }
683
684                        Event::MessagePartDelta { properties } => {
685                            last_activity_time = tokio::time::Instant::now();
686                            // Message streaming means session is actively processing
687                            observed_busy = true;
688                            awaiting_idle_grace_check = false;
689                            // Collect streaming text from field-level delta events.
690                            if let Some(delta) = &properties.delta {
691                                partial_response.push_str(delta);
692                            }
693                        }
694
695                        Event::MessagePartUpdated { .. } | Event::MessageUpdated { .. } => {
696                            last_activity_time = tokio::time::Instant::now();
697                            observed_busy = true;
698                            awaiting_idle_grace_check = false;
699                        }
700
701                        Event::SessionError { properties } => {
702                            let error_msg = properties
703                                .error
704                                .map_or_else(|| "Unknown error".to_string(), |e| format!("{e:?}"));
705                            tracing::error!(
706                                session_id = %session_id,
707                                error = %error_msg,
708                                "run: session error"
709                            );
710                            return Err(ToolError::Internal(format!("Session error: {error_msg}")));
711                        }
712
713                        Event::SessionIdle { .. } => {
714                            tracing::debug!(session_id = %session_id, "received SessionIdle event");
715                            let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
716                            return Ok(RunOutcome::with_tracker(output, &token_tracker));
717                        }
718
719                        _ => {
720                            // Other events - continue
721                        }
722                    }
723                }
724
725                _ = poll_interval.tick() => {
726                    // === 1. Permission fallback (check first, permissions take priority) ===
727                    let pending = match client.permissions().list().await {
728                        Ok(p) => p,
729                        Err(e) => {
730                            // Log but continue - permission list failure shouldn't block completion detection
731                            tracing::warn!(
732                                session_id = %session_id,
733                                error = %e,
734                                "failed to list permissions during poll fallback"
735                            );
736                            vec![]
737                        }
738                    };
739
740                    if let Some(perm) = pending.into_iter().find(|p| p.session_id == session_id) {
741                        tracing::debug!(
742                            session_id = %session_id,
743                            permission_id = %perm.id,
744                            "detected pending permission via polling fallback"
745                        );
746                        return Ok(RunOutcome::with_tracker(OrchestratorRunOutput {
747                            session_id,
748                            status: RunStatus::PermissionRequired,
749                            response: None,
750                            partial_response: if partial_response.is_empty() {
751                                None
752                            } else {
753                                Some(partial_response)
754                                },
755                                permission_request_id: Some(perm.id),
756                                permission_type: Some(perm.permission),
757                            permission_patterns: perm.patterns,
758                            question_request_id: None,
759                            questions: vec![],
760                            warnings,
761                        }, &token_tracker));
762                    }
763
764                    let pending_questions = match client.question().list().await {
765                        Ok(questions) => questions,
766                        Err(e) => {
767                            tracing::warn!(
768                                session_id = %session_id,
769                                error = %e,
770                                "failed to list questions during poll fallback"
771                            );
772                            vec![]
773                        }
774                    };
775
776                    if let Some(question) = pending_questions
777                        .into_iter()
778                        .find(|question| question.session_id == session_id)
779                    {
780                        tracing::debug!(
781                            session_id = %session_id,
782                            question_id = %question.id,
783                            "detected pending question via polling fallback"
784                        );
785                        return Ok(RunOutcome::with_tracker(Self::question_required_output(
786                            session_id,
787                            if partial_response.is_empty() {
788                                None
789                            } else {
790                                Some(partial_response)
791                            },
792                            &question,
793                            warnings,
794                        ), &token_tracker));
795                    }
796
797                    // === 2. Session idle detection fallback (NEW) ===
798                    // This is the key fix for race conditions. If SSE missed SessionIdle,
799                    // we detect completion via polling sessions().status_for(session_id).
800                    match client.sessions().status_for(&session_id).await {
801                        Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
802                            last_activity_time = tokio::time::Instant::now();
803                            observed_busy = true;
804                            awaiting_idle_grace_check = false;
805                            tracing::trace!(
806                                session_id = %session_id,
807                                "our session is busy/retry, waiting"
808                            );
809                        }
810                        Ok(SessionStatusInfo::Idle) => {
811                            if !dispatched_new_work || observed_busy {
812                                // Session is idle AND either:
813                                // - We didn't dispatch new work (just monitoring), OR
814                                // - We did dispatch work and have seen it become busy at least once
815                                //
816                                // This guards against completing before our work starts processing.
817                                tracing::debug!(
818                                    session_id = %session_id,
819                                    dispatched_new_work = dispatched_new_work,
820                                    observed_busy = observed_busy,
821                                    "detected session idle via polling fallback"
822                                );
823                                let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
824                                return Ok(RunOutcome::with_tracker(output, &token_tracker));
825                            }
826
827                            let Some(deadline) = idle_grace_deadline else {
828                                tracing::trace!(
829                                    session_id = %session_id,
830                                    command_task_active = command_task_active,
831                                    "idle seen before dispatch confirmed; waiting"
832                                );
833                                continue;
834                            };
835
836                            let now = tokio::time::Instant::now();
837                            if now >= deadline {
838                                tracing::debug!(
839                                    session_id = %session_id,
840                                    idle_grace_ms = idle_grace.as_millis(),
841                                    "accepting idle via bounded idle grace (no busy observed)"
842                                );
843                                let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
844                                return Ok(RunOutcome::with_tracker(output, &token_tracker));
845                            }
846
847                            awaiting_idle_grace_check = true;
848                            tracing::trace!(
849                                session_id = %session_id,
850                                remaining_ms = (deadline - now).as_millis(),
851                                "idle detected before busy; waiting for idle-grace deadline"
852                            );
853                        }
854                        Err(e) => {
855                            // Log but continue - status check failure shouldn't block the loop
856                            tracing::warn!(
857                                session_id = %session_id,
858                                error = %e,
859                                "failed to get session status during poll fallback"
860                            );
861                        }
862                    }
863                }
864
865                () = async {
866                    match idle_grace_deadline {
867                        Some(deadline) => tokio::time::sleep_until(deadline).await,
868                        None => std::future::pending::<()>().await,
869                    }
870                }, if awaiting_idle_grace_check => {
871                    awaiting_idle_grace_check = false;
872
873                    match client.sessions().status_for(&session_id).await {
874                        Ok(SessionStatusInfo::Idle) => {
875                            tracing::debug!(session_id = %session_id, "idle-grace deadline reached; finalizing");
876                            let output = Self::finalize_completed(client, session_id, &token_tracker, warnings).await?;
877                            return Ok(RunOutcome::with_tracker(output, &token_tracker));
878                        }
879                        Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
880                            last_activity_time = tokio::time::Instant::now();
881                            observed_busy = true;
882                        }
883                        Err(e) => {
884                            tracing::warn!(
885                                session_id = %session_id,
886                                error = %e,
887                                "status check failed at idle-grace deadline"
888                            );
889                        }
890                    }
891                }
892
893                cmd_result = async {
894                    match command_task.as_mut() {
895                        Some(handle) => Some(handle.await),
896                        None => {
897                            std::future::pending::<
898                                Option<Result<Result<(), String>, tokio::task::JoinError>>,
899                            >()
900                            .await
901                        }
902                    }
903                }, if command_task_active => {
904                    match cmd_result {
905                        Some(Ok(Ok(()))) => {
906                            idle_grace_deadline = Some(tokio::time::Instant::now() + idle_grace);
907                            tracing::debug!(
908                                session_id = %session_id,
909                                command = ?command_name_for_logging,
910                                "run: command dispatch completed successfully"
911                            );
912                            command_task = None;
913                        }
914                        Some(Ok(Err(e))) => {
915                            tracing::error!(
916                                session_id = %session_id,
917                                command = ?command_name_for_logging,
918                                error = %e,
919                                "run: command dispatch failed"
920                            );
921                            return Err(ToolError::Internal(format!(
922                                "Failed to execute command '{}': {e}",
923                                command_name_for_logging.as_deref().unwrap_or("unknown")
924                            )));
925                        }
926                        Some(Err(join_err)) => {
927                            tracing::error!(
928                                session_id = %session_id,
929                                command = ?command_name_for_logging,
930                                error = %join_err,
931                                "run: command task panicked"
932                            );
933                            return Err(ToolError::Internal(format!("Command task panicked: {join_err}")));
934                        }
935                        None => {
936                            unreachable!("command_task_active guard should prevent None");
937                        }
938                    }
939                }
940            }
941        }
942    }
943}
944
945impl Tool for OrchestratorRunTool {
946    type Input = OrchestratorRunInput;
947    type Output = OrchestratorRunOutput;
948    const NAME: &'static str = "run";
949    const DESCRIPTION: &'static str = r#"Start or resume an OpenCode session. Optionally run a named command or send a raw prompt.
950
951Returns when:
952- status=completed: Session finished executing. Response contains final assistant output.
953- status=permission_required: Session needs permission approval. Call respond_permission to continue.
954- status=question_required: Session needs question answers. Call respond_question to continue.
955
956Parameters:
957- session_id: Existing session to resume (omit to create new)
958- command: OpenCode command name (e.g., "research", "implement_plan")
959- message: Prompt text or $ARGUMENTS for command template
960
961Examples:
962- New session with prompt: run(message="explain this code")
963- New session with command: run(command="research", message="caching strategies")
964- Resume session: run(session_id="...", message="continue")
965- Check status: run(session_id="...")"#;
966
967    fn call(
968        &self,
969        input: Self::Input,
970        ctx: &ToolContext,
971    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
972        let this = self.clone();
973        let ctx = ctx.clone();
974        Box::pin(async move {
975            let timer = CallTimer::start();
976            match this.run_impl_outcome(input.clone(), &ctx).await {
977                Ok(outcome) => {
978                    log_tool_success(
979                        &timer,
980                        Self::NAME,
981                        &input,
982                        &outcome.output,
983                        outcome.log_meta,
984                        true,
985                    );
986                    Ok(outcome.output)
987                }
988                Err(error) => {
989                    log_tool_error(&timer, Self::NAME, &input, &error);
990                    Err(error)
991                }
992            }
993        })
994    }
995}
996
997// ============================================================================
998// list_sessions
999// ============================================================================
1000
1001/// Tool for listing available `OpenCode` sessions in the current directory.
1002#[derive(Clone)]
1003pub struct ListSessionsTool {
1004    server: Arc<OrchestratorServerHandle>,
1005}
1006
1007impl ListSessionsTool {
1008    /// Create a new `ListSessionsTool` with the shared server handle.
1009    pub fn new(server: Arc<OrchestratorServerHandle>) -> Self {
1010        Self { server }
1011    }
1012}
1013
1014impl Tool for ListSessionsTool {
1015    type Input = ListSessionsInput;
1016    type Output = ListSessionsOutput;
1017    const NAME: &'static str = "list_sessions";
1018    const DESCRIPTION: &'static str =
1019        "List available OpenCode sessions in the current directory context.";
1020
1021    fn call(
1022        &self,
1023        input: Self::Input,
1024        _ctx: &ToolContext,
1025    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1026        let server_handle = Arc::clone(&self.server);
1027        Box::pin(async move {
1028            let timer = CallTimer::start();
1029            let result: Result<ListSessionsOutput, ToolError> = async {
1030                let server = server_handle
1031                    .acquire()
1032                    .await
1033                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1034
1035                let sessions =
1036                    // Intentionally keep zero-arg list() so SDK directory context preserves launch-directory scoping.
1037                    server.client().sessions().list().await.map_err(|e| {
1038                        ToolError::Internal(format!("Failed to list sessions: {e}"))
1039                    })?;
1040                let status_map = server.client().sessions().status_map().await.ok();
1041                let spawned = server.spawned_sessions().read().await;
1042
1043                let limit = input.limit.unwrap_or(20);
1044                let summaries: Vec<SessionSummary> = sessions
1045                    .into_iter()
1046                    .take(limit)
1047                    .map(|s| {
1048                        let status =
1049                            status_map
1050                                .as_ref()
1051                                .map(|status_map| match status_map.get(&s.id) {
1052                                    Some(SessionStatusInfo::Busy) => SessionStatusSummary::Busy,
1053                                    Some(SessionStatusInfo::Retry {
1054                                        attempt,
1055                                        message,
1056                                        next,
1057                                    }) => SessionStatusSummary::Retry {
1058                                        attempt: *attempt,
1059                                        message: message.clone(),
1060                                        next: *next,
1061                                    },
1062                                    Some(SessionStatusInfo::Idle) | None => {
1063                                        SessionStatusSummary::Idle
1064                                    }
1065                                });
1066
1067                        let change_stats = s.summary.as_ref().map(|summary| ChangeStats {
1068                            additions: summary.additions,
1069                            deletions: summary.deletions,
1070                            files: summary.files,
1071                        });
1072
1073                        SessionSummary {
1074                            launched_by_you: spawned.contains(&s.id),
1075                            created: s.time.as_ref().map(|t| t.created),
1076                            updated: s.time.as_ref().map(|t| t.updated),
1077                            directory: s.directory,
1078                            path: s.path,
1079                            title: s.title,
1080                            id: s.id,
1081                            status,
1082                            change_stats,
1083                        }
1084                    })
1085                    .collect();
1086
1087                Ok(ListSessionsOutput {
1088                    sessions: summaries,
1089                })
1090            }
1091            .await;
1092
1093            match result {
1094                Ok(output) => {
1095                    log_tool_success(
1096                        &timer,
1097                        Self::NAME,
1098                        &input,
1099                        &output,
1100                        ToolLogMeta::default(),
1101                        false,
1102                    );
1103                    Ok(output)
1104                }
1105                Err(error) => {
1106                    log_tool_error(&timer, Self::NAME, &input, &error);
1107                    Err(error)
1108                }
1109            }
1110        })
1111    }
1112}
1113
1114fn count_pending_messages(messages: &[Message]) -> usize {
1115    let mut pending = 0;
1116
1117    for message in messages.iter().rev() {
1118        if message.role() == "user" {
1119            pending += 1;
1120        } else if message.role() == "assistant" {
1121            break;
1122        }
1123    }
1124
1125    pending
1126}
1127
1128fn get_last_activity_time(messages: &[Message], fallback: Option<i64>) -> Option<i64> {
1129    messages.last().map_or(fallback, |message| {
1130        Some(
1131            message
1132                .info
1133                .time
1134                .completed
1135                .unwrap_or(message.info.time.created),
1136        )
1137    })
1138}
1139
1140fn extract_recent_tool_calls(messages: &[Message], limit: usize) -> Vec<ToolCallSummary> {
1141    let mut tool_calls = Vec::new();
1142
1143    for message in messages.iter().rev() {
1144        for part in message.parts.iter().rev() {
1145            if let Part::Tool {
1146                call_id,
1147                tool,
1148                state,
1149                ..
1150            } = part
1151            {
1152                let (state, started_at, completed_at) = match state {
1153                    Some(ToolState::Running(running)) => {
1154                        (ToolStateSummary::Running, Some(running.time.start), None)
1155                    }
1156                    Some(ToolState::Completed(completed)) => (
1157                        ToolStateSummary::Completed,
1158                        Some(completed.time.start),
1159                        Some(completed.time.end),
1160                    ),
1161                    Some(ToolState::Error(error)) => (
1162                        ToolStateSummary::Error {
1163                            message: error.error.clone(),
1164                        },
1165                        Some(error.time.start),
1166                        Some(error.time.end),
1167                    ),
1168                    _ => (ToolStateSummary::Pending, None, None),
1169                };
1170
1171                tool_calls.push(ToolCallSummary {
1172                    call_id: call_id.clone(),
1173                    tool_name: tool.clone(),
1174                    state,
1175                    started_at,
1176                    completed_at,
1177                });
1178
1179                if tool_calls.len() >= limit {
1180                    return tool_calls;
1181                }
1182            }
1183        }
1184    }
1185
1186    tool_calls
1187}
1188
1189/// Tool for getting detailed state of a specific `OpenCode` session.
1190#[derive(Clone)]
1191pub struct GetSessionStateTool {
1192    server: Arc<OrchestratorServerHandle>,
1193}
1194
1195impl GetSessionStateTool {
1196    pub fn new(server: Arc<OrchestratorServerHandle>) -> Self {
1197        Self { server }
1198    }
1199}
1200
1201impl Tool for GetSessionStateTool {
1202    type Input = GetSessionStateInput;
1203    type Output = GetSessionStateOutput;
1204    const NAME: &'static str = "get_session_state";
1205    const DESCRIPTION: &'static str = "Get detailed state of a specific session including status, pending messages, and recent tool calls.";
1206
1207    fn call(
1208        &self,
1209        input: Self::Input,
1210        _ctx: &ToolContext,
1211    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1212        let server_handle = Arc::clone(&self.server);
1213        Box::pin(async move {
1214            let timer = CallTimer::start();
1215            let result: Result<GetSessionStateOutput, ToolError> = async {
1216                let server = server_handle
1217                    .acquire()
1218                    .await
1219                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1220
1221                let client = server.client();
1222                let session_id = &input.session_id;
1223
1224                let session = client.sessions().get(session_id).await.map_err(|e| {
1225                    if e.is_not_found() {
1226                        ToolError::InvalidInput(format!(
1227                            "Session '{session_id}' not found. Use list_sessions to discover available sessions."
1228                        ))
1229                    } else {
1230                        ToolError::Internal(format!("Failed to get session: {e}"))
1231                    }
1232                })?;
1233
1234                let status = match client.sessions().status_for(session_id).await.map_err(|e| {
1235                    ToolError::Internal(format!("Failed to get session status: {e}"))
1236                })? {
1237                    SessionStatusInfo::Busy => SessionStatusSummary::Busy,
1238                    SessionStatusInfo::Retry {
1239                        attempt,
1240                        message,
1241                        next,
1242                    } => SessionStatusSummary::Retry {
1243                        attempt,
1244                        message,
1245                        next,
1246                    },
1247                    SessionStatusInfo::Idle => SessionStatusSummary::Idle,
1248                };
1249
1250                let messages = client.messages().list(session_id).await.map_err(|e| {
1251                    ToolError::Internal(format!("Failed to list messages: {e}"))
1252                })?;
1253                let pending_message_count = count_pending_messages(&messages);
1254                let last_activity = get_last_activity_time(
1255                    &messages,
1256                    session.time.as_ref().map(|time| time.updated),
1257                );
1258                let recent_tool_calls = extract_recent_tool_calls(&messages, 10);
1259
1260                let spawned = server.spawned_sessions().read().await;
1261                let launched_by_you = spawned.contains(session_id);
1262
1263                Ok(GetSessionStateOutput {
1264                    session_id: session.id,
1265                    title: session.title,
1266                    directory: session.directory,
1267                    path: session.path,
1268                    status,
1269                    launched_by_you,
1270                    pending_message_count,
1271                    last_activity,
1272                    recent_tool_calls,
1273                })
1274            }
1275            .await;
1276
1277            match result {
1278                Ok(output) => {
1279                    log_tool_success(
1280                        &timer,
1281                        Self::NAME,
1282                        &input,
1283                        &output,
1284                        ToolLogMeta::default(),
1285                        false,
1286                    );
1287                    Ok(output)
1288                }
1289                Err(error) => {
1290                    log_tool_error(&timer, Self::NAME, &input, &error);
1291                    Err(error)
1292                }
1293            }
1294        })
1295    }
1296}
1297
1298// ============================================================================
1299// list_commands
1300// ============================================================================
1301
1302/// Tool for listing available `OpenCode` commands that can be executed.
1303#[derive(Clone)]
1304pub struct ListCommandsTool {
1305    server: Arc<OrchestratorServerHandle>,
1306}
1307
1308impl ListCommandsTool {
1309    /// Create a new `ListCommandsTool` with the shared server handle.
1310    pub fn new(server: Arc<OrchestratorServerHandle>) -> Self {
1311        Self { server }
1312    }
1313}
1314
1315impl Tool for ListCommandsTool {
1316    type Input = ListCommandsInput;
1317    type Output = ListCommandsOutput;
1318    const NAME: &'static str = "list_commands";
1319    const DESCRIPTION: &'static str = "List available OpenCode commands that can be used with run.";
1320
1321    fn call(
1322        &self,
1323        input: Self::Input,
1324        _ctx: &ToolContext,
1325    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1326        let server_handle = Arc::clone(&self.server);
1327        Box::pin(async move {
1328            let timer = CallTimer::start();
1329            let result: Result<ListCommandsOutput, ToolError> = async {
1330                let server = server_handle
1331                    .acquire()
1332                    .await
1333                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1334
1335                let commands =
1336                    server.client().tools().commands().await.map_err(|e| {
1337                        ToolError::Internal(format!("Failed to list commands: {e}"))
1338                    })?;
1339
1340                let command_infos: Vec<CommandInfo> = commands
1341                    .into_iter()
1342                    .filter(|command| server.is_command_allowed(&command.name))
1343                    .map(|c| CommandInfo {
1344                        name: c.name,
1345                        description: c.description,
1346                    })
1347                    .collect();
1348
1349                Ok(ListCommandsOutput {
1350                    commands: command_infos,
1351                })
1352            }
1353            .await;
1354
1355            match result {
1356                Ok(output) => {
1357                    log_tool_success(
1358                        &timer,
1359                        Self::NAME,
1360                        &input,
1361                        &output,
1362                        ToolLogMeta::default(),
1363                        false,
1364                    );
1365                    Ok(output)
1366                }
1367                Err(error) => {
1368                    log_tool_error(&timer, Self::NAME, &input, &error);
1369                    Err(error)
1370                }
1371            }
1372        })
1373    }
1374}
1375
1376// ============================================================================
1377// respond_permission
1378// ============================================================================
1379
1380/// Tool for responding to permission requests from `OpenCode` sessions.
1381///
1382/// After sending the reply, continues monitoring the session and returns
1383/// when the session completes or another permission is requested.
1384#[derive(Clone)]
1385pub struct RespondPermissionTool {
1386    server: Arc<OrchestratorServerHandle>,
1387}
1388
1389impl RespondPermissionTool {
1390    /// Create a new `RespondPermissionTool` with the shared server handle.
1391    pub fn new(server: Arc<OrchestratorServerHandle>) -> Self {
1392        Self { server }
1393    }
1394}
1395
1396impl Tool for RespondPermissionTool {
1397    type Input = RespondPermissionInput;
1398    type Output = RespondPermissionOutput;
1399    const NAME: &'static str = "respond_permission";
1400    const DESCRIPTION: &'static str = r#"Respond to a permission request from an OpenCode session.
1401
1402After responding, continues monitoring the session and returns when complete or when another permission is required.
1403
1404Parameters:
1405- session_id: Session with pending permission
1406- reply: "once" (allow this request), "always" (allow for matching patterns), or "reject" (deny)
1407- message: Optional message to include with reply"#;
1408
1409    fn call(
1410        &self,
1411        input: Self::Input,
1412        ctx: &ToolContext,
1413    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1414        let server_handle = Arc::clone(&self.server);
1415        let ctx = ctx.clone();
1416        Box::pin(async move {
1417            let timer = CallTimer::start();
1418            let request = input.clone();
1419            let result: Result<(RespondPermissionOutput, ToolLogMeta), ToolError> = async {
1420                let server = server_handle
1421                    .acquire()
1422                    .await
1423                    .map_err(|e| ToolError::Internal(e.to_string()))?;
1424
1425                let client = server.client();
1426
1427                // Find the pending permission for this session
1428                let mut pending =
1429                    client.permissions().list().await.map_err(|e| {
1430                        ToolError::Internal(format!("Failed to list permissions: {e}"))
1431                    })?;
1432
1433                let perm = if let Some(req_id) = input.permission_request_id.as_deref() {
1434                    let idx = pending.iter().position(|p| p.id == req_id).ok_or_else(|| {
1435                        ToolError::InvalidInput(format!(
1436                            "No pending permission found with id '{req_id}'. \
1437                         (session_id='{}')",
1438                            input.session_id
1439                        ))
1440                    })?;
1441
1442                    let perm = pending.remove(idx);
1443
1444                    if perm.session_id != input.session_id {
1445                        return Err(ToolError::InvalidInput(format!(
1446                            "Permission request '{req_id}' belongs to session '{}', not '{}'.",
1447                            perm.session_id, input.session_id
1448                        )));
1449                    }
1450
1451                    perm
1452                } else {
1453                    let mut perms: Vec<_> = pending
1454                        .into_iter()
1455                        .filter(|p| p.session_id == input.session_id)
1456                        .collect();
1457
1458                    match perms.as_slice() {
1459                        [] => {
1460                            return Err(ToolError::InvalidInput(format!(
1461                                "No pending permission found for session '{}'. \
1462                             The permission may have already been responded to.",
1463                                input.session_id
1464                            )));
1465                        }
1466                        [_single] => perms.swap_remove(0),
1467                        multiple => {
1468                            let ids = multiple
1469                                .iter()
1470                                .map(|p| p.id.as_str())
1471                                .collect::<Vec<_>>()
1472                                .join(", ");
1473                            return Err(ToolError::InvalidInput(format!(
1474                                "Multiple pending permissions found for session '{}': {ids}. \
1475                             Please retry with permission_request_id (returned by run).",
1476                                input.session_id
1477                            )));
1478                        }
1479                    }
1480                };
1481
1482                // Track if this is a rejection for post-processing
1483                let is_reject = matches!(input.reply, PermissionReply::Reject);
1484
1485                // Capture permission details for warning message
1486                let permission_type = perm.permission.clone();
1487                let permission_patterns = perm.patterns.clone();
1488
1489                // Capture baseline assistant text BEFORE sending reject
1490                // This lets us detect stale text after rejection
1491                let mut pre_warnings: Vec<String> = Vec::new();
1492                let baseline = if is_reject {
1493                    match client.messages().list(&input.session_id).await {
1494                        Ok(msgs) => OrchestratorServer::extract_assistant_text(&msgs),
1495                        Err(e) => {
1496                            pre_warnings.push(format!("Failed to fetch baseline messages: {e}"));
1497                            None
1498                        }
1499                    }
1500                } else {
1501                    None
1502                };
1503
1504                // Convert our reply type to API type
1505                let api_reply = match input.reply {
1506                    PermissionReply::Once => ApiPermissionReply::Once,
1507                    PermissionReply::Always => ApiPermissionReply::Always,
1508                    PermissionReply::Reject => ApiPermissionReply::Reject,
1509                };
1510
1511                // Send the reply
1512                client
1513                    .permissions()
1514                    .reply(
1515                        &perm.id,
1516                        &PermissionReplyRequest {
1517                            reply: api_reply,
1518                            message: input.message,
1519                        },
1520                    )
1521                    .await
1522                    .map_err(|e| {
1523                        ToolError::Internal(format!("Failed to reply to permission: {e}"))
1524                    })?;
1525
1526                // Now continue monitoring the session using run logic
1527                let run_tool = OrchestratorRunTool::new(Arc::clone(&server_handle));
1528                let wait_for_activity = (!is_reject).then_some(true);
1529                let outcome = run_tool
1530                    .run_impl_outcome(
1531                        OrchestratorRunInput {
1532                            session_id: Some(input.session_id),
1533                            command: None,
1534                            message: None,
1535                            wait_for_activity,
1536                        },
1537                        &ctx,
1538                    )
1539                    .await?;
1540                let mut out = outcome.output;
1541
1542                // Merge pre-warnings
1543                out.warnings.extend(pre_warnings);
1544
1545                // Apply rejection-aware output mutation
1546                if is_reject && matches!(out.status, RunStatus::Completed) {
1547                    let final_resp = out.response.as_deref();
1548                    let baseline_resp = baseline.as_deref();
1549
1550                    // If response unchanged or None, it's stale pre-rejection text
1551                    if final_resp.is_none() || final_resp == baseline_resp {
1552                        out.response = None;
1553                        let patterns_str = if permission_patterns.is_empty() {
1554                            "(none)".to_string()
1555                        } else {
1556                            permission_patterns.join(", ")
1557                        };
1558                        out.warnings.push(format!(
1559                        "Permission rejected for '{permission_type}'. Patterns: {patterns_str}. \
1560                         Session stopped without generating a new assistant response."
1561                    ));
1562                        tracing::debug!(
1563                            permission_type = %permission_type,
1564                            "rejection override applied: response set to None"
1565                        );
1566                    }
1567                }
1568
1569                Ok((out, outcome.log_meta))
1570            }
1571            .await;
1572
1573            match result {
1574                Ok((output, log_meta)) => {
1575                    log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1576                    Ok(output)
1577                }
1578                Err(error) => {
1579                    log_tool_error(&timer, Self::NAME, &request, &error);
1580                    Err(error)
1581                }
1582            }
1583        })
1584    }
1585}
1586
1587// ============================================================================
1588// respond_question
1589// ============================================================================
1590
1591#[derive(Clone)]
1592pub struct RespondQuestionTool {
1593    server: Arc<OrchestratorServerHandle>,
1594}
1595
1596impl RespondQuestionTool {
1597    pub fn new(server: Arc<OrchestratorServerHandle>) -> Self {
1598        Self { server }
1599    }
1600}
1601
1602impl Tool for RespondQuestionTool {
1603    type Input = RespondQuestionInput;
1604    type Output = RespondQuestionOutput;
1605    const NAME: &'static str = "respond_question";
1606    const DESCRIPTION: &'static str = r#"Respond to a question request from an OpenCode session.
1607
1608After replying, continues monitoring the session and returns when complete or when another interruption is required.
1609
1610Parameters:
1611- session_id: Session with pending question
1612- action: "reply" or "reject"
1613- answers: Required when action=reply; one list per question"#;
1614
1615    fn call(
1616        &self,
1617        input: Self::Input,
1618        ctx: &ToolContext,
1619    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
1620        let server_handle = Arc::clone(&self.server);
1621        let ctx = ctx.clone();
1622        Box::pin(async move {
1623            let timer = CallTimer::start();
1624            let request = input.clone();
1625            let result: Result<(RespondQuestionOutput, ToolLogMeta), ToolError> = async {
1626            let server = server_handle
1627                .acquire()
1628                .await
1629                .map_err(|e| ToolError::Internal(e.to_string()))?;
1630
1631            let client = server.client();
1632            let mut pending = client
1633                .question()
1634                .list()
1635                .await
1636                .map_err(|e| ToolError::Internal(format!("Failed to list questions: {e}")))?;
1637
1638            let question = if let Some(req_id) = input.question_request_id.as_deref() {
1639                let idx = pending
1640                    .iter()
1641                    .position(|question| question.id == req_id)
1642                    .ok_or_else(|| {
1643                        ToolError::InvalidInput(format!(
1644                            "No pending question found with id '{req_id}'. (session_id='{}')",
1645                            input.session_id
1646                        ))
1647                    })?;
1648
1649                let question = pending.remove(idx);
1650                if question.session_id != input.session_id {
1651                    return Err(ToolError::InvalidInput(format!(
1652                        "Question request '{req_id}' belongs to session '{}', not '{}'.",
1653                        question.session_id, input.session_id
1654                    )));
1655                }
1656
1657                question
1658            } else {
1659                let mut questions: Vec<_> = pending
1660                    .into_iter()
1661                    .filter(|question| question.session_id == input.session_id)
1662                    .collect();
1663
1664                match questions.as_slice() {
1665                    [] => {
1666                        return Err(ToolError::InvalidInput(format!(
1667                            "No pending question found for session '{}'. The question may have already been responded to.",
1668                            input.session_id
1669                        )));
1670                    }
1671                    [_single] => questions.swap_remove(0),
1672                    multiple => {
1673                        let ids = multiple
1674                            .iter()
1675                            .map(|question| question.id.as_str())
1676                            .collect::<Vec<_>>()
1677                            .join(", ");
1678                        return Err(ToolError::InvalidInput(format!(
1679                            "Multiple pending questions found for session '{}': {ids}. Please retry with question_request_id (returned by run).",
1680                            input.session_id
1681                        )));
1682                    }
1683                }
1684            };
1685
1686            match input.action {
1687                QuestionAction::Reply => {
1688                    if input.answers.is_empty() {
1689                        return Err(ToolError::InvalidInput(
1690                            "answers is required when action=reply".into(),
1691                        ));
1692                    }
1693
1694                    client
1695                        .question()
1696                        .reply(
1697                            &question.id,
1698                            &QuestionReply {
1699                                answers: input.answers,
1700                            },
1701                        )
1702                        .await
1703                        .map_err(|e| {
1704                            ToolError::Internal(format!("Failed to reply to question: {e}"))
1705                        })?;
1706
1707                    let outcome = OrchestratorRunTool::new(Arc::clone(&server_handle))
1708                        .run_impl_outcome(OrchestratorRunInput {
1709                            session_id: Some(input.session_id),
1710                            command: None,
1711                            message: None,
1712                            wait_for_activity: Some(true),
1713                        }, &ctx)
1714                        .await?;
1715                    Ok((outcome.output, outcome.log_meta))
1716                }
1717                QuestionAction::Reject => {
1718                    client.question().reject(&question.id).await.map_err(|e| {
1719                        ToolError::Internal(format!("Failed to reject question: {e}"))
1720                    })?;
1721
1722                    let outcome = OrchestratorRunTool::new(Arc::clone(&server_handle))
1723                        .run_impl_outcome(OrchestratorRunInput {
1724                            session_id: Some(input.session_id),
1725                            command: None,
1726                            message: None,
1727                            wait_for_activity: None,
1728                        }, &ctx)
1729                        .await?;
1730                    Ok((outcome.output, outcome.log_meta))
1731                }
1732            }
1733        }
1734        .await;
1735
1736            match result {
1737                Ok((output, log_meta)) => {
1738                    log_tool_success(&timer, Self::NAME, &request, &output, log_meta, true);
1739                    Ok(output)
1740                }
1741                Err(error) => {
1742                    log_tool_error(&timer, Self::NAME, &request, &error);
1743                    Err(error)
1744                }
1745            }
1746        })
1747    }
1748}
1749
1750// ============================================================================
1751// Registry builder
1752// ============================================================================
1753
1754/// Build the tool registry with all orchestrator tools.
1755///
1756/// The shared handle lazily starts or recovers the server on tool entry.
1757pub fn build_registry(server: &Arc<OrchestratorServerHandle>) -> ToolRegistry {
1758    ToolRegistry::builder()
1759        .register::<OrchestratorRunTool, ()>(OrchestratorRunTool::new(Arc::clone(server)))
1760        .register::<ListSessionsTool, ()>(ListSessionsTool::new(Arc::clone(server)))
1761        .register::<GetSessionStateTool, ()>(GetSessionStateTool::new(Arc::clone(server)))
1762        .register::<ListCommandsTool, ()>(ListCommandsTool::new(Arc::clone(server)))
1763        .register::<RespondPermissionTool, ()>(RespondPermissionTool::new(Arc::clone(server)))
1764        .register::<RespondQuestionTool, ()>(RespondQuestionTool::new(Arc::clone(server)))
1765        .finish()
1766}
1767
1768#[cfg(test)]
1769mod tests {
1770    use super::*;
1771    use agentic_tools_core::Tool;
1772
1773    #[test]
1774    fn tool_names_are_short() {
1775        assert_eq!(<OrchestratorRunTool as Tool>::NAME, "run");
1776        assert_eq!(<ListSessionsTool as Tool>::NAME, "list_sessions");
1777        assert_eq!(<GetSessionStateTool as Tool>::NAME, "get_session_state");
1778        assert_eq!(<ListCommandsTool as Tool>::NAME, "list_commands");
1779        assert_eq!(<RespondPermissionTool as Tool>::NAME, "respond_permission");
1780        assert_eq!(<RespondQuestionTool as Tool>::NAME, "respond_question");
1781    }
1782
1783    #[test]
1784    fn last_activity_falls_back_to_session_timestamp_when_messages_are_empty() {
1785        assert_eq!(get_last_activity_time(&[], Some(1_234)), Some(1_234));
1786    }
1787}