Skip to main content

opencode_orchestrator_mcp/
tools.rs

1//! Tool implementations for orchestrator MCP server.
2
3use crate::server::OrchestratorServer;
4use crate::token_tracker::TokenTracker;
5use crate::types::CommandInfo;
6use crate::types::ListCommandsInput;
7use crate::types::ListCommandsOutput;
8use crate::types::ListSessionsInput;
9use crate::types::ListSessionsOutput;
10use crate::types::OrchestratorRunInput;
11use crate::types::OrchestratorRunOutput;
12use crate::types::PermissionReply;
13use crate::types::RespondPermissionInput;
14use crate::types::RespondPermissionOutput;
15use crate::types::RunStatus;
16use crate::types::SessionSummary;
17use agentic_tools_core::Tool;
18use agentic_tools_core::ToolContext;
19use agentic_tools_core::ToolError;
20use agentic_tools_core::ToolRegistry;
21use futures::future::BoxFuture;
22use opencode_rs::types::event::Event;
23use opencode_rs::types::message::CommandRequest;
24use opencode_rs::types::message::PromptPart;
25use opencode_rs::types::message::PromptRequest;
26use opencode_rs::types::permission::PermissionReply as ApiPermissionReply;
27use opencode_rs::types::permission::PermissionReplyRequest;
28use opencode_rs::types::session::CreateSessionRequest;
29use opencode_rs::types::session::SessionStatusInfo;
30use opencode_rs::types::session::SummarizeRequest;
31use std::sync::Arc;
32use std::time::Duration;
33use tokio::sync::OnceCell;
34use tokio::task::JoinHandle;
35
36// ============================================================================
37// run
38// ============================================================================
39
40/// Tool for starting or resuming `OpenCode` sessions.
41///
42/// Handles session creation, prompt/command execution, SSE event monitoring,
43/// and permission request detection. Returns when the session completes or
44/// when a permission is requested.
45#[derive(Clone)]
46pub struct OrchestratorRunTool {
47    server: Arc<OnceCell<OrchestratorServer>>,
48}
49
50impl OrchestratorRunTool {
51    /// Create a new `OrchestratorRunTool` with the given server cell.
52    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
53        Self { server }
54    }
55
56    /// Finalize a completed session by fetching messages and optionally triggering summarization.
57    ///
58    /// This is called when we detect the session is idle, either via SSE `SessionIdle` event
59    /// or via polling `sessions().status()`.
60    ///
61    /// Uses bounded retry with backoff (0/50/100/200/400ms) if assistant text is not immediately
62    /// available, handling the race condition where the session becomes idle before messages
63    /// are fully persisted.
64    async fn finalize_completed(
65        client: &opencode_rs::Client,
66        session_id: String,
67        token_tracker: &TokenTracker,
68        mut warnings: Vec<String>,
69    ) -> Result<OrchestratorRunOutput, ToolError> {
70        // Bounded backoff delays for message extraction retry (~750ms total budget)
71        const BACKOFFS_MS: &[u64] = &[0, 50, 100, 200, 400];
72
73        let mut response: Option<String> = None;
74
75        for (attempt, &delay_ms) in BACKOFFS_MS.iter().enumerate() {
76            if delay_ms > 0 {
77                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
78            }
79
80            let messages = client
81                .messages()
82                .list(&session_id)
83                .await
84                .map_err(|e| ToolError::Internal(format!("Failed to list messages: {e}")))?;
85
86            response = OrchestratorServer::extract_assistant_text(&messages);
87
88            if response.is_some() {
89                if attempt > 0 {
90                    tracing::debug!(
91                        session_id = %session_id,
92                        attempt,
93                        "assistant response became available after retry"
94                    );
95                }
96                break;
97            }
98        }
99
100        if response.is_none() {
101            tracing::debug!(
102                session_id = %session_id,
103                "no assistant response found after bounded retry"
104            );
105        }
106
107        // Handle context limit summarization if needed
108        if token_tracker.compaction_needed
109            && let (Some(pid), Some(mid)) = (&token_tracker.provider_id, &token_tracker.model_id)
110        {
111            let summarize_req = SummarizeRequest {
112                provider_id: pid.clone(),
113                model_id: mid.clone(),
114                auto: None,
115            };
116
117            match client
118                .sessions()
119                .summarize(&session_id, &summarize_req)
120                .await
121            {
122                Ok(_) => {
123                    tracing::info!(session_id = %session_id, "context summarization triggered");
124                    warnings.push("Context limit reached; summarization triggered".into());
125                }
126                Err(e) => {
127                    tracing::warn!(session_id = %session_id, error = %e, "summarization failed");
128                    warnings.push(format!("Summarization failed: {e}"));
129                }
130            }
131        }
132
133        Ok(OrchestratorRunOutput {
134            session_id,
135            status: RunStatus::Completed,
136            response,
137            partial_response: None,
138            permission_request_id: None,
139            permission_type: None,
140            permission_patterns: vec![],
141            warnings,
142        })
143    }
144
145    pub async fn run_impl(
146        &self,
147        input: OrchestratorRunInput,
148    ) -> Result<OrchestratorRunOutput, ToolError> {
149        // Input validation
150        if input.session_id.is_none() && input.message.is_none() && input.command.is_none() {
151            return Err(ToolError::InvalidInput(
152                "Either session_id (to resume/check status) or message/command (to start work) is required"
153                    .into(),
154            ));
155        }
156
157        if input.command.is_some() && input.message.is_none() {
158            return Err(ToolError::InvalidInput(
159                "message is required when command is specified (becomes $ARGUMENTS for template expansion)"
160                    .into(),
161            ));
162        }
163
164        // Trim and validate message content
165        let message = input.message.map(|m| m.trim().to_string());
166        if let Some(ref m) = message
167            && m.is_empty()
168        {
169            return Err(ToolError::InvalidInput(
170                "message cannot be empty or whitespace-only".into(),
171            ));
172        }
173
174        let wait_for_activity = input.wait_for_activity.unwrap_or(false);
175
176        // Lazy initialization: spawn server on first tool call
177        let server = self
178            .server
179            .get_or_try_init(OrchestratorServer::start_lazy)
180            .await
181            .map_err(|e| ToolError::Internal(e.to_string()))?;
182
183        let client = server.client();
184
185        tracing::debug!(
186            command = ?input.command,
187            has_message = message.is_some(),
188            message_len = message.as_ref().map(String::len),
189            session_id = ?input.session_id,
190            "run: starting"
191        );
192
193        // 1. Resolve session: validate existing or create new
194        let session_id = if let Some(sid) = input.session_id {
195            // Validate session exists
196            client.sessions().get(&sid).await.map_err(|e| {
197                if e.is_not_found() {
198                    ToolError::InvalidInput(format!(
199                        "Session '{sid}' not found. Use list_sessions to discover sessions, \
200                         or omit session_id to create a new session."
201                    ))
202                } else {
203                    ToolError::Internal(format!("Failed to get session: {e}"))
204                }
205            })?;
206            sid
207        } else {
208            // Create new session
209            let session = client
210                .sessions()
211                .create(&CreateSessionRequest::default())
212                .await
213                .map_err(|e| ToolError::Internal(format!("Failed to create session: {e}")))?;
214            session.id
215        };
216
217        tracing::info!(session_id = %session_id, "run: session resolved");
218
219        // 2. Check if session is already idle (for resume-only case)
220        let status = client
221            .sessions()
222            .status_for(&session_id)
223            .await
224            .map_err(|e| ToolError::Internal(format!("Failed to get session status: {e}")))?;
225
226        let is_idle = matches!(status, SessionStatusInfo::Idle);
227
228        // 3. Check for pending permissions before doing anything else
229        let pending_permissions = client
230            .permissions()
231            .list()
232            .await
233            .map_err(|e| ToolError::Internal(format!("Failed to list permissions: {e}")))?;
234
235        let my_permission = pending_permissions
236            .into_iter()
237            .find(|p| p.session_id == session_id);
238
239        if let Some(perm) = my_permission {
240            tracing::info!(
241                session_id = %session_id,
242                permission_type = %perm.permission,
243                "run: pending permission found"
244            );
245            return Ok(OrchestratorRunOutput {
246                session_id,
247                status: RunStatus::PermissionRequired,
248                response: None,
249                partial_response: None,
250                permission_request_id: Some(perm.id),
251                permission_type: Some(perm.permission),
252                permission_patterns: perm.patterns,
253                warnings: vec![],
254            });
255        }
256
257        // 4. If no message/command and session is idle, just return current state
258        // Uses finalize_completed to get retry logic for message extraction
259        if message.is_none() && input.command.is_none() && is_idle && !wait_for_activity {
260            let token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
261            return Self::finalize_completed(client, session_id, &token_tracker, vec![]).await;
262        }
263
264        // 5. Subscribe to SSE BEFORE sending prompt/command
265        let mut subscription = client
266            .subscribe_session(&session_id)
267            .map_err(|e| ToolError::Internal(format!("Failed to subscribe to session: {e}")))?;
268
269        // 6. Kick off the work
270        let mut command_task: Option<JoinHandle<Result<(), String>>> = None;
271        let mut command_name_for_logging: Option<String> = None;
272
273        if let Some(command) = &input.command {
274            command_name_for_logging = Some(command.clone());
275
276            let cmd_client = client.clone();
277            let cmd_session_id = session_id.clone();
278            let cmd_name = command.clone();
279            let cmd_arguments = message.clone().unwrap_or_default();
280
281            command_task = Some(tokio::spawn(async move {
282                let req = CommandRequest {
283                    command: cmd_name,
284                    arguments: cmd_arguments,
285                    message_id: None,
286                };
287
288                cmd_client
289                    .messages()
290                    .command(&cmd_session_id, &req)
291                    .await
292                    .map(|_| ())
293                    .map_err(|e| e.to_string())
294            }));
295        } else if let Some(msg) = &message {
296            // Send prompt asynchronously
297            let req = PromptRequest {
298                parts: vec![PromptPart::Text {
299                    text: msg.clone(),
300                    synthetic: None,
301                    ignored: None,
302                    metadata: None,
303                }],
304                message_id: None,
305                model: None,
306                agent: None,
307                no_reply: None,
308                system: None,
309                variant: None,
310            };
311
312            client
313                .messages()
314                .prompt_async(&session_id, &req)
315                .await
316                .map_err(|e| ToolError::Internal(format!("Failed to send prompt: {e}")))?;
317        }
318
319        // 7. Event loop: wait for completion or permission
320        // Overall timeout to prevent infinite hangs (configurable, default 1 hour)
321        let deadline = tokio::time::Instant::now() + server.session_deadline();
322        let inactivity_timeout = server.inactivity_timeout();
323        let mut last_activity_time = tokio::time::Instant::now();
324
325        tracing::debug!(session_id = %session_id, "run: entering event loop");
326        let mut token_tracker = TokenTracker::with_threshold(server.compaction_threshold());
327        let mut partial_response = String::new();
328        let warnings = Vec::new();
329
330        let mut poll_interval = tokio::time::interval(Duration::from_secs(1));
331        poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
332
333        // Track whether this call is dispatching new work (command or message)
334        // vs just resuming/monitoring an existing session
335        let dispatched_new_work = input.command.is_some() || message.is_some() || wait_for_activity;
336
337        // Track whether we've observed the session as busy at least once.
338        // This prevents completing immediately if we call run_impl on an already-idle
339        // session before our new work has started processing.
340        let mut observed_busy = false;
341
342        // Track whether SSE is still active. If the stream closes, we fall back
343        // to polling-only mode rather than returning an error.
344        let mut sse_active = true;
345
346        // === Post-subscribe status re-check (latency optimization) ===
347        // If we're just monitoring (no new work dispatched), check if session is already idle.
348        // This handles the race where session completed between our initial status check
349        // and SSE subscription becoming ready.
350        if !dispatched_new_work
351            && let Ok(status) = client.sessions().status_for(&session_id).await
352            && matches!(status, SessionStatusInfo::Idle)
353        {
354            tracing::debug!(
355                session_id = %session_id,
356                "session already idle on post-subscribe check"
357            );
358            return Self::finalize_completed(client, session_id, &token_tracker, warnings).await;
359        }
360        // If check fails or session is busy, continue to event loop
361
362        loop {
363            // Check timeout before processing
364            let now = tokio::time::Instant::now();
365
366            if now.duration_since(last_activity_time) >= inactivity_timeout {
367                return Err(ToolError::Internal(format!(
368                    "Session idle timeout: no activity for 5 minutes (session_id={session_id}). \
369                     The session may still be running; use run(session_id=...) to check status."
370                )));
371            }
372
373            if now >= deadline {
374                return Err(ToolError::Internal(
375                    "Session execution timed out after 1 hour. \
376                     The session may still be running; use run with the session_id to check status."
377                        .into(),
378                ));
379            }
380
381            let command_task_active = command_task.is_some();
382
383            tokio::select! {
384                maybe_event = subscription.recv(), if sse_active => {
385                    let Some(event) = maybe_event else {
386                        // SSE stream closed - this can happen due to network issues,
387                        // server restarts, or connection timeouts. Fall back to polling
388                        // rather than failing immediately.
389                        tracing::warn!(
390                            session_id = %session_id,
391                            "SSE stream closed unexpectedly; falling back to polling-only mode"
392                        );
393                        sse_active = false;
394                        continue; // The poll_interval branch will now drive completion detection
395                    };
396
397                    // Track tokens (server is already initialized at this point)
398                    token_tracker.observe_event(&event, |pid, mid| {
399                        server.context_limit(pid, mid)
400                    });
401
402                    match event {
403                        Event::PermissionAsked { properties } => {
404                            tracing::info!(
405                                session_id = %session_id,
406                                permission_type = %properties.request.permission,
407                                "run: permission requested"
408                            );
409                            return Ok(OrchestratorRunOutput {
410                                session_id,
411                                status: RunStatus::PermissionRequired,
412                                response: None,
413                                partial_response: if partial_response.is_empty() {
414                                    None
415                                } else {
416                                    Some(partial_response)
417                                },
418                                permission_request_id: Some(properties.request.id),
419                                permission_type: Some(properties.request.permission),
420                                permission_patterns: properties.request.patterns,
421                                warnings,
422                            });
423                        }
424
425                        Event::MessagePartUpdated { properties } => {
426                            last_activity_time = tokio::time::Instant::now();
427                            // Message streaming means session is actively processing
428                            observed_busy = true;
429                            // Collect streaming text
430                            if let Some(delta) = &properties.delta {
431                                partial_response.push_str(delta);
432                            }
433                        }
434
435                        Event::SessionError { properties } => {
436                            let error_msg = properties
437                                .error
438                                .map_or_else(|| "Unknown error".to_string(), |e| format!("{e:?}"));
439                            tracing::error!(
440                                session_id = %session_id,
441                                error = %error_msg,
442                                "run: session error"
443                            );
444                            return Err(ToolError::Internal(format!("Session error: {error_msg}")));
445                        }
446
447                        Event::SessionIdle { .. } => {
448                            tracing::debug!(session_id = %session_id, "received SessionIdle event");
449                            return Self::finalize_completed(client, session_id, &token_tracker, warnings).await;
450                        }
451
452                        _ => {
453                            // Other events - continue
454                        }
455                    }
456                }
457
458                _ = poll_interval.tick() => {
459                    // === 1. Permission fallback (check first, permissions take priority) ===
460                    let pending = match client.permissions().list().await {
461                        Ok(p) => p,
462                        Err(e) => {
463                            // Log but continue - permission list failure shouldn't block completion detection
464                            tracing::warn!(
465                                session_id = %session_id,
466                                error = %e,
467                                "failed to list permissions during poll fallback"
468                            );
469                            vec![]
470                        }
471                    };
472
473                    if let Some(perm) = pending.into_iter().find(|p| p.session_id == session_id) {
474                        tracing::debug!(
475                            session_id = %session_id,
476                            permission_id = %perm.id,
477                            "detected pending permission via polling fallback"
478                        );
479                        return Ok(OrchestratorRunOutput {
480                            session_id,
481                            status: RunStatus::PermissionRequired,
482                            response: None,
483                            partial_response: if partial_response.is_empty() {
484                                None
485                            } else {
486                                Some(partial_response)
487                            },
488                            permission_request_id: Some(perm.id),
489                            permission_type: Some(perm.permission),
490                            permission_patterns: perm.patterns,
491                            warnings,
492                        });
493                    }
494
495                    // === 2. Session idle detection fallback (NEW) ===
496                    // This is the key fix for race conditions. If SSE missed SessionIdle,
497                    // we detect completion via polling sessions().status_for(session_id).
498                    match client.sessions().status_for(&session_id).await {
499                        Ok(SessionStatusInfo::Busy | SessionStatusInfo::Retry { .. }) => {
500                            last_activity_time = tokio::time::Instant::now();
501                            observed_busy = true;
502                            tracing::trace!(
503                                session_id = %session_id,
504                                "our session is busy/retry, waiting"
505                            );
506                        }
507                        Ok(SessionStatusInfo::Idle) => {
508                            if !dispatched_new_work || observed_busy {
509                                // Session is idle AND either:
510                                // - We didn't dispatch new work (just monitoring), OR
511                                // - We did dispatch work and have seen it become busy at least once
512                                //
513                                // This guards against completing before our work starts processing.
514                                tracing::debug!(
515                                    session_id = %session_id,
516                                    dispatched_new_work = dispatched_new_work,
517                                    observed_busy = observed_busy,
518                                    "detected session idle via polling fallback"
519                                );
520                                return Self::finalize_completed(client, session_id, &token_tracker, warnings).await;
521                            }
522
523                            // Session is idle but we dispatched work and haven't seen busy yet.
524                            // This likely means our work hasn't started processing.
525                            // Wait for next poll tick.
526                            tracing::trace!(
527                                session_id = %session_id,
528                                "session idle but work may not have started yet, waiting"
529                            );
530                        }
531                        Err(e) => {
532                            // Log but continue - status check failure shouldn't block the loop
533                            tracing::warn!(
534                                session_id = %session_id,
535                                error = %e,
536                                "failed to get session status during poll fallback"
537                            );
538                        }
539                    }
540                }
541
542                cmd_result = async {
543                    match command_task.as_mut() {
544                        Some(handle) => Some(handle.await),
545                        None => {
546                            std::future::pending::<
547                                Option<Result<Result<(), String>, tokio::task::JoinError>>,
548                            >()
549                            .await
550                        }
551                    }
552                }, if command_task_active => {
553                    match cmd_result {
554                        Some(Ok(Ok(()))) => {
555                            tracing::debug!(
556                                session_id = %session_id,
557                                command = ?command_name_for_logging,
558                                "run: command dispatch completed successfully"
559                            );
560                            command_task = None;
561                        }
562                        Some(Ok(Err(e))) => {
563                            tracing::error!(
564                                session_id = %session_id,
565                                command = ?command_name_for_logging,
566                                error = %e,
567                                "run: command dispatch failed"
568                            );
569                            return Err(ToolError::Internal(format!(
570                                "Failed to execute command '{}': {e}",
571                                command_name_for_logging.as_deref().unwrap_or("unknown")
572                            )));
573                        }
574                        Some(Err(join_err)) => {
575                            tracing::error!(
576                                session_id = %session_id,
577                                command = ?command_name_for_logging,
578                                error = %join_err,
579                                "run: command task panicked"
580                            );
581                            return Err(ToolError::Internal(format!("Command task panicked: {join_err}")));
582                        }
583                        None => {
584                            unreachable!("command_task_active guard should prevent None");
585                        }
586                    }
587                }
588            }
589        }
590    }
591}
592
593impl Tool for OrchestratorRunTool {
594    type Input = OrchestratorRunInput;
595    type Output = OrchestratorRunOutput;
596    const NAME: &'static str = "run";
597    const DESCRIPTION: &'static str = r#"Start or resume an OpenCode session. Optionally run a named command or send a raw prompt.
598
599Returns when:
600- status=completed: Session finished executing. Response contains final assistant output.
601- status=permission_required: Session needs permission approval. Call respond_permission to continue.
602
603Parameters:
604- session_id: Existing session to resume (omit to create new)
605- command: OpenCode command name (e.g., "research", "implement_plan")
606- message: Prompt text or $ARGUMENTS for command template
607
608Examples:
609- New session with prompt: run(message="explain this code")
610- New session with command: run(command="research", message="caching strategies")
611- Resume session: run(session_id="...", message="continue")
612- Check status: run(session_id="...")"#;
613
614    fn call(
615        &self,
616        input: Self::Input,
617        _ctx: &ToolContext,
618    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
619        let this = self.clone();
620        Box::pin(async move { this.run_impl(input).await })
621    }
622}
623
624// ============================================================================
625// list_sessions
626// ============================================================================
627
628/// Tool for listing available `OpenCode` sessions in the current directory.
629#[derive(Clone)]
630pub struct ListSessionsTool {
631    server: Arc<OnceCell<OrchestratorServer>>,
632}
633
634impl ListSessionsTool {
635    /// Create a new `ListSessionsTool` with the given server cell.
636    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
637        Self { server }
638    }
639}
640
641impl Tool for ListSessionsTool {
642    type Input = ListSessionsInput;
643    type Output = ListSessionsOutput;
644    const NAME: &'static str = "list_sessions";
645    const DESCRIPTION: &'static str =
646        "List available OpenCode sessions in the current directory context.";
647
648    fn call(
649        &self,
650        input: Self::Input,
651        _ctx: &ToolContext,
652    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
653        let server_cell = Arc::clone(&self.server);
654        Box::pin(async move {
655            let server = server_cell
656                .get_or_try_init(OrchestratorServer::start_lazy)
657                .await
658                .map_err(|e| ToolError::Internal(e.to_string()))?;
659
660            let sessions = server
661                .client()
662                .sessions()
663                .list()
664                .await
665                .map_err(|e| ToolError::Internal(format!("Failed to list sessions: {e}")))?;
666
667            let limit = input.limit.unwrap_or(20);
668            let summaries: Vec<SessionSummary> = sessions
669                .into_iter()
670                .take(limit)
671                .map(|s| SessionSummary {
672                    id: s.id,
673                    title: s.title,
674                    updated: s.time.as_ref().map(|t| t.updated),
675                })
676                .collect();
677
678            Ok(ListSessionsOutput {
679                sessions: summaries,
680            })
681        })
682    }
683}
684
685// ============================================================================
686// list_commands
687// ============================================================================
688
689/// Tool for listing available `OpenCode` commands that can be executed.
690#[derive(Clone)]
691pub struct ListCommandsTool {
692    server: Arc<OnceCell<OrchestratorServer>>,
693}
694
695impl ListCommandsTool {
696    /// Create a new `ListCommandsTool` with the given server cell.
697    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
698        Self { server }
699    }
700}
701
702impl Tool for ListCommandsTool {
703    type Input = ListCommandsInput;
704    type Output = ListCommandsOutput;
705    const NAME: &'static str = "list_commands";
706    const DESCRIPTION: &'static str = "List available OpenCode commands that can be used with run.";
707
708    fn call(
709        &self,
710        _input: Self::Input,
711        _ctx: &ToolContext,
712    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
713        let server_cell = Arc::clone(&self.server);
714        Box::pin(async move {
715            let server = server_cell
716                .get_or_try_init(OrchestratorServer::start_lazy)
717                .await
718                .map_err(|e| ToolError::Internal(e.to_string()))?;
719
720            let commands = server
721                .client()
722                .tools()
723                .commands()
724                .await
725                .map_err(|e| ToolError::Internal(format!("Failed to list commands: {e}")))?;
726
727            let command_infos: Vec<CommandInfo> = commands
728                .into_iter()
729                .map(|c| CommandInfo {
730                    name: c.name,
731                    description: c.description,
732                })
733                .collect();
734
735            Ok(ListCommandsOutput {
736                commands: command_infos,
737            })
738        })
739    }
740}
741
742// ============================================================================
743// respond_permission
744// ============================================================================
745
746/// Tool for responding to permission requests from `OpenCode` sessions.
747///
748/// After sending the reply, continues monitoring the session and returns
749/// when the session completes or another permission is requested.
750#[derive(Clone)]
751pub struct RespondPermissionTool {
752    server: Arc<OnceCell<OrchestratorServer>>,
753}
754
755impl RespondPermissionTool {
756    /// Create a new `RespondPermissionTool` with the given server cell.
757    pub fn new(server: Arc<OnceCell<OrchestratorServer>>) -> Self {
758        Self { server }
759    }
760}
761
762impl Tool for RespondPermissionTool {
763    type Input = RespondPermissionInput;
764    type Output = RespondPermissionOutput;
765    const NAME: &'static str = "respond_permission";
766    const DESCRIPTION: &'static str = r#"Respond to a permission request from an OpenCode session.
767
768After responding, continues monitoring the session and returns when complete or when another permission is required.
769
770Parameters:
771- session_id: Session with pending permission
772- reply: "once" (allow this request), "always" (allow for matching patterns), or "reject" (deny)
773- message: Optional message to include with reply"#;
774
775    fn call(
776        &self,
777        input: Self::Input,
778        _ctx: &ToolContext,
779    ) -> BoxFuture<'static, Result<Self::Output, ToolError>> {
780        let server_cell = Arc::clone(&self.server);
781        Box::pin(async move {
782            let server = server_cell
783                .get_or_try_init(OrchestratorServer::start_lazy)
784                .await
785                .map_err(|e| ToolError::Internal(e.to_string()))?;
786
787            let client = server.client();
788
789            // Find the pending permission for this session
790            let mut pending = client
791                .permissions()
792                .list()
793                .await
794                .map_err(|e| ToolError::Internal(format!("Failed to list permissions: {e}")))?;
795
796            let perm = if let Some(req_id) = input.permission_request_id.as_deref() {
797                let idx = pending.iter().position(|p| p.id == req_id).ok_or_else(|| {
798                    ToolError::InvalidInput(format!(
799                        "No pending permission found with id '{req_id}'. \
800                         (session_id='{}')",
801                        input.session_id
802                    ))
803                })?;
804
805                let perm = pending.remove(idx);
806
807                if perm.session_id != input.session_id {
808                    return Err(ToolError::InvalidInput(format!(
809                        "Permission request '{req_id}' belongs to session '{}', not '{}'.",
810                        perm.session_id, input.session_id
811                    )));
812                }
813
814                perm
815            } else {
816                let mut perms: Vec<_> = pending
817                    .into_iter()
818                    .filter(|p| p.session_id == input.session_id)
819                    .collect();
820
821                match perms.as_slice() {
822                    [] => {
823                        return Err(ToolError::InvalidInput(format!(
824                            "No pending permission found for session '{}'. \
825                             The permission may have already been responded to.",
826                            input.session_id
827                        )));
828                    }
829                    [_single] => perms.swap_remove(0),
830                    multiple => {
831                        let ids = multiple
832                            .iter()
833                            .map(|p| p.id.as_str())
834                            .collect::<Vec<_>>()
835                            .join(", ");
836                        return Err(ToolError::InvalidInput(format!(
837                            "Multiple pending permissions found for session '{}': {ids}. \
838                             Please retry with permission_request_id (returned by run).",
839                            input.session_id
840                        )));
841                    }
842                }
843            };
844
845            // Track if this is a rejection for post-processing
846            let is_reject = matches!(input.reply, PermissionReply::Reject);
847
848            // Capture permission details for warning message
849            let permission_type = perm.permission.clone();
850            let permission_patterns = perm.patterns.clone();
851
852            // Capture baseline assistant text BEFORE sending reject
853            // This lets us detect stale text after rejection
854            let mut pre_warnings: Vec<String> = Vec::new();
855            let baseline = if is_reject {
856                match client.messages().list(&input.session_id).await {
857                    Ok(msgs) => OrchestratorServer::extract_assistant_text(&msgs),
858                    Err(e) => {
859                        pre_warnings.push(format!("Failed to fetch baseline messages: {e}"));
860                        None
861                    }
862                }
863            } else {
864                None
865            };
866
867            // Convert our reply type to API type
868            let api_reply = match input.reply {
869                PermissionReply::Once => ApiPermissionReply::Once,
870                PermissionReply::Always => ApiPermissionReply::Always,
871                PermissionReply::Reject => ApiPermissionReply::Reject,
872            };
873
874            // Send the reply
875            client
876                .permissions()
877                .reply(
878                    &perm.id,
879                    &PermissionReplyRequest {
880                        reply: api_reply,
881                        message: input.message,
882                    },
883                )
884                .await
885                .map_err(|e| ToolError::Internal(format!("Failed to reply to permission: {e}")))?;
886
887            // Now continue monitoring the session using run logic
888            let run_tool = OrchestratorRunTool::new(Arc::clone(&server_cell));
889            let wait_for_activity = (!is_reject).then_some(true);
890            let mut out = run_tool
891                .run_impl(OrchestratorRunInput {
892                    session_id: Some(input.session_id),
893                    command: None,
894                    message: None,
895                    wait_for_activity,
896                })
897                .await?;
898
899            // Merge pre-warnings
900            out.warnings.extend(pre_warnings);
901
902            // Apply rejection-aware output mutation
903            if is_reject && matches!(out.status, RunStatus::Completed) {
904                let final_resp = out.response.as_deref();
905                let baseline_resp = baseline.as_deref();
906
907                // If response unchanged or None, it's stale pre-rejection text
908                if final_resp.is_none() || final_resp == baseline_resp {
909                    out.response = None;
910                    let patterns_str = if permission_patterns.is_empty() {
911                        "(none)".to_string()
912                    } else {
913                        permission_patterns.join(", ")
914                    };
915                    out.warnings.push(format!(
916                        "Permission rejected for '{permission_type}'. Patterns: {patterns_str}. \
917                         Session stopped without generating a new assistant response."
918                    ));
919                    tracing::debug!(
920                        permission_type = %permission_type,
921                        "rejection override applied: response set to None"
922                    );
923                }
924            }
925
926            Ok(out)
927        })
928    }
929}
930
931// ============================================================================
932// Registry builder
933// ============================================================================
934
935/// Build the tool registry with all orchestrator tools.
936///
937/// The server cell is lazily initialized on first tool call.
938pub fn build_registry(server: &Arc<OnceCell<OrchestratorServer>>) -> ToolRegistry {
939    ToolRegistry::builder()
940        .register::<OrchestratorRunTool, ()>(OrchestratorRunTool::new(Arc::clone(server)))
941        .register::<ListSessionsTool, ()>(ListSessionsTool::new(Arc::clone(server)))
942        .register::<ListCommandsTool, ()>(ListCommandsTool::new(Arc::clone(server)))
943        .register::<RespondPermissionTool, ()>(RespondPermissionTool::new(Arc::clone(server)))
944        .finish()
945}
946
947#[cfg(test)]
948mod tests {
949    use super::*;
950    use agentic_tools_core::Tool;
951
952    #[test]
953    fn tool_names_are_short() {
954        assert_eq!(<OrchestratorRunTool as Tool>::NAME, "run");
955        assert_eq!(<ListSessionsTool as Tool>::NAME, "list_sessions");
956        assert_eq!(<ListCommandsTool as Tool>::NAME, "list_commands");
957        assert_eq!(<RespondPermissionTool as Tool>::NAME, "respond_permission");
958    }
959}