Skip to main content

sandbox_agent_opencode_adapter/
lib.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::convert::Infallible;
3use std::future::Future;
4use std::path::PathBuf;
5use std::pin::Pin;
6use std::str::FromStr;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Mutex as StdMutex};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use axum::body::Body;
12use axum::extract::{Path, Query, State};
13use axum::http::{header, HeaderMap, HeaderName, HeaderValue, Request, StatusCode};
14use axum::middleware::Next;
15use axum::response::sse::{Event, KeepAlive};
16use axum::response::{IntoResponse, Response, Sse};
17use axum::routing::{get, patch, post};
18use axum::{Json, Router};
19use futures::stream;
20use futures::{Stream, StreamExt};
21use sandbox_agent_opencode_server_manager::OpenCodeServerManager;
22use serde::{Deserialize, Serialize};
23use serde_json::{json, Value};
24use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
25use sqlx::{Row, SqlitePool};
26use tokio::sync::{broadcast, Mutex, OnceCell};
27use tokio::time::interval;
28use tracing::warn;
29
/// Default for `OpenCodeAdapterConfig::replay_max_events`.
const DEFAULT_REPLAY_MAX_EVENTS: usize = 50;
/// Default for `OpenCodeAdapterConfig::replay_max_chars`.
const DEFAULT_REPLAY_MAX_CHARS: usize = 12_000;
/// Maximum number of events kept in `AdapterState::event_log`; `emit_event`
/// evicts the oldest entries once the log grows past this size.
const EVENT_LOG_SIZE: usize = 4096;
/// Capacity handed to `broadcast::channel` when the event broadcaster is created.
const EVENT_CHANNEL_SIZE: usize = 2048;
/// Error text explaining that the model cannot be changed once a session exists.
const MODEL_CHANGE_ERROR: &str = "OpenCode compatibility currently does not support changing the model after creating a session. Export with /export and load in to a new session.";
35
36// ---------------------------------------------------------------------------
37// AcpDispatch trait — allows the adapter to dispatch to real ACP agents
38// without depending on the `sandbox-agent` crate (which would be circular).
39// ---------------------------------------------------------------------------
40
/// Stream of raw JSON-RPC payloads from the ACP agent process.
///
/// Boxed and pinned so it can be returned from [`AcpDispatch`] trait methods;
/// each item is one `serde_json::Value`.
pub type AcpPayloadStream = Pin<Box<dyn Stream<Item = Value> + Send>>;
43
/// Successful outcome of [`AcpDispatch::post`].
#[derive(Debug)]
pub enum AcpDispatchResult {
    /// A JSON value was produced for the posted payload.
    /// NOTE(review): presumably the JSON-RPC response — confirm against implementors.
    Response(Value),
    /// No value is returned to the caller.
    /// NOTE(review): presumably the payload was accepted without an immediate
    /// response (e.g. a notification) — confirm against implementors.
    Accepted,
}
49
/// Trait for dispatching JSON-RPC payloads to ACP agent process instances.
///
/// Implementors (e.g. `AcpProxyRuntime`) handle launching, bootstrapping, and
/// communicating with agent subprocesses via the ACP stdio bridge.
///
/// Every method returns a boxed, pinned, `Send` future instead of being an
/// `async fn`; the adapter stores implementations as `Arc<dyn AcpDispatch>`.
/// Failures are reported as plain `String` messages.
pub trait AcpDispatch: Send + Sync + 'static {
    /// Send a JSON-RPC payload to the agent process identified by `server_id`.
    /// If the instance does not exist yet and `bootstrap_agent` is provided,
    /// the implementation should create it for that agent.
    ///
    /// Resolves to an [`AcpDispatchResult`] on success or an error message.
    fn post(
        &self,
        server_id: &str,
        bootstrap_agent: Option<&str>,
        payload: Value,
    ) -> Pin<Box<dyn Future<Output = Result<AcpDispatchResult, String>> + Send + '_>>;

    /// Open a stream of raw JSON-RPC notification payloads from the agent
    /// process. Each item is a `serde_json::Value` containing a complete
    /// JSON-RPC message (notification or response).
    ///
    /// NOTE(review): `last_event_id` presumably selects a resume position in
    /// the agent's stream — confirm against implementors.
    fn notification_stream(
        &self,
        server_id: &str,
        last_event_id: Option<u64>,
    ) -> Pin<Box<dyn Future<Output = Result<AcpPayloadStream, String>> + Send + '_>>;

    /// Destroy the agent process instance.
    ///
    /// Resolves to `Ok(())` on success or an error message.
    fn delete(
        &self,
        server_id: &str,
    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + '_>>;
}
80
/// Configuration consumed by `build_opencode_router`.
pub struct OpenCodeAdapterConfig {
    /// When `Some`, the router is wrapped in the `require_token` middleware and
    /// requests must present this value as a bearer token.
    pub auth_token: Option<String>,
    /// SQLite database path. When `None`, `build_opencode_router` falls back to
    /// `OPENCODE_COMPAT_DB_PATH`, then `$OPENCODE_COMPAT_STATE/opencode-sessions.db`,
    /// then `/tmp/sandbox-agent-opencode.db`.
    pub sqlite_path: Option<String>,
    /// Maximum number of stored events collected when a session is restored.
    pub replay_max_events: usize,
    /// Character budget passed to `build_replay_text` when a session is restored.
    pub replay_max_chars: usize,
    /// Base URL of a native proxy. When `None`, `OPENCODE_COMPAT_PROXY_URL` is
    /// consulted; the value is passed through `normalize_proxy_base_url`.
    pub native_proxy_base_url: Option<String>,
    /// Optional server manager handle.
    /// NOTE(review): its consumers are outside this view — confirm its role.
    pub native_proxy_manager: Option<Arc<OpenCodeServerManager>>,
    /// Optional ACP dispatch backend. When `Some`, prompts for non-mock agents
    /// are routed through real ACP agent processes instead of the mock handler.
    pub acp_dispatch: Option<Arc<dyn AcpDispatch>>,
    /// Optional pre-built provider payload for `/provider` and `/config/providers`.
    /// When `None`, falls back to the hardcoded mock/amp/claude/codex list.
    pub provider_payload: Option<Value>,
}
95
96impl Default for OpenCodeAdapterConfig {
97    fn default() -> Self {
98        Self {
99            auth_token: None,
100            sqlite_path: None,
101            replay_max_events: DEFAULT_REPLAY_MAX_EVENTS,
102            replay_max_chars: DEFAULT_REPLAY_MAX_CHARS,
103            native_proxy_base_url: None,
104            native_proxy_manager: None,
105            acp_dispatch: None,
106            provider_payload: None,
107        }
108    }
109}
110
/// One event published by `AdapterState::emit_event`.
#[derive(Clone, Debug)]
struct OpenCodeStreamEvent {
    /// Identifier drawn from `AdapterState::next_event_id`; compared against a
    /// client's last seen id in `buffered_events_after`.
    id: u64,
    /// JSON body of the event.
    payload: Value,
}
116
/// In-memory state of one session inside the [`Projection`].
#[derive(Clone, Debug)]
struct SessionState {
    /// Persisted session metadata.
    meta: SessionMeta,
    /// Messages of the session; empty when the session is created or loaded.
    /// NOTE(review): presumably filled in by `apply_envelope` — confirm.
    messages: Vec<MessageRecord>,
    /// Status label; set to `"idle"` when the session is created or loaded.
    status: String,
    /// Starts empty.
    /// NOTE(review): presumably permissions the user chose to always allow — confirm.
    always_permissions: HashSet<String>,
}
124
/// A message kept in `SessionState::messages`.
/// NOTE(review): the JSON shapes are defined outside this view — confirm.
#[derive(Clone, Debug)]
struct MessageRecord {
    /// JSON describing the message itself.
    info: Value,
    /// JSON values for the message's parts.
    parts: Vec<Value>,
}
130
/// Session metadata.
///
/// `persist_session` serializes the whole struct into
/// `opencode_session_metadata.metadata_json` (field names are used verbatim as
/// JSON keys) and additionally writes `id`, `agent`, `agent_session_id`,
/// `last_connection_id`, `created_at`, `destroyed_at` and `session_init_json`
/// to columns of the `sessions` table. On load, `rebuild_projection` overwrites
/// those seven fields with the column values.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct SessionMeta {
    /// Session id; primary key in `sessions`.
    id: String,
    /// `session-<id>` for sessions created by `ensure_session`.
    slug: String,
    /// Project id stamped from `AdapterState::project_id` at creation.
    project_id: String,
    /// Directory supplied when the session was created.
    directory: String,
    /// Id of the parent session, if any.
    parent_id: Option<String>,
    /// Title; `Session <id>` for sessions created by `ensure_session`.
    title: String,
    /// Version string; `"0"` for sessions created by `ensure_session`.
    version: String,
    /// Creation timestamp from `now_ms()`.
    /// NOTE(review): presumably milliseconds since the Unix epoch — confirm.
    created_at: i64,
    /// Last-update timestamp; equals `created_at` at creation.
    updated_at: i64,
    /// Share URL, if any.
    share_url: Option<String>,
    /// Permission mode, if any.
    permission_mode: Option<String>,
    /// Agent name; `"mock"` for sessions created by `ensure_session`.
    agent: String,
    /// Provider id; `"mock"` for sessions created by `ensure_session`.
    provider_id: String,
    /// Model id; `"mock"` for sessions created by `ensure_session`.
    model_id: String,
    /// ACP-side session id (`acp_ses_<n>`); replaced by `maybe_restore_session`.
    agent_session_id: String,
    /// Connection id the session was last bound to; compared against the
    /// agent's current connection to decide whether a restore is needed.
    last_connection_id: String,
    /// Parameters recorded for session initialization, if any.
    session_init_json: Option<Value>,
    /// Destruction timestamp; cleared by `maybe_restore_session`.
    destroyed_at: Option<i64>,
}
152
/// In-memory view of adapter state, rebuilt from SQLite by
/// `rebuild_projection` and updated through `apply_envelope`.
#[derive(Debug, Clone, Default)]
struct Projection {
    /// Session state keyed by session id.
    sessions: HashMap<String, SessionState>,
    /// NOTE(review): presumably pending permission requests keyed by request id — confirm.
    permissions: HashMap<String, Value>,
    /// NOTE(review): presumably pending question requests keyed by request id — confirm.
    questions: HashMap<String, Value>,
}
159
/// A request received from an ACP agent that is still awaiting a reply;
/// stored in `AdapterState::acp_request_ids`.
#[derive(Debug, Clone)]
struct AcpPendingRequest {
    /// OpenCode session the request belongs to.
    opencode_session_id: String,
    /// The JSON-RPC `id` from the ACP agent request (permission or question).
    jsonrpc_id: Value,
    /// Whether the request is a permission or a question.
    kind: AcpPendingKind,
}
167
/// Category of a pending ACP agent request (see [`AcpPendingRequest`]).
#[derive(Debug, Clone)]
enum AcpPendingKind {
    /// The agent asked for a permission decision.
    Permission,
    /// The agent asked a question.
    Question,
}
173
/// Shared state behind every route of the OpenCode adapter; constructed once
/// in `build_opencode_router` and shared as `Arc<AdapterState>`.
struct AdapterState {
    /// Adapter configuration (proxy base URL already resolved and normalized).
    config: OpenCodeAdapterConfig,
    /// Resolved SQLite path; `pool` creates its parent directory on first use.
    sqlite_path: String,
    /// Connection options used by `pool` when opening the database.
    sqlite_connect_options: SqliteConnectOptions,
    /// HTTP client built with a 10 second timeout.
    /// NOTE(review): presumably used for native proxy requests — confirm.
    proxy_http_client: reqwest::Client,
    /// Lazily created SQLite pool (see `pool`).
    pool: OnceCell<SqlitePool>,
    /// Set once `ensure_initialized` has completed successfully.
    initialized: OnceCell<()>,
    /// `proj_<now_ms>` generated when the router is built; stamped on new sessions.
    project_id: String,
    /// In-memory projection of sessions and pending requests.
    projection: Mutex<Projection>,
    /// Replay text per session id, stored by `maybe_restore_session`.
    pending_replay: Mutex<HashMap<String, String>>,
    /// Current connection id per agent name, generated on first request.
    agent_connections: Mutex<HashMap<String, String>>,
    /// Broadcast channel that `emit_event` publishes to.
    event_broadcaster: broadcast::Sender<OpenCodeStreamEvent>,
    /// Bounded log of recent events (at most `EVENT_LOG_SIZE`) used for replay.
    event_log: StdMutex<VecDeque<OpenCodeStreamEvent>>,
    /// Counter for stream event ids; starts at 1.
    next_event_id: AtomicU64,
    /// Counter behind `next_id`; seeded from `runtime_unique_seed()`.
    next_id: AtomicU64,
    /// Tracks which ACP server instances have been initialized (initialize + session/new sent).
    /// Key is the ACP server_id (e.g. "acp_ses_42"), value is the ACP sessionId from session/new.
    acp_initialized: Mutex<HashMap<String, String>>,
    /// Maps pending ACP JSON-RPC request IDs to (opencode_session_id, request_kind).
    /// Used to correlate permission/question requests from the agent SSE stream.
    acp_request_ids: Mutex<HashMap<String, AcpPendingRequest>>,
    /// Tracks the last user message ID per session so the SSE translation task
    /// can set the correct `parentID` on assistant messages.
    last_user_message_id: Mutex<HashMap<String, String>>,
}
199
200impl AdapterState {
201    async fn ensure_initialized(&self) -> Result<(), String> {
202        self.initialized
203            .get_or_try_init(|| async {
204                let pool = self.pool().await?;
205                sqlx::query("PRAGMA journal_mode=WAL;")
206                    .execute(pool)
207                    .await
208                    .map_err(|err| err.to_string())?;
209                sqlx::query("PRAGMA synchronous=NORMAL;")
210                    .execute(pool)
211                    .await
212                    .map_err(|err| err.to_string())?;
213
214                // Keep migration SQL in versioned files and run bootstrap migration here.
215                sqlx::query(include_str!("../migrations/0001_init.sql"))
216                    .execute(pool)
217                    .await
218                    .map_err(|err| err.to_string())?;
219
220                self.rebuild_projection().await?;
221                Ok(())
222            })
223            .await
224            .map(|_| ())
225    }
226
227    async fn rebuild_projection(&self) -> Result<(), String> {
228        let mut projection = Projection::default();
229        let pool = self.pool().await?;
230
231        let rows = sqlx::query(
232            r#"SELECT s.id, s.agent, s.agent_session_id, s.last_connection_id, s.created_at, s.destroyed_at, s.session_init_json,
233                      m.metadata_json
234               FROM sessions s
235               JOIN opencode_session_metadata m ON m.session_id = s.id
236               ORDER BY s.created_at ASC, s.id ASC"#,
237        )
238        .fetch_all(pool)
239        .await
240        .map_err(|err| err.to_string())?;
241
242        for row in rows {
243            let id: String = row.try_get("id").map_err(|err| err.to_string())?;
244            let agent: String = row.try_get("agent").map_err(|err| err.to_string())?;
245            let agent_session_id: String = row
246                .try_get("agent_session_id")
247                .map_err(|err| err.to_string())?;
248            let last_connection_id: String = row
249                .try_get("last_connection_id")
250                .map_err(|err| err.to_string())?;
251            let created_at: i64 = row.try_get("created_at").map_err(|err| err.to_string())?;
252            let destroyed_at: Option<i64> =
253                row.try_get("destroyed_at").map_err(|err| err.to_string())?;
254            let session_init_json: Option<String> = row
255                .try_get("session_init_json")
256                .map_err(|err| err.to_string())?;
257            let metadata_json: String = row
258                .try_get("metadata_json")
259                .map_err(|err| err.to_string())?;
260
261            let mut meta: SessionMeta =
262                serde_json::from_str(&metadata_json).map_err(|err| err.to_string())?;
263            meta.id = id.clone();
264            meta.agent = agent;
265            meta.agent_session_id = agent_session_id;
266            meta.last_connection_id = last_connection_id;
267            meta.created_at = created_at;
268            meta.destroyed_at = destroyed_at;
269            meta.session_init_json = session_init_json
270                .as_deref()
271                .and_then(|raw| serde_json::from_str(raw).ok());
272
273            projection.sessions.insert(
274                id,
275                SessionState {
276                    meta,
277                    messages: Vec::new(),
278                    status: "idle".to_string(),
279                    always_permissions: HashSet::new(),
280                },
281            );
282        }
283
284        let event_rows = sqlx::query(
285            r#"SELECT session_id, sender, payload_json
286               FROM events
287               ORDER BY created_at ASC, id ASC"#,
288        )
289        .fetch_all(pool)
290        .await
291        .map_err(|err| err.to_string())?;
292
293        for row in event_rows {
294            let session_id: String = row.try_get("session_id").map_err(|err| err.to_string())?;
295            let sender: String = row.try_get("sender").map_err(|err| err.to_string())?;
296            let payload_json: String =
297                row.try_get("payload_json").map_err(|err| err.to_string())?;
298            let payload: Value =
299                serde_json::from_str(&payload_json).map_err(|err| err.to_string())?;
300            apply_envelope(&mut projection, &session_id, &sender, &payload);
301        }
302
303        let mut guard = self.projection.lock().await;
304        *guard = projection;
305        Ok(())
306    }
307
308    fn emit_event(&self, payload: Value) {
309        let event = OpenCodeStreamEvent {
310            id: self.next_event_id.fetch_add(1, Ordering::Relaxed),
311            payload,
312        };
313
314        if let Ok(mut guard) = self.event_log.lock() {
315            guard.push_back(event.clone());
316            while guard.len() > EVENT_LOG_SIZE {
317                guard.pop_front();
318            }
319        }
320
321        let _ = self.event_broadcaster.send(event);
322    }
323
324    fn buffered_events_after(&self, last_event_id: Option<u64>) -> Vec<OpenCodeStreamEvent> {
325        let Some(last_event_id) = last_event_id else {
326            return Vec::new();
327        };
328        let Ok(guard) = self.event_log.lock() else {
329            return Vec::new();
330        };
331        guard
332            .iter()
333            .filter(|entry| entry.id > last_event_id)
334            .cloned()
335            .collect()
336    }
337
338    fn subscribe(&self) -> broadcast::Receiver<OpenCodeStreamEvent> {
339        self.event_broadcaster.subscribe()
340    }
341
342    fn next_id(&self, prefix: &str) -> String {
343        let value = self.next_id.fetch_add(1, Ordering::Relaxed);
344        format!("{prefix}{value}")
345    }
346
347    async fn current_connection_for_agent(&self, agent: &str) -> String {
348        let mut guard = self.agent_connections.lock().await;
349        guard
350            .entry(agent.to_string())
351            .or_insert_with(|| format!("conn_{}_{}", agent, now_ms()))
352            .clone()
353    }
354
355    async fn pool(&self) -> Result<&SqlitePool, String> {
356        self.pool
357            .get_or_try_init(|| async {
358                if let Some(parent) = PathBuf::from(&self.sqlite_path).parent() {
359                    if !parent.as_os_str().is_empty() {
360                        std::fs::create_dir_all(parent).map_err(|err| err.to_string())?;
361                    }
362                }
363                SqlitePoolOptions::new()
364                    .max_connections(1)
365                    .connect_with(self.sqlite_connect_options.clone())
366                    .await
367                    .map_err(|err| err.to_string())
368            })
369            .await
370    }
371
372    async fn persist_session(&self, meta: &SessionMeta) -> Result<(), String> {
373        let pool = self.pool().await?;
374        let session_init_json = meta
375            .session_init_json
376            .as_ref()
377            .map(|value| serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string()));
378
379        sqlx::query(
380            r#"INSERT INTO sessions (
381                id, agent, agent_session_id, last_connection_id, created_at, destroyed_at, session_init_json
382            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
383            ON CONFLICT(id) DO UPDATE SET
384                agent = excluded.agent,
385                agent_session_id = excluded.agent_session_id,
386                last_connection_id = excluded.last_connection_id,
387                created_at = excluded.created_at,
388                destroyed_at = excluded.destroyed_at,
389                session_init_json = excluded.session_init_json"#,
390        )
391        .bind(&meta.id)
392        .bind(&meta.agent)
393        .bind(&meta.agent_session_id)
394        .bind(&meta.last_connection_id)
395        .bind(meta.created_at)
396        .bind(meta.destroyed_at)
397        .bind(session_init_json)
398        .execute(pool)
399        .await
400        .map_err(|err| err.to_string())?;
401
402        let metadata_json = serde_json::to_string(meta).map_err(|err| err.to_string())?;
403        sqlx::query(
404            r#"INSERT INTO opencode_session_metadata (session_id, metadata_json)
405               VALUES (?1, ?2)
406               ON CONFLICT(session_id) DO UPDATE SET
407                 metadata_json = excluded.metadata_json"#,
408        )
409        .bind(&meta.id)
410        .bind(metadata_json)
411        .execute(pool)
412        .await
413        .map_err(|err| err.to_string())?;
414
415        Ok(())
416    }
417
418    async fn delete_session(&self, session_id: &str) -> Result<(), String> {
419        let pool = self.pool().await?;
420        sqlx::query("DELETE FROM events WHERE session_id = ?1")
421            .bind(session_id)
422            .execute(pool)
423            .await
424            .map_err(|err| err.to_string())?;
425        sqlx::query("DELETE FROM opencode_session_metadata WHERE session_id = ?1")
426            .bind(session_id)
427            .execute(pool)
428            .await
429            .map_err(|err| err.to_string())?;
430        sqlx::query("DELETE FROM sessions WHERE id = ?1")
431            .bind(session_id)
432            .execute(pool)
433            .await
434            .map_err(|err| err.to_string())?;
435        Ok(())
436    }
437
438    async fn persist_event(
439        &self,
440        session_id: &str,
441        sender: &str,
442        payload: &Value,
443    ) -> Result<(), String> {
444        let pool = self.pool().await?;
445        let id = format!("evt_{}", self.next_id(""));
446        let created_at = now_ms();
447        let connection_id = {
448            let projection = self.projection.lock().await;
449            projection
450                .sessions
451                .get(session_id)
452                .map(|state| state.meta.last_connection_id.clone())
453                .unwrap_or_else(|| "conn_unknown".to_string())
454        };
455        sqlx::query(
456            r#"INSERT INTO events (id, session_id, created_at, connection_id, sender, payload_json)
457               VALUES (?1, ?2, ?3, ?4, ?5, ?6)"#,
458        )
459        .bind(id)
460        .bind(session_id)
461        .bind(created_at)
462        .bind(connection_id)
463        .bind(sender)
464        .bind(serde_json::to_string(payload).map_err(|err| err.to_string())?)
465        .execute(pool)
466        .await
467        .map_err(|err| err.to_string())?;
468
469        let mut projection = self.projection.lock().await;
470        apply_envelope(&mut projection, session_id, sender, payload);
471
472        Ok(())
473    }
474
475    async fn collect_replay_events(
476        &self,
477        session_id: &str,
478        max_events: usize,
479    ) -> Result<Vec<Value>, String> {
480        let pool = self.pool().await?;
481        let rows = sqlx::query(
482            r#"SELECT created_at, sender, payload_json
483               FROM events
484               WHERE session_id = ?1
485               ORDER BY created_at ASC, id ASC"#,
486        )
487        .bind(session_id)
488        .fetch_all(pool)
489        .await
490        .map_err(|err| err.to_string())?;
491
492        let mut values = Vec::new();
493        for row in rows {
494            let created_at: i64 = row.try_get("created_at").map_err(|err| err.to_string())?;
495            let sender: String = row.try_get("sender").map_err(|err| err.to_string())?;
496            let payload_json: String =
497                row.try_get("payload_json").map_err(|err| err.to_string())?;
498            let payload: Value =
499                serde_json::from_str(&payload_json).map_err(|err| err.to_string())?;
500            values.push(json!({
501                "createdAt": created_at,
502                "sender": sender,
503                "payload": payload,
504            }));
505        }
506
507        if values.len() > max_events {
508            Ok(values.split_off(values.len() - max_events))
509        } else {
510            Ok(values)
511        }
512    }
513
514    async fn maybe_restore_session(&self, session_id: &str) -> Result<(), String> {
515        let (agent, stale) = {
516            let projection = self.projection.lock().await;
517            let Some(state) = projection.sessions.get(session_id) else {
518                return Ok(());
519            };
520            (
521                state.meta.agent.clone(),
522                state.meta.last_connection_id.clone(),
523            )
524        };
525
526        let current = self.current_connection_for_agent(&agent).await;
527        if stale == current {
528            return Ok(());
529        }
530
531        let replay_source = self
532            .collect_replay_events(session_id, self.config.replay_max_events)
533            .await?;
534        let replay_text = build_replay_text(&replay_source, self.config.replay_max_chars);
535
536        let request_id = self.next_id("oc_req_");
537        let new_agent_session_id = format!("acp_{}", self.next_id("ses_"));
538        let new_request = json!({
539            "jsonrpc": "2.0",
540            "id": request_id,
541            "method": "session/new",
542            "params": {
543                "cwd": "/",
544                "mcpServers": [],
545            }
546        });
547        self.persist_event(session_id, "client", &new_request)
548            .await?;
549
550        let new_response = json!({
551            "jsonrpc": "2.0",
552            "id": request_id,
553            "result": {
554                "sessionId": new_agent_session_id,
555            }
556        });
557        self.persist_event(session_id, "agent", &new_response)
558            .await?;
559
560        let mut updated_meta = None;
561        {
562            let mut projection = self.projection.lock().await;
563            if let Some(session) = projection.sessions.get_mut(session_id) {
564                session.meta.agent_session_id = new_agent_session_id;
565                session.meta.last_connection_id = current;
566                session.meta.destroyed_at = None;
567                updated_meta = Some(session.meta.clone());
568            }
569        }
570        if let Some(meta) = updated_meta {
571            self.persist_session(&meta).await?;
572        }
573
574        if let Some(text) = replay_text {
575            self.pending_replay
576                .lock()
577                .await
578                .insert(session_id.to_string(), text);
579        }
580
581        Ok(())
582    }
583
584    async fn ensure_session(
585        &self,
586        session_id: &str,
587        directory: String,
588    ) -> Result<SessionMeta, String> {
589        {
590            let projection = self.projection.lock().await;
591            if let Some(existing) = projection.sessions.get(session_id) {
592                return Ok(existing.meta.clone());
593            }
594        }
595
596        let now = now_ms();
597        let connection_id = self.current_connection_for_agent("mock").await;
598        let meta = SessionMeta {
599            id: session_id.to_string(),
600            slug: format!("session-{session_id}"),
601            project_id: self.project_id.clone(),
602            directory,
603            parent_id: None,
604            title: format!("Session {session_id}"),
605            version: "0".to_string(),
606            created_at: now,
607            updated_at: now,
608            share_url: None,
609            permission_mode: None,
610            agent: "mock".to_string(),
611            provider_id: "mock".to_string(),
612            model_id: "mock".to_string(),
613            agent_session_id: format!("acp_{}", self.next_id("ses_")),
614            last_connection_id: connection_id,
615            session_init_json: Some(json!({"cwd": "/", "mcpServers": []})),
616            destroyed_at: None,
617        };
618
619        self.persist_session(&meta).await?;
620
621        let session_value = session_to_value(&meta);
622        {
623            let mut projection = self.projection.lock().await;
624            projection.sessions.insert(
625                session_id.to_string(),
626                SessionState {
627                    meta: meta.clone(),
628                    messages: Vec::new(),
629                    status: "idle".to_string(),
630                    always_permissions: HashSet::new(),
631                },
632            );
633        }
634
635        self.emit_event(json!({
636            "type": "session.created",
637            "properties": { "info": session_value }
638        }));
639
640        Ok(meta)
641    }
642}
643
644pub fn build_opencode_router(config: OpenCodeAdapterConfig) -> Result<Router, String> {
645    let proxy_base_url = config
646        .native_proxy_base_url
647        .clone()
648        .or_else(|| std::env::var("OPENCODE_COMPAT_PROXY_URL").ok())
649        .and_then(normalize_proxy_base_url);
650    let config = OpenCodeAdapterConfig {
651        native_proxy_base_url: proxy_base_url,
652        ..config
653    };
654
655    let sqlite_path = config
656        .sqlite_path
657        .clone()
658        .or_else(|| std::env::var("OPENCODE_COMPAT_DB_PATH").ok())
659        .or_else(|| {
660            std::env::var("OPENCODE_COMPAT_STATE")
661                .ok()
662                .map(|base| format!("{base}/opencode-sessions.db"))
663        })
664        .unwrap_or_else(|| "/tmp/sandbox-agent-opencode.db".to_string());
665
666    let connect = SqliteConnectOptions::from_str(&format!("sqlite://{sqlite_path}"))
667        .map_err(|err| err.to_string())?
668        .create_if_missing(true)
669        .journal_mode(SqliteJournalMode::Wal)
670        .foreign_keys(true);
671
672    let (event_broadcaster, _) = broadcast::channel(EVENT_CHANNEL_SIZE);
673
674    let state = Arc::new(AdapterState {
675        config,
676        sqlite_path,
677        sqlite_connect_options: connect,
678        proxy_http_client: reqwest::Client::builder()
679            .timeout(Duration::from_secs(10))
680            .build()
681            .unwrap_or_else(|_| reqwest::Client::new()),
682        pool: OnceCell::new(),
683        initialized: OnceCell::new(),
684        project_id: format!("proj_{}", now_ms()),
685        projection: Mutex::new(Projection::default()),
686        pending_replay: Mutex::new(HashMap::new()),
687        agent_connections: Mutex::new(HashMap::new()),
688        event_broadcaster,
689        event_log: StdMutex::new(VecDeque::new()),
690        next_event_id: AtomicU64::new(1),
691        next_id: AtomicU64::new(runtime_unique_seed()),
692        acp_initialized: Mutex::new(HashMap::new()),
693        acp_request_ids: Mutex::new(HashMap::new()),
694        last_user_message_id: Mutex::new(HashMap::new()),
695    });
696
697    let mut router = Router::new()
698        .route("/agent", get(oc_agent_list))
699        .route("/command", get(oc_command_list))
700        .route("/config", get(oc_config_get).patch(oc_config_patch))
701        .route("/config/providers", get(oc_config_providers))
702        .route("/event", get(oc_event_subscribe))
703        .route("/global/event", get(oc_global_event))
704        .route("/global/health", get(oc_global_health))
705        .route(
706            "/global/config",
707            get(oc_global_config_get).patch(oc_global_config_patch),
708        )
709        .route("/global/dispose", post(oc_global_dispose))
710        .route("/instance/dispose", post(oc_instance_dispose))
711        .route("/path", get(oc_path))
712        .route("/vcs", get(oc_vcs))
713        .route("/mcp", get(oc_mcp_status))
714        .route("/lsp", get(oc_lsp_status))
715        .route("/formatter", get(oc_formatter_status))
716        .route("/experimental/resource", get(oc_experimental_resource))
717        .route("/skill", get(oc_skill_list))
718        .route("/tui/control/next", get(oc_tui_next))
719        .route("/tui/control/response", post(oc_tui_response))
720        .route("/tui/append-prompt", post(oc_tui_append_prompt))
721        .route("/tui/open-help", post(oc_tui_open_help))
722        .route("/tui/open-sessions", post(oc_tui_open_sessions))
723        .route("/tui/open-themes", post(oc_tui_open_themes))
724        .route("/tui/open-models", post(oc_tui_open_models))
725        .route("/tui/submit-prompt", post(oc_tui_submit_prompt))
726        .route("/tui/clear-prompt", post(oc_tui_clear_prompt))
727        .route("/tui/execute-command", post(oc_tui_execute_command))
728        .route("/tui/show-toast", post(oc_tui_show_toast))
729        .route("/tui/publish", post(oc_tui_publish))
730        .route("/project", get(oc_project_list).post(oc_project_current))
731        .route("/project/current", get(oc_project_current))
732        .route("/session", post(oc_session_create).get(oc_session_list))
733        .route("/session/status", get(oc_session_status))
734        .route(
735            "/session/:sessionID",
736            get(oc_session_get)
737                .patch(oc_session_update)
738                .delete(oc_session_delete),
739        )
740        .route("/session/:sessionID/abort", post(oc_session_abort))
741        .route("/session/:sessionID/children", get(oc_session_children))
742        .route("/session/:sessionID/init", post(oc_session_init))
743        .route("/session/:sessionID/fork", post(oc_session_fork))
744        .route("/session/:sessionID/diff", get(oc_session_diff))
745        .route("/session/:sessionID/todo", get(oc_session_todo))
746        .route("/session/:sessionID/summarize", post(oc_session_summarize))
747        .route(
748            "/session/:sessionID/message",
749            get(oc_session_messages).post(oc_session_prompt),
750        )
751        .route(
752            "/session/:sessionID/message/:messageID",
753            get(oc_session_message_get),
754        )
755        .route(
756            "/session/:sessionID/message/:messageID/part/:partID",
757            patch(oc_part_update).delete(oc_part_delete),
758        )
759        .route(
760            "/session/:sessionID/prompt_async",
761            post(oc_session_prompt_async),
762        )
763        .route(
764            "/session/:sessionID/permissions/:permissionID",
765            post(oc_permission_respond),
766        )
767        .route("/permission", get(oc_permission_list))
768        .route("/permission/:requestID/reply", post(oc_permission_reply))
769        .route("/question", get(oc_question_list))
770        .route("/question/:requestID/reply", post(oc_question_reply))
771        .route("/question/:requestID/reject", post(oc_question_reject))
772        .route("/provider", get(oc_provider_list))
773        .route("/provider/auth", get(oc_provider_auth))
774        .route(
775            "/provider/:providerID/oauth/authorize",
776            post(oc_provider_oauth_authorize),
777        )
778        .route(
779            "/provider/:providerID/oauth/callback",
780            post(oc_provider_oauth_callback),
781        )
782        .with_state(state.clone());
783
784    if state.config.auth_token.is_some() {
785        router = router.layer(axum::middleware::from_fn_with_state(state, require_token));
786    }
787
788    Ok(router)
789}
790
791async fn require_token(
792    State(state): State<Arc<AdapterState>>,
793    request: Request<Body>,
794    next: Next,
795) -> Result<Response, Response> {
796    let Some(expected) = state.config.auth_token.as_deref() else {
797        return Ok(next.run(request).await);
798    };
799
800    let bearer = request
801        .headers()
802        .get(header::AUTHORIZATION)
803        .and_then(|value| value.to_str().ok())
804        .and_then(|value| value.strip_prefix("Bearer "));
805
806    if bearer == Some(expected) {
807        return Ok(next.run(request).await);
808    }
809
810    Err((
811        StatusCode::UNAUTHORIZED,
812        Json(json!({"errors":[{"message":"missing or invalid bearer token"}]})),
813    )
814        .into_response())
815}
816
/// Deserializable parameters carrying an optional `directory`.
/// NOTE(review): presumably extracted from the query string by handlers
/// outside this view — confirm.
#[derive(Debug, Deserialize)]
struct DirectoryQuery {
    /// Directory given by the client, if any.
    directory: Option<String>,
}
821
/// Request body for creating a session. Keys are camelCase unless renamed.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SessionCreateBody {
    /// Optional session title.
    title: Option<String>,
    /// Optional parent session id; read from the `parentID` key.
    #[serde(rename = "parentID")]
    parent_id: Option<String>,
    /// Optional free-form permission value.
    permission: Option<Value>,
    /// Optional permission mode; accepted as `permissionMode` or `permission_mode`.
    #[serde(alias = "permission_mode")]
    permission_mode: Option<String>,
}
832
/// JSON body accepted when updating a session.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SessionUpdateBody {
    /// New session title, if it should change.
    title: Option<String>,
    /// Model selection as arbitrary JSON, read from the `model` key.
    model: Option<Value>,
    /// Provider identifier; accepts `providerID`, `provider_id` or `providerId`.
    #[serde(rename = "providerID", alias = "provider_id", alias = "providerId")]
    provider_id: Option<String>,
    /// Model identifier; accepts `modelID`, `model_id` or `modelId`.
    #[serde(rename = "modelID", alias = "model_id", alias = "modelId")]
    model_id: Option<String>,
}
843
/// JSON body accepted when initializing a session; every field is optional.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SessionInitBody {
    /// Provider identifier, read from the `providerID` key.
    #[serde(rename = "providerID")]
    provider_id: Option<String>,
    /// Model identifier, read from the `modelID` key.
    #[serde(rename = "modelID")]
    model_id: Option<String>,
    /// Message identifier, read from the `messageID` key.
    #[serde(rename = "messageID")]
    message_id: Option<String>,
}
854
/// JSON body describing a prompt; every field is optional.
// NOTE(review): the code that consumes this struct is outside this section.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PromptBody {
    /// Message identifier, read from the `messageID` key.
    #[serde(rename = "messageID")]
    message_id: Option<String>,
    /// Nested provider/model selection.
    model: Option<ModelSelection>,
    /// Provider identifier; accepts `providerID`, `provider_id` or `providerId`.
    #[serde(rename = "providerID", alias = "provider_id", alias = "providerId")]
    provider_id: Option<String>,
    /// Model identifier; accepts `modelID`, `model_id` or `modelId`.
    #[serde(rename = "modelID", alias = "model_id", alias = "modelId")]
    model_id: Option<String>,
    /// Read from the `agent` key.
    agent: Option<String>,
    /// Read from the `system` key.
    system: Option<String>,
    /// Read from the `variant` key.
    variant: Option<String>,
    /// Prompt parts as arbitrary JSON values, read from the `parts` key.
    parts: Option<Vec<Value>>,
}
870
/// Provider/model pair nested under `PromptBody::model`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ModelSelection {
    /// Provider identifier; accepts `providerID`, `provider_id` or `providerId`.
    #[serde(rename = "providerID", alias = "provider_id", alias = "providerId")]
    provider_id: Option<String>,
    /// Model identifier; accepts `modelID`, `model_id` or `modelId`.
    #[serde(rename = "modelID", alias = "model_id", alias = "modelId")]
    model_id: Option<String>,
}
879
/// JSON body carrying an optional `response` string.
// NOTE(review): presumably the body of `oc_permission_respond` — confirm against the handler.
#[derive(Debug, Deserialize)]
struct PermissionRespondBody {
    /// Read from the `response` key.
    response: Option<String>,
}
884
/// JSON body carrying an optional `reply` and an optional `message`.
// NOTE(review): presumably the body of `oc_permission_reply` — confirm against the handler.
#[derive(Debug, Deserialize)]
struct PermissionReplyBody {
    /// Read from the `reply` key.
    reply: Option<String>,
    /// Read from the `message` key.
    message: Option<String>,
}
890
/// JSON body carrying optional answers as a list of string lists.
// NOTE(review): presumably the body of `oc_question_reply` — confirm against the handler.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct QuestionReplyBody {
    /// Read from the `answers` key.
    answers: Option<Vec<Vec<String>>>,
}
896
897async fn oc_agent_list(State(state): State<Arc<AdapterState>>) -> Response {
898    if let Err(err) = state.ensure_initialized().await {
899        return internal_error(err);
900    }
901    (
902        StatusCode::OK,
903        Json(json!([
904            {
905                "name": "Sandbox Agent",
906                "description": "Sandbox Agent compatibility layer",
907                "mode": "all",
908                "native": false,
909                "hidden": false,
910                "permission": [],
911                "options": {},
912            }
913        ])),
914    )
915        .into_response()
916}
917
918async fn oc_command_list(State(state): State<Arc<AdapterState>>, headers: HeaderMap) -> Response {
919    if let Err(err) = state.ensure_initialized().await {
920        return internal_error(err);
921    }
922    if let Some(response) =
923        proxy_native_opencode(&state, reqwest::Method::GET, "/command", &headers, None).await
924    {
925        return response;
926    }
927    (StatusCode::OK, Json(json!([]))).into_response()
928}
929
930async fn oc_config_get(State(state): State<Arc<AdapterState>>, headers: HeaderMap) -> Response {
931    if let Err(err) = state.ensure_initialized().await {
932        return internal_error(err);
933    }
934    if let Some(response) =
935        proxy_native_opencode(&state, reqwest::Method::GET, "/config", &headers, None).await
936    {
937        return response;
938    }
939    (
940        StatusCode::OK,
941        Json(json!({
942            "mcp": {},
943            "agent": {},
944            "provider": {},
945        })),
946    )
947        .into_response()
948}
949
950async fn oc_config_patch(
951    State(state): State<Arc<AdapterState>>,
952    headers: HeaderMap,
953    Json(body): Json<Value>,
954) -> Response {
955    if let Err(err) = state.ensure_initialized().await {
956        return internal_error(err);
957    }
958    if let Some(response) = proxy_native_opencode(
959        &state,
960        reqwest::Method::PATCH,
961        "/config",
962        &headers,
963        Some(body.clone()),
964    )
965    .await
966    {
967        return response;
968    }
969    (StatusCode::OK, Json(body)).into_response()
970}
971
972async fn oc_config_providers(State(state): State<Arc<AdapterState>>) -> Response {
973    if let Err(err) = state.ensure_initialized().await {
974        return internal_error(err);
975    }
976    let providers = provider_payload(&state);
977    let mut payload = providers.clone();
978    if let Some(obj) = payload.as_object_mut() {
979        obj.insert("providers".to_string(), providers["all"].clone());
980    }
981    (StatusCode::OK, Json(payload)).into_response()
982}
983
984async fn oc_event_subscribe(
985    State(state): State<Arc<AdapterState>>,
986    headers: HeaderMap,
987    Query(query): Query<DirectoryQuery>,
988) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
989    let _ = state.ensure_initialized().await;
990
991    let directory = resolve_directory(&headers, query.directory.as_ref());
992    let replay = state.buffered_events_after(parse_last_event_id(&headers));
993    let receiver = state.subscribe();
994
995    state.emit_event(json!({"type":"server.connected","properties":{}}));
996    state.emit_event(
997        json!({"type":"worktree.ready","properties":{"name": directory, "branch": "main"}}),
998    );
999
1000    let stream = stream::unfold(
1001        (
1002            receiver,
1003            VecDeque::from(replay),
1004            interval(Duration::from_secs(30)),
1005        ),
1006        |(mut rx, mut replay, mut ticker)| async move {
1007            if let Some(item) = replay.pop_front() {
1008                let evt = Event::default()
1009                    .id(item.id.to_string())
1010                    .json_data(item.payload)
1011                    .unwrap_or_else(|_| Event::default().data("{}"));
1012                return Some((Ok(evt), (rx, replay, ticker)));
1013            }
1014
1015            loop {
1016                tokio::select! {
1017                    _ = ticker.tick() => {
1018                        let evt = Event::default().json_data(json!({"type":"server.heartbeat","properties":{}}))
1019                            .unwrap_or_else(|_| Event::default().data("{}"));
1020                        return Some((Ok(evt), (rx, replay, ticker)));
1021                    }
1022                    item = rx.recv() => {
1023                        match item {
1024                            Ok(payload) => {
1025                                let evt = Event::default()
1026                                    .id(payload.id.to_string())
1027                                    .json_data(payload.payload)
1028                                    .unwrap_or_else(|_| Event::default().data("{}"));
1029                                return Some((Ok(evt), (rx, replay, ticker)));
1030                            }
1031                            Err(broadcast::error::RecvError::Lagged(_)) => continue,
1032                            Err(broadcast::error::RecvError::Closed) => return None,
1033                        }
1034                    }
1035                }
1036            }
1037        },
1038    );
1039
1040    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
1041}
1042
1043async fn oc_global_event(
1044    State(state): State<Arc<AdapterState>>,
1045    headers: HeaderMap,
1046    Query(query): Query<DirectoryQuery>,
1047) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
1048    oc_event_subscribe(State(state), headers, Query(query)).await
1049}
1050
1051async fn oc_global_health() -> Response {
1052    (
1053        StatusCode::OK,
1054        Json(json!({
1055            "healthy": true,
1056            "version": env!("CARGO_PKG_VERSION"),
1057        })),
1058    )
1059        .into_response()
1060}
1061
1062async fn oc_global_config_get(
1063    State(state): State<Arc<AdapterState>>,
1064    headers: HeaderMap,
1065) -> Response {
1066    if let Some(response) = proxy_native_opencode(
1067        &state,
1068        reqwest::Method::GET,
1069        "/global/config",
1070        &headers,
1071        None,
1072    )
1073    .await
1074    {
1075        return response;
1076    }
1077    oc_config_get(State(state), headers).await
1078}
1079
1080async fn oc_global_config_patch(
1081    State(state): State<Arc<AdapterState>>,
1082    headers: HeaderMap,
1083    Json(body): Json<Value>,
1084) -> Response {
1085    if let Some(response) = proxy_native_opencode(
1086        &state,
1087        reqwest::Method::PATCH,
1088        "/global/config",
1089        &headers,
1090        Some(body.clone()),
1091    )
1092    .await
1093    {
1094        return response;
1095    }
1096    oc_config_patch(State(state), headers, Json(body)).await
1097}
1098
1099async fn oc_global_dispose() -> Response {
1100    bool_ok(true).into_response()
1101}
1102
1103async fn oc_instance_dispose() -> Response {
1104    bool_ok(true).into_response()
1105}
1106
1107async fn oc_path(
1108    State(state): State<Arc<AdapterState>>,
1109    headers: HeaderMap,
1110    Query(query): Query<DirectoryQuery>,
1111) -> Response {
1112    if let Err(err) = state.ensure_initialized().await {
1113        return internal_error(err);
1114    }
1115
1116    let directory = resolve_directory(&headers, query.directory.as_ref());
1117    (
1118        StatusCode::OK,
1119        Json(json!({
1120            "home": std::env::var("HOME").unwrap_or_else(|_| "/".to_string()),
1121            "state": std::env::var("OPENCODE_COMPAT_STATE").unwrap_or_else(|_| "/tmp".to_string()),
1122            "config": std::env::var("OPENCODE_COMPAT_CONFIG").unwrap_or_else(|_| "/tmp".to_string()),
1123            "worktree": directory,
1124            "directory": resolve_directory(&headers, query.directory.as_ref()),
1125        })),
1126    )
1127        .into_response()
1128}
1129
1130async fn oc_vcs(State(state): State<Arc<AdapterState>>) -> Response {
1131    if let Err(err) = state.ensure_initialized().await {
1132        return internal_error(err);
1133    }
1134    (StatusCode::OK, Json(json!({"branch":"main"}))).into_response()
1135}
1136
1137async fn oc_mcp_status(State(state): State<Arc<AdapterState>>) -> Response {
1138    if let Err(err) = state.ensure_initialized().await {
1139        return internal_error(err);
1140    }
1141    (StatusCode::OK, Json(json!({}))).into_response()
1142}
1143
1144async fn oc_lsp_status(State(state): State<Arc<AdapterState>>) -> Response {
1145    if let Err(err) = state.ensure_initialized().await {
1146        return internal_error(err);
1147    }
1148    (StatusCode::OK, Json(json!([]))).into_response()
1149}
1150
1151async fn oc_formatter_status(State(state): State<Arc<AdapterState>>) -> Response {
1152    if let Err(err) = state.ensure_initialized().await {
1153        return internal_error(err);
1154    }
1155    (StatusCode::OK, Json(json!([]))).into_response()
1156}
1157
1158async fn oc_experimental_resource(State(state): State<Arc<AdapterState>>) -> Response {
1159    if let Err(err) = state.ensure_initialized().await {
1160        return internal_error(err);
1161    }
1162    (StatusCode::OK, Json(json!([]))).into_response()
1163}
1164
1165async fn oc_skill_list() -> Response {
1166    (StatusCode::OK, Json(json!([]))).into_response()
1167}
1168
1169async fn oc_tui_next(State(state): State<Arc<AdapterState>>, headers: HeaderMap) -> Response {
1170    if let Err(err) = state.ensure_initialized().await {
1171        return internal_error(err);
1172    }
1173    if let Some(response) = proxy_native_opencode(
1174        &state,
1175        reqwest::Method::GET,
1176        "/tui/control/next",
1177        &headers,
1178        None,
1179    )
1180    .await
1181    {
1182        return response;
1183    }
1184    (StatusCode::OK, Json(json!({"path": "", "body": {}}))).into_response()
1185}
1186
1187async fn oc_tui_response(
1188    State(state): State<Arc<AdapterState>>,
1189    headers: HeaderMap,
1190    body: Option<Json<Value>>,
1191) -> Response {
1192    if let Err(err) = state.ensure_initialized().await {
1193        return internal_error(err);
1194    }
1195    if let Some(response) = proxy_native_opencode(
1196        &state,
1197        reqwest::Method::POST,
1198        "/tui/control/response",
1199        &headers,
1200        body.map(|json| json.0),
1201    )
1202    .await
1203    {
1204        return response;
1205    }
1206    bool_ok(true).into_response()
1207}
1208
1209async fn oc_tui_append_prompt(
1210    State(state): State<Arc<AdapterState>>,
1211    headers: HeaderMap,
1212    body: Option<Json<Value>>,
1213) -> Response {
1214    if let Err(err) = state.ensure_initialized().await {
1215        return internal_error(err);
1216    }
1217    if let Some(response) = proxy_native_opencode(
1218        &state,
1219        reqwest::Method::POST,
1220        "/tui/append-prompt",
1221        &headers,
1222        body.map(|json| json.0),
1223    )
1224    .await
1225    {
1226        return response;
1227    }
1228    bool_ok(true).into_response()
1229}
1230
1231async fn oc_tui_open_help(State(state): State<Arc<AdapterState>>, headers: HeaderMap) -> Response {
1232    if let Err(err) = state.ensure_initialized().await {
1233        return internal_error(err);
1234    }
1235    if let Some(response) = proxy_native_opencode(
1236        &state,
1237        reqwest::Method::POST,
1238        "/tui/open-help",
1239        &headers,
1240        None,
1241    )
1242    .await
1243    {
1244        return response;
1245    }
1246    bool_ok(true).into_response()
1247}
1248
1249async fn oc_tui_open_sessions() -> Response {
1250    bool_ok(true).into_response()
1251}
1252
1253async fn oc_tui_open_themes(
1254    State(state): State<Arc<AdapterState>>,
1255    headers: HeaderMap,
1256) -> Response {
1257    if let Err(err) = state.ensure_initialized().await {
1258        return internal_error(err);
1259    }
1260    if let Some(response) = proxy_native_opencode(
1261        &state,
1262        reqwest::Method::POST,
1263        "/tui/open-themes",
1264        &headers,
1265        None,
1266    )
1267    .await
1268    {
1269        return response;
1270    }
1271    bool_ok(true).into_response()
1272}
1273
1274async fn oc_tui_open_models(
1275    State(state): State<Arc<AdapterState>>,
1276    headers: HeaderMap,
1277) -> Response {
1278    if let Err(err) = state.ensure_initialized().await {
1279        return internal_error(err);
1280    }
1281    if let Some(response) = proxy_native_opencode(
1282        &state,
1283        reqwest::Method::POST,
1284        "/tui/open-models",
1285        &headers,
1286        None,
1287    )
1288    .await
1289    {
1290        return response;
1291    }
1292    bool_ok(true).into_response()
1293}
1294
1295async fn oc_tui_submit_prompt(
1296    State(state): State<Arc<AdapterState>>,
1297    headers: HeaderMap,
1298    body: Option<Json<Value>>,
1299) -> Response {
1300    if let Err(err) = state.ensure_initialized().await {
1301        return internal_error(err);
1302    }
1303    if let Some(response) = proxy_native_opencode(
1304        &state,
1305        reqwest::Method::POST,
1306        "/tui/submit-prompt",
1307        &headers,
1308        body.map(|json| json.0),
1309    )
1310    .await
1311    {
1312        return response;
1313    }
1314    bool_ok(true).into_response()
1315}
1316
1317async fn oc_tui_clear_prompt(
1318    State(state): State<Arc<AdapterState>>,
1319    headers: HeaderMap,
1320) -> Response {
1321    if let Err(err) = state.ensure_initialized().await {
1322        return internal_error(err);
1323    }
1324    if let Some(response) = proxy_native_opencode(
1325        &state,
1326        reqwest::Method::POST,
1327        "/tui/clear-prompt",
1328        &headers,
1329        None,
1330    )
1331    .await
1332    {
1333        return response;
1334    }
1335    bool_ok(true).into_response()
1336}
1337
1338async fn oc_tui_execute_command(
1339    State(state): State<Arc<AdapterState>>,
1340    headers: HeaderMap,
1341    body: Option<Json<Value>>,
1342) -> Response {
1343    if let Err(err) = state.ensure_initialized().await {
1344        return internal_error(err);
1345    }
1346    if let Some(response) = proxy_native_opencode(
1347        &state,
1348        reqwest::Method::POST,
1349        "/tui/execute-command",
1350        &headers,
1351        body.map(|json| json.0),
1352    )
1353    .await
1354    {
1355        return response;
1356    }
1357    bool_ok(true).into_response()
1358}
1359
1360async fn oc_tui_show_toast(
1361    State(state): State<Arc<AdapterState>>,
1362    headers: HeaderMap,
1363    body: Option<Json<Value>>,
1364) -> Response {
1365    if let Err(err) = state.ensure_initialized().await {
1366        return internal_error(err);
1367    }
1368    if let Some(response) = proxy_native_opencode(
1369        &state,
1370        reqwest::Method::POST,
1371        "/tui/show-toast",
1372        &headers,
1373        body.map(|json| json.0),
1374    )
1375    .await
1376    {
1377        return response;
1378    }
1379    bool_ok(true).into_response()
1380}
1381
1382async fn oc_tui_publish(
1383    State(state): State<Arc<AdapterState>>,
1384    headers: HeaderMap,
1385    body: Option<Json<Value>>,
1386) -> Response {
1387    if let Err(err) = state.ensure_initialized().await {
1388        return internal_error(err);
1389    }
1390    if let Some(response) = proxy_native_opencode(
1391        &state,
1392        reqwest::Method::POST,
1393        "/tui/publish",
1394        &headers,
1395        body.map(|json| json.0),
1396    )
1397    .await
1398    {
1399        return response;
1400    }
1401    bool_ok(true).into_response()
1402}
1403
1404async fn oc_project_list(
1405    State(state): State<Arc<AdapterState>>,
1406    headers: HeaderMap,
1407    Query(query): Query<DirectoryQuery>,
1408) -> Response {
1409    if let Err(err) = state.ensure_initialized().await {
1410        return internal_error(err);
1411    }
1412    let directory = resolve_directory(&headers, query.directory.as_ref());
1413    let now = now_ms();
1414    (
1415        StatusCode::OK,
1416        Json(json!([{
1417            "id": state.project_id,
1418            "worktree": directory,
1419            "vcs": "git",
1420            "name": "sandbox-agent",
1421            "time": {"created": now, "updated": now},
1422        }])),
1423    )
1424        .into_response()
1425}
1426
1427async fn oc_project_current(
1428    State(state): State<Arc<AdapterState>>,
1429    headers: HeaderMap,
1430    Query(query): Query<DirectoryQuery>,
1431) -> Response {
1432    if let Err(err) = state.ensure_initialized().await {
1433        return internal_error(err);
1434    }
1435    let directory = resolve_directory(&headers, query.directory.as_ref());
1436    let now = now_ms();
1437    (
1438        StatusCode::OK,
1439        Json(json!({
1440            "id": state.project_id,
1441            "worktree": directory,
1442            "vcs": "git",
1443            "name": "sandbox-agent",
1444            "time": {"created": now, "updated": now},
1445        })),
1446    )
1447        .into_response()
1448}
1449
1450async fn oc_session_create(
1451    State(state): State<Arc<AdapterState>>,
1452    headers: HeaderMap,
1453    Query(query): Query<DirectoryQuery>,
1454    body: Option<Json<SessionCreateBody>>,
1455) -> Response {
1456    if let Err(err) = state.ensure_initialized().await {
1457        return internal_error(err);
1458    }
1459
1460    let body = body.map(|value| value.0).unwrap_or(SessionCreateBody {
1461        title: None,
1462        parent_id: None,
1463        permission: None,
1464        permission_mode: None,
1465    });
1466
1467    let id = state.next_id("ses_");
1468    let now = now_ms();
1469    let directory = resolve_directory(&headers, query.directory.as_ref());
1470
1471    let default_agent = "mock";
1472    let connection_id = state.current_connection_for_agent(default_agent).await;
1473    let meta = SessionMeta {
1474        id: id.clone(),
1475        slug: format!("session-{id}"),
1476        project_id: state.project_id.clone(),
1477        directory,
1478        parent_id: body.parent_id,
1479        title: body.title.unwrap_or_else(|| format!("Session {id}")),
1480        version: "0".to_string(),
1481        created_at: now,
1482        updated_at: now,
1483        share_url: None,
1484        permission_mode: body.permission_mode,
1485        agent: default_agent.to_string(),
1486        provider_id: default_agent.to_string(),
1487        model_id: default_model_for_provider(default_agent)
1488            .unwrap_or("default")
1489            .to_string(),
1490        agent_session_id: format!("acp_{}", state.next_id("ses_")),
1491        last_connection_id: connection_id,
1492        session_init_json: Some(json!({"cwd": "/", "mcpServers": []})),
1493        destroyed_at: None,
1494    };
1495
1496    if let Err(err) = state.persist_session(&meta).await {
1497        return internal_error(err);
1498    }
1499
1500    {
1501        let mut projection = state.projection.lock().await;
1502        projection.sessions.insert(
1503            id,
1504            SessionState {
1505                meta: meta.clone(),
1506                messages: Vec::new(),
1507                status: "idle".to_string(),
1508                always_permissions: HashSet::new(),
1509            },
1510        );
1511    }
1512
1513    let value = session_to_value(&meta);
1514    state.emit_event(json!({"type":"session.created","properties":{"info":value}}));
1515
1516    (StatusCode::OK, Json(value)).into_response()
1517}
1518
1519async fn oc_session_list(State(state): State<Arc<AdapterState>>) -> Response {
1520    if let Err(err) = state.ensure_initialized().await {
1521        return internal_error(err);
1522    }
1523
1524    let projection = state.projection.lock().await;
1525    let mut values = projection
1526        .sessions
1527        .values()
1528        .map(|session| session_to_value(&session.meta))
1529        .collect::<Vec<_>>();
1530    values.sort_by(|a, b| {
1531        let a_id = a.get("id").and_then(Value::as_str).unwrap_or_default();
1532        let b_id = b.get("id").and_then(Value::as_str).unwrap_or_default();
1533        a_id.cmp(b_id)
1534    });
1535
1536    (StatusCode::OK, Json(values)).into_response()
1537}
1538
1539async fn oc_session_get(
1540    State(state): State<Arc<AdapterState>>,
1541    Path(session_id): Path<String>,
1542) -> Response {
1543    if let Err(err) = state.ensure_initialized().await {
1544        return internal_error(err);
1545    }
1546
1547    let projection = state.projection.lock().await;
1548    let Some(session) = projection.sessions.get(&session_id) else {
1549        return not_found("Session not found");
1550    };
1551
1552    (StatusCode::OK, Json(session_to_value(&session.meta))).into_response()
1553}
1554
1555async fn oc_session_update(
1556    State(state): State<Arc<AdapterState>>,
1557    Path(session_id): Path<String>,
1558    Json(body): Json<SessionUpdateBody>,
1559) -> Response {
1560    if let Err(err) = state.ensure_initialized().await {
1561        return internal_error(err);
1562    }
1563
1564    if body.model.is_some() || body.provider_id.is_some() || body.model_id.is_some() {
1565        return bad_request(MODEL_CHANGE_ERROR);
1566    }
1567
1568    let meta = {
1569        let mut projection = state.projection.lock().await;
1570        let Some(session) = projection.sessions.get_mut(&session_id) else {
1571            return not_found("Session not found");
1572        };
1573
1574        if let Some(title) = body.title {
1575            session.meta.title = title;
1576            session.meta.updated_at = now_ms();
1577        }
1578
1579        session.meta.clone()
1580    };
1581
1582    if let Err(err) = state.persist_session(&meta).await {
1583        return internal_error(err);
1584    }
1585
1586    let value = session_to_value(&meta);
1587    state.emit_event(json!({"type":"session.updated","properties":{"info":value}}));
1588    (StatusCode::OK, Json(value)).into_response()
1589}
1590
1591async fn oc_session_delete(
1592    State(state): State<Arc<AdapterState>>,
1593    Path(session_id): Path<String>,
1594) -> Response {
1595    if let Err(err) = state.ensure_initialized().await {
1596        return internal_error(err);
1597    }
1598
1599    let removed = {
1600        let mut projection = state.projection.lock().await;
1601        projection.permissions.retain(|_, value| {
1602            value
1603                .get("sessionID")
1604                .and_then(Value::as_str)
1605                .map(|id| id != session_id)
1606                .unwrap_or(true)
1607        });
1608        projection.questions.retain(|_, value| {
1609            value
1610                .get("sessionID")
1611                .and_then(Value::as_str)
1612                .map(|id| id != session_id)
1613                .unwrap_or(true)
1614        });
1615        projection.sessions.remove(&session_id)
1616    };
1617
1618    let Some(session) = removed else {
1619        return not_found("Session not found");
1620    };
1621
1622    if let Err(err) = state.delete_session(&session_id).await {
1623        return internal_error(err);
1624    }
1625
1626    // Clean up the ACP server instance if one was created for this session.
1627    let server_id = session.meta.agent_session_id.clone();
1628    if state
1629        .acp_initialized
1630        .lock()
1631        .await
1632        .remove(&server_id)
1633        .is_some()
1634    {
1635        if let Some(dispatch) = state.config.acp_dispatch.as_ref() {
1636            if let Err(err) = dispatch.delete(&server_id).await {
1637                warn!(
1638                    ?err,
1639                    "failed to delete ACP server instance on session delete"
1640                );
1641            }
1642        }
1643    }
1644
1645    // Clean up any pending ACP requests for this session.
1646    state
1647        .acp_request_ids
1648        .lock()
1649        .await
1650        .retain(|_, req| req.opencode_session_id != session_id);
1651
1652    let value = session_to_value(&session.meta);
1653    state.emit_event(json!({"type":"session.deleted","properties":{"info":value}}));
1654
1655    (StatusCode::OK, Json(json!(true))).into_response()
1656}
1657
1658async fn oc_session_status(State(state): State<Arc<AdapterState>>) -> Response {
1659    if let Err(err) = state.ensure_initialized().await {
1660        return internal_error(err);
1661    }
1662    let projection = state.projection.lock().await;
1663    let mut map = serde_json::Map::new();
1664    for (id, session) in &projection.sessions {
1665        map.insert(id.clone(), json!({"type": session.status}));
1666    }
1667    (StatusCode::OK, Json(Value::Object(map))).into_response()
1668}
1669
1670async fn oc_session_abort(
1671    State(state): State<Arc<AdapterState>>,
1672    Path(session_id): Path<String>,
1673) -> Response {
1674    if let Err(err) = state.ensure_initialized().await {
1675        return internal_error(err);
1676    }
1677
1678    let mut should_emit_idle = false;
1679    {
1680        let mut projection = state.projection.lock().await;
1681        let Some(session) = projection.sessions.get_mut(&session_id) else {
1682            return not_found("Session not found");
1683        };
1684        if session.status != "idle" {
1685            session.status = "idle".to_string();
1686            should_emit_idle = true;
1687        }
1688        projection.permissions.retain(|_, value| {
1689            value.get("sessionID").and_then(Value::as_str) != Some(session_id.as_str())
1690        });
1691        projection.questions.retain(|_, value| {
1692            value.get("sessionID").and_then(Value::as_str) != Some(session_id.as_str())
1693        });
1694    }
1695
1696    if should_emit_idle {
1697        let payload = json!({"jsonrpc":"2.0","method":"_sandboxagent/opencode/status","params":{"status":"idle"}});
1698        if let Err(err) = state.persist_event(&session_id, "agent", &payload).await {
1699            warn!(?err, "failed to persist abort idle status envelope");
1700        }
1701        state.emit_event(json!({"type":"session.idle","properties":{"sessionID":session_id}}));
1702    }
1703
1704    // Send session/cancel to the ACP agent if dispatch is available.
1705    if let Some(dispatch) = state.config.acp_dispatch.as_ref() {
1706        let agent_session_id = {
1707            let projection = state.projection.lock().await;
1708            projection
1709                .sessions
1710                .get(&session_id)
1711                .map(|s| s.meta.agent_session_id.clone())
1712        };
1713        if let Some(server_id) = agent_session_id {
1714            let acp_session_id = state.acp_initialized.lock().await.get(&server_id).cloned();
1715            if let Some(acp_sid) = acp_session_id {
1716                let cancel_id = state.next_id("oc_rpc_");
1717                let cancel_payload = json!({
1718                    "jsonrpc": "2.0",
1719                    "id": cancel_id,
1720                    "method": "session/cancel",
1721                    "params": {
1722                        "sessionId": acp_sid,
1723                    }
1724                });
1725                if let Err(err) = dispatch.post(&server_id, None, cancel_payload).await {
1726                    warn!(?err, "failed to send session/cancel to ACP agent");
1727                }
1728            }
1729        }
1730    }
1731
1732    (StatusCode::OK, Json(json!(true))).into_response()
1733}
1734
1735async fn oc_session_children() -> Response {
1736    (StatusCode::OK, Json(json!([]))).into_response()
1737}
1738
1739async fn oc_session_init(
1740    State(state): State<Arc<AdapterState>>,
1741    Path(session_id): Path<String>,
1742    headers: HeaderMap,
1743    Query(query): Query<DirectoryQuery>,
1744    body: Option<Json<SessionInitBody>>,
1745) -> Response {
1746    if let Err(err) = state.ensure_initialized().await {
1747        return internal_error(err);
1748    }
1749
1750    let directory = resolve_directory(&headers, query.directory.as_ref());
1751    if let Err(err) = state.ensure_session(&session_id, directory).await {
1752        return internal_error(err);
1753    }
1754
1755    let body = body.map(|json| json.0).unwrap_or(SessionInitBody {
1756        provider_id: None,
1757        model_id: None,
1758        message_id: None,
1759    });
1760
1761    if body.provider_id.is_none() && body.model_id.is_none() {
1762        return (StatusCode::OK, Json(json!(true))).into_response();
1763    }
1764
1765    if body.provider_id.is_none() || body.model_id.is_none() {
1766        return bad_request("providerID and modelID are required when selecting a model");
1767    }
1768
1769    let provider_id = body.provider_id.unwrap_or_else(|| "mock".to_string());
1770    let model_id = body.model_id.unwrap_or_else(|| "mock".to_string());
1771
1772    let meta = {
1773        let mut projection = state.projection.lock().await;
1774        let Some(session) = projection.sessions.get_mut(&session_id) else {
1775            return not_found("Session not found");
1776        };
1777        let has_messages = !session.messages.is_empty();
1778        let selection_changed =
1779            session.meta.provider_id != provider_id || session.meta.model_id != model_id;
1780        if has_messages && selection_changed {
1781            return bad_request(MODEL_CHANGE_ERROR);
1782        }
1783        session.meta.provider_id = provider_id.clone();
1784        session.meta.model_id = model_id.clone();
1785        session.meta.agent = provider_to_agent(&provider_id);
1786        session.meta.updated_at = now_ms();
1787        session.meta.clone()
1788    };
1789
1790    if let Err(err) = state.persist_session(&meta).await {
1791        return internal_error(err);
1792    }
1793
1794    (StatusCode::OK, Json(json!(true))).into_response()
1795}
1796
1797async fn oc_session_fork(
1798    State(state): State<Arc<AdapterState>>,
1799    Path(session_id): Path<String>,
1800    headers: HeaderMap,
1801    Query(query): Query<DirectoryQuery>,
1802) -> Response {
1803    if let Err(err) = state.ensure_initialized().await {
1804        return internal_error(err);
1805    }
1806
1807    let parent = {
1808        let projection = state.projection.lock().await;
1809        projection.sessions.get(&session_id).cloned()
1810    };
1811    let Some(parent) = parent else {
1812        return not_found("Session not found");
1813    };
1814
1815    let id = state.next_id("ses_");
1816    let now = now_ms();
1817    let directory = resolve_directory(&headers, query.directory.as_ref());
1818    let connection_id = state.current_connection_for_agent(&parent.meta.agent).await;
1819
1820    let meta = SessionMeta {
1821        id: id.clone(),
1822        slug: format!("session-{id}"),
1823        project_id: state.project_id.clone(),
1824        directory,
1825        parent_id: Some(session_id),
1826        title: format!("Fork of {}", parent.meta.title),
1827        version: "0".to_string(),
1828        created_at: now,
1829        updated_at: now,
1830        share_url: None,
1831        permission_mode: parent.meta.permission_mode.clone(),
1832        agent: parent.meta.agent.clone(),
1833        provider_id: parent.meta.provider_id.clone(),
1834        model_id: parent.meta.model_id.clone(),
1835        agent_session_id: format!("acp_{}", state.next_id("ses_")),
1836        last_connection_id: connection_id,
1837        session_init_json: parent.meta.session_init_json.clone(),
1838        destroyed_at: None,
1839    };
1840
1841    if let Err(err) = state.persist_session(&meta).await {
1842        return internal_error(err);
1843    }
1844
1845    {
1846        let mut projection = state.projection.lock().await;
1847        projection.sessions.insert(
1848            id.clone(),
1849            SessionState {
1850                meta: meta.clone(),
1851                messages: Vec::new(),
1852                status: "idle".to_string(),
1853                always_permissions: HashSet::new(),
1854            },
1855        );
1856    }
1857
1858    let value = session_to_value(&meta);
1859    state.emit_event(json!({"type":"session.created","properties":{"info":value}}));
1860
1861    (StatusCode::OK, Json(value)).into_response()
1862}
1863
1864async fn oc_session_diff() -> Response {
1865    (StatusCode::OK, Json(json!([]))).into_response()
1866}
1867
1868async fn oc_session_todo() -> Response {
1869    (StatusCode::OK, Json(json!([]))).into_response()
1870}
1871
1872async fn oc_session_summarize(Json(body): Json<Value>) -> Response {
1873    if body.get("providerID").is_none() || body.get("modelID").is_none() {
1874        return bad_request("providerID and modelID are required");
1875    }
1876    (StatusCode::OK, Json(json!(true))).into_response()
1877}
1878
1879async fn oc_session_messages(
1880    State(state): State<Arc<AdapterState>>,
1881    Path(session_id): Path<String>,
1882) -> Response {
1883    if let Err(err) = state.ensure_initialized().await {
1884        return internal_error(err);
1885    }
1886
1887    let projection = state.projection.lock().await;
1888    let Some(session) = projection.sessions.get(&session_id) else {
1889        return not_found("Session not found");
1890    };
1891
1892    let values = session
1893        .messages
1894        .iter()
1895        .map(|record| json!({"info": record.info, "parts": record.parts}))
1896        .collect::<Vec<_>>();
1897
1898    (StatusCode::OK, Json(values)).into_response()
1899}
1900
/// Handles a prompt sent to a session and returns the assistant message envelope
/// (`{"info": ..., "parts": [...]}`) with `200 OK`.
///
/// High-level flow:
/// 1. Ensure the adapter is initialized and the session exists.
/// 2. Validate the requested model/agent selection; changing it once the session
///    has messages is rejected with `MODEL_CHANGE_ERROR`.
/// 3. Require a non-empty `parts` list.
/// 4. Write the resolved selection into the projection, persist the session
///    metadata and call `maybe_restore_session`.
/// 5. Persist the prompt as a `"client"` event, emit `message.updated` /
///    `message.part.updated` events for the user message and mark the session
///    `"busy"`.
/// 6. If `acp_dispatch` is configured and the agent is not `"mock"`, bootstrap the
///    ACP server on first use and forward the prompt, returning a `_pending`
///    assistant envelope.
/// 7. Otherwise run the built-in mock behaviour, selected by keywords found in the
///    first text part (case-insensitive): `permission`, `question`, `error`,
///    `tool`, or a plain echo of the prompt text.
///
/// Validation problems are reported through `bad_request`; every other failure is
/// reported through `internal_error`.
async fn oc_session_prompt(
    State(state): State<Arc<AdapterState>>,
    Path(session_id): Path<String>,
    headers: HeaderMap,
    Query(query): Query<DirectoryQuery>,
    Json(body): Json<PromptBody>,
) -> Response {
    if let Err(err) = state.ensure_initialized().await {
        return internal_error(err);
    }

    let directory = resolve_directory(&headers, query.directory.as_ref());
    let mut meta = match state.ensure_session(&session_id, directory.clone()).await {
        Ok(meta) => meta,
        Err(err) => return internal_error(err),
    };

    // An explicit selection that cannot be resolved into a provider/model pair
    // is a client error.
    let explicit_model_selection = prompt_has_explicit_model_selection(&body);
    let requested_selection = resolve_selection_from_prompt(&body);
    if explicit_model_selection && requested_selection.is_none() {
        return bad_request("providerID and modelID are required when selecting a model");
    }

    // Whether the session already has history; an unknown session counts as empty.
    let has_messages = {
        let projection = state.projection.lock().await;
        projection
            .sessions
            .get(&session_id)
            .map(|session| !session.messages.is_empty())
            .unwrap_or(false)
    };

    // Apply the requested selection (or, failing that, the bare agent override)
    // to the local copy of the metadata. Either kind of change is refused once
    // the session has messages.
    if let Some(selection) = requested_selection.as_ref() {
        let selection_changed =
            meta.provider_id != selection.provider_id || meta.model_id != selection.model_id;
        if has_messages && selection_changed {
            return bad_request(MODEL_CHANGE_ERROR);
        }
        meta.provider_id = selection.provider_id.clone();
        meta.model_id = selection.model_id.clone();
        meta.agent = selection.agent.clone();
    } else if let Some(agent) = body.agent.as_ref() {
        if has_messages && meta.agent != *agent {
            return bad_request(MODEL_CHANGE_ERROR);
        }
        meta.agent = agent.clone();
    }

    let parts_input = body.parts.unwrap_or_default();
    if parts_input.is_empty() {
        return bad_request("parts are required");
    }

    // Carry over the permission mode currently stored in the projection, if any.
    if let Some(session_mode) = {
        let projection = state.projection.lock().await;
        projection
            .sessions
            .get(&session_id)
            .and_then(|session| session.meta.permission_mode.clone())
    } {
        meta.permission_mode = Some(session_mode);
    }

    // Write the resolved agent/provider/model back into the projection, bump
    // `updated_at`, and continue with the projection's copy of the metadata.
    {
        let mut projection = state.projection.lock().await;
        if let Some(session) = projection.sessions.get_mut(&session_id) {
            session.meta.agent = meta.agent.clone();
            session.meta.provider_id = meta.provider_id.clone();
            session.meta.model_id = meta.model_id.clone();
            session.meta.updated_at = now_ms();
            meta = session.meta.clone();
        }
    }

    if let Err(err) = state.persist_session(&meta).await {
        return internal_error(err);
    }

    if let Err(err) = state.maybe_restore_session(&session_id).await {
        return internal_error(err);
    }

    // Re-read meta after maybe_restore_session, which may have generated a new
    // agent_session_id (e.g. when the agent changed from "mock" to a real agent
    // and the connection_id differs).
    {
        let projection = state.projection.lock().await;
        if let Some(session) = projection.sessions.get(&session_id) {
            meta = session.meta.clone();
        }
    }

    // Use the client-supplied message id when present, otherwise mint one.
    let user_message_id = body
        .message_id
        .clone()
        .unwrap_or_else(|| state.next_id("msg_"));
    let now = now_ms();

    let user_info = build_user_message(
        &session_id,
        &user_message_id,
        now,
        &meta.agent,
        &meta.provider_id,
        &meta.model_id,
        body.system.as_deref(),
    );
    let user_parts = normalize_parts(&session_id, &user_message_id, &parts_input);

    // If replay text is pending for this session, take it (removing the entry)
    // and prepend it as a text part to the prompt sent to the agent. The user
    // message recorded below still carries only the original parts.
    let replay_injected = state.pending_replay.lock().await.remove(&session_id);
    let outbound_prompt_parts = if let Some(replay_text) = replay_injected {
        let mut prompt = vec![json!({"type":"text", "text": replay_text})];
        prompt.extend(parts_input.clone());
        prompt
    } else {
        parts_input.clone()
    };

    // Record the prompt as a JSON-RPC `session/prompt` envelope from the client.
    let prompt_envelope = json!({
        "jsonrpc": "2.0",
        "id": state.next_id("oc_req_"),
        "method": "session/prompt",
        "params": {
            "sessionId": meta.agent_session_id,
            "prompt": outbound_prompt_parts,
            "sessionID": session_id,
            "message": {
                "info": user_info,
                "parts": user_parts,
            }
        }
    });
    if let Err(err) = state
        .persist_event(&session_id, "client", &prompt_envelope)
        .await
    {
        return internal_error(err);
    }

    // Announce the user message and each of its parts to event subscribers.
    state.emit_event(message_event("message.updated", &user_info));
    for part in &user_parts {
        state.emit_event(json!({
            "type":"message.part.updated",
            "properties":{
                "sessionID": session_id,
                "messageID": user_message_id,
                "part": part
            }
        }));
    }

    // Track the user message ID so the SSE translation task can set
    // parentID on assistant messages.
    state
        .last_user_message_id
        .lock()
        .await
        .insert(session_id.clone(), user_message_id.clone());

    if let Err(err) = set_session_status(&state, &session_id, "busy").await {
        return internal_error(err);
    }

    // -----------------------------------------------------------------------
    // ACP dispatch path — route to real agent processes when acp_dispatch is
    // configured and the resolved agent is not "mock".
    // -----------------------------------------------------------------------
    tracing::info!(
        session_id = %session_id,
        agent = %meta.agent,
        provider_id = %meta.provider_id,
        model_id = %meta.model_id,
        has_acp_dispatch = state.config.acp_dispatch.is_some(),
        "prompt dispatch decision"
    );
    if let Some(dispatch) = state.config.acp_dispatch.as_ref() {
        if meta.agent != "mock" {
            // The session's agent_session_id doubles as the ACP server id.
            let server_id = meta.agent_session_id.clone();

            tracing::info!(server_id = %server_id, agent = %meta.agent, "entering ACP dispatch path");

            // Bootstrap the ACP server instance if this is the first prompt.
            let needs_init = !state.acp_initialized.lock().await.contains_key(&server_id);
            if needs_init {
                tracing::info!(server_id = %server_id, "bootstrapping ACP session (initialize + session/new)");
                // 1) initialize
                let init_id = state.next_id("oc_rpc_");
                let init_payload = json!({
                    "jsonrpc": "2.0",
                    "id": init_id,
                    "method": "initialize",
                    "params": {
                        "protocolVersion": 1,
                        "capabilities": {},
                        "clientInfo": {
                            "name": "sandbox-agent-opencode-adapter",
                            "version": "0.1.0"
                        },
                        "_meta": {
                            "sandboxagent.dev": {
                                "agent": meta.agent.clone()
                            }
                        }
                    }
                });
                // Any failure here resets the session to "idle" (best effort)
                // before reporting the error.
                match dispatch
                    .post(&server_id, Some(&meta.agent), init_payload)
                    .await
                {
                    Ok(AcpDispatchResult::Response(ref resp)) => {
                        if let Some(err) = resp.get("error") {
                            tracing::error!(server_id = %server_id, error = %err, "ACP initialize returned JSON-RPC error");
                            let _ = set_session_status(&state, &session_id, "idle").await;
                            return internal_error(format!("ACP initialize error: {err}"));
                        }
                        tracing::info!(server_id = %server_id, "ACP initialize succeeded");
                    }
                    Ok(AcpDispatchResult::Accepted) => {
                        tracing::info!(server_id = %server_id, "ACP initialize accepted");
                    }
                    Err(err) => {
                        let _ = set_session_status(&state, &session_id, "idle").await;
                        return internal_error(format!("ACP initialize failed: {err}"));
                    }
                }

                // 2) session/new
                let new_id = state.next_id("oc_rpc_");
                let new_payload = json!({
                    "jsonrpc": "2.0",
                    "id": new_id,
                    "method": "session/new",
                    "params": {
                        "cwd": directory,
                        "mcpServers": [],
                        "_meta": {
                            "sandboxagent.dev": {
                                "model": meta.model_id.clone()
                            }
                        }
                    }
                });
                // The ACP session id is read from `/result/sessionId`; a response
                // without it, or an `Accepted` result, yields an empty id.
                let acp_session_id = match dispatch.post(&server_id, None, new_payload).await {
                    Ok(AcpDispatchResult::Response(ref resp)) => {
                        if let Some(err) = resp.get("error") {
                            tracing::error!(server_id = %server_id, error = %err, "ACP session/new returned JSON-RPC error");
                            let _ = set_session_status(&state, &session_id, "idle").await;
                            return internal_error(format!("ACP session/new error: {err}"));
                        }
                        let sid = resp
                            .pointer("/result/sessionId")
                            .and_then(Value::as_str)
                            .unwrap_or("")
                            .to_string();
                        tracing::info!(server_id = %server_id, acp_session_id = %sid, "ACP session/new succeeded");
                        sid
                    }
                    Ok(AcpDispatchResult::Accepted) => {
                        tracing::info!(server_id = %server_id, "ACP session/new accepted");
                        String::new()
                    }
                    Err(err) => {
                        let _ = set_session_status(&state, &session_id, "idle").await;
                        return internal_error(format!("ACP session/new failed: {err}"));
                    }
                };

                // 3) Start SSE translation task.
                // A failure to open the stream is only logged; the server is
                // still recorded as initialized below.
                match dispatch.notification_stream(&server_id, None).await {
                    Ok(stream) => {
                        let state_for_task = state.clone();
                        let session_id_for_task = session_id.clone();
                        let directory_for_task = directory.clone();
                        let agent_for_task = meta.agent.clone();
                        let provider_for_task = meta.provider_id.clone();
                        let model_for_task = meta.model_id.clone();
                        tokio::spawn(acp_sse_translation_task(
                            state_for_task,
                            stream,
                            session_id_for_task,
                            directory_for_task,
                            agent_for_task,
                            provider_for_task,
                            model_for_task,
                        ));
                    }
                    Err(err) => {
                        warn!(
                            ?err,
                            "failed to open ACP SSE stream; events will not be translated"
                        );
                    }
                }

                // Remember the ACP session id so later prompts skip the bootstrap.
                state
                    .acp_initialized
                    .lock()
                    .await
                    .insert(server_id.clone(), acp_session_id);
            }

            // 4) Send session/prompt
            let acp_session_id = state
                .acp_initialized
                .lock()
                .await
                .get(&server_id)
                .cloned()
                .unwrap_or_default();
            let prompt_id = state.next_id("oc_rpc_");
            let prompt_payload = json!({
                "jsonrpc": "2.0",
                "id": prompt_id,
                "method": "session/prompt",
                "params": {
                    "sessionId": acp_session_id,
                    "prompt": outbound_prompt_parts,
                }
            });
            // dispatch.post() blocks until the agent returns the session/prompt
            // response.  The response is also broadcast to the notification stream
            // so the SSE translation task sees it in-order after all session/update
            // notifications and can emit session.idle at the right time.
            match dispatch.post(&server_id, None, prompt_payload).await {
                Ok(AcpDispatchResult::Response(ref resp)) => {
                    if let Some(err) = resp.get("error") {
                        tracing::error!(server_id = %server_id, error = %err, "ACP session/prompt returned JSON-RPC error");
                        let _ = set_session_status(&state, &session_id, "idle").await;
                        return internal_error(format!("ACP session/prompt error: {err}"));
                    }
                    tracing::info!(server_id = %server_id, "ACP session/prompt response received (turn completion delegated to SSE task)");
                }
                Ok(AcpDispatchResult::Accepted) => {
                    tracing::info!(server_id = %server_id, "ACP session/prompt accepted (streaming)");
                }
                Err(err) => {
                    let _ = set_session_status(&state, &session_id, "idle").await;
                    return internal_error(format!("ACP session/prompt failed: {err}"));
                }
            };

            // The SSE translation task handles session.idle and streamed
            // content, but the HTTP response needs the pending assistant
            // message envelope so the client can correlate future events.
            let assistant_message = build_assistant_message(
                &session_id,
                &format!("{user_message_id}_pending"),
                &user_message_id,
                now,
                &directory,
                &meta.agent,
                &meta.provider_id,
                &meta.model_id,
            );
            return (
                StatusCode::OK,
                Json(json!({
                    "info": assistant_message,
                    "parts": [],
                })),
            )
                .into_response();
        }
    }

    // -----------------------------------------------------------------------
    // Mock path — reached when no ACP dispatch is configured or the agent is
    // "mock". Behaviour is chosen by keywords in the prompt text.
    // -----------------------------------------------------------------------

    // Text of the first part that carries a string `text` field ("" if none).
    let prompt_text = parts_input
        .iter()
        .find_map(|part| part.get("text").and_then(Value::as_str))
        .unwrap_or("")
        .to_string();

    // Whether "execute" is among the session's always-granted permissions.
    let auto_allow = {
        let projection = state.projection.lock().await;
        projection
            .sessions
            .get(&session_id)
            .map(|session| session.always_permissions.contains("execute"))
            .unwrap_or(false)
    };

    // "permission": raise an "execute" permission request and return a pending
    // assistant envelope. The session status is not reset here.
    if prompt_text.to_ascii_lowercase().contains("permission") {
        let request_id = state.next_id("perm_");
        let permission_request = json!({
            "id": request_id,
            "sessionID": session_id,
            "permission": "execute",
            "patterns": ["*"],
            "metadata": {},
            "always": [],
        });
        let asked = json!({
            "jsonrpc":"2.0",
            "method":"_sandboxagent/opencode/permission_asked",
            "params":{"request": permission_request}
        });
        if let Err(err) = state.persist_event(&session_id, "agent", &asked).await {
            return internal_error(err);
        }
        state.emit_event(json!({"type":"permission.asked","properties":permission_request}));

        // Resolve the request immediately when "execute" is always allowed.
        if auto_allow {
            if let Err(err) =
                resolve_permission_inner(&state, &session_id, &request_id, "always").await
            {
                return internal_error(err);
            }
        }

        let assistant_info = build_assistant_message(
            &session_id,
            &format!("{user_message_id}_pending"),
            &user_message_id,
            now,
            &directory,
            &meta.agent,
            &meta.provider_id,
            &meta.model_id,
        );

        return (
            StatusCode::OK,
            Json(json!({"info": assistant_info, "parts": []})),
        )
            .into_response();
    }

    // "question": raise a fixed Yes/No question and return a pending assistant
    // envelope. The session status is not reset here.
    if prompt_text.to_ascii_lowercase().contains("question") {
        let request_id = state.next_id("q_");
        let question_request = json!({
            "id": request_id,
            "sessionID": session_id,
            "questions": [{
                "question": "Choose one option",
                "header": "Question",
                "options": [
                    {"label":"Yes","description":"Accept"},
                    {"label":"No","description":"Reject"}
                ],
                "multiple": false,
                "custom": true
            }]
        });
        let asked = json!({
            "jsonrpc":"2.0",
            "method":"_sandboxagent/opencode/question_asked",
            "params":{"request": question_request}
        });
        if let Err(err) = state.persist_event(&session_id, "agent", &asked).await {
            return internal_error(err);
        }
        state.emit_event(json!({"type":"question.asked","properties":question_request}));

        let assistant_info = build_assistant_message(
            &session_id,
            &format!("{user_message_id}_pending"),
            &user_message_id,
            now,
            &directory,
            &meta.agent,
            &meta.provider_id,
            &meta.model_id,
        );

        return (
            StatusCode::OK,
            Json(json!({"info": assistant_info, "parts": []})),
        )
            .into_response();
    }

    // NOTE(review): presumably simulates agent latency for the remaining mock
    // branches — confirm.
    tokio::time::sleep(Duration::from_millis(120)).await;

    // "error": emit a session.error event, record it, return the session to
    // "idle" and reply with an `_error` assistant envelope.
    if prompt_text.to_ascii_lowercase().contains("error") {
        state.emit_event(json!({
            "type":"session.error",
            "properties":{
                "sessionID": session_id,
                "error": {"name":"UnknownError","data":{"message":"mock process crashed"}}
            }
        }));
        let err_env = json!({
            "jsonrpc":"2.0",
            "method":"_sandboxagent/opencode/error",
            "params":{"message":"mock process crashed"}
        });
        if let Err(err) = state.persist_event(&session_id, "agent", &err_env).await {
            return internal_error(err);
        }
        if let Err(err) = set_session_status(&state, &session_id, "idle").await {
            return internal_error(err);
        }

        let assistant_info = build_assistant_message(
            &session_id,
            &format!("{user_message_id}_error"),
            &user_message_id,
            now,
            &directory,
            &meta.agent,
            &meta.provider_id,
            &meta.model_id,
        );

        return (
            StatusCode::OK,
            Json(json!({"info": assistant_info, "parts": []})),
        )
            .into_response();
    }

    // Remaining branches produce a completed assistant message.
    let assistant_message_id = format!("{user_message_id}_assistant");
    let assistant_info = build_completed_assistant_message(
        &session_id,
        &assistant_message_id,
        &user_message_id,
        now,
        &directory,
        &meta.agent,
        &meta.provider_id,
        &meta.model_id,
    );

    let mut assistant_parts = Vec::<Value>::new();

    if prompt_text.to_ascii_lowercase().contains("tool") {
        // "tool": a completed `bash` tool call plus a README.md file part,
        // followed by a file.edited event.
        let tool_part = json!({
            "id": state.next_id("part_"),
            "sessionID": session_id,
            "messageID": assistant_message_id,
            "type": "tool",
            "callID": state.next_id("call_"),
            "tool": "bash",
            "state": {
                "status": "completed",
                "input": {"command": "echo tool"},
                "output": "ok",
                "title": "bash",
                "metadata": {},
                "time": {"start": now, "end": now}
            }
        });
        let file_part = json!({
            "id": state.next_id("part_"),
            "sessionID": session_id,
            "messageID": assistant_message_id,
            "type": "file",
            "mime": "text/plain",
            "filename": "README.md",
            "url": "file:///README.md",
        });

        assistant_parts.push(tool_part.clone());
        assistant_parts.push(file_part.clone());

        state.emit_event(json!({
            "type":"message.part.updated",
            "properties":{
                "sessionID": session_id,
                "messageID": assistant_message_id,
                "part": tool_part
            }
        }));
        state.emit_event(json!({
            "type":"message.part.updated",
            "properties":{
                "sessionID": session_id,
                "messageID": assistant_message_id,
                "part": file_part
            }
        }));
        state.emit_event(
            json!({"type":"file.edited","properties":{"sessionID":session_id, "path":"README.md"}}),
        );
    } else {
        // Default: echo the prompt text back, or "OK" when it is blank.
        let response_text = if prompt_text.trim().is_empty() {
            "OK".to_string()
        } else {
            prompt_text.clone()
        };
        let text_part = json!({
            "id": state.next_id("part_"),
            "sessionID": session_id,
            "messageID": assistant_message_id,
            "type": "text",
            "text": response_text,
        });
        assistant_parts.push(text_part.clone());
        state.emit_event(json!({
            "type":"message.part.updated",
            "properties":{
                "sessionID": session_id,
                "messageID": assistant_message_id,
                "part": text_part
            }
        }));
    }

    // Record the assistant message as an "agent" event.
    let assistant_env = json!({
        "jsonrpc": "2.0",
        "method": "_sandboxagent/opencode/message",
        "params": {
            "message": {
                "info": assistant_info,
                "parts": assistant_parts,
            }
        }
    });
    if let Err(err) = state
        .persist_event(&session_id, "agent", &assistant_env)
        .await
    {
        return internal_error(err);
    }

    state.emit_event(message_event("message.updated", &assistant_info));

    if let Err(err) = set_session_status(&state, &session_id, "idle").await {
        return internal_error(err);
    }

    // Reply with the parts as stored in the projection for the assistant
    // message (empty if the message is not found there).
    // NOTE(review): presumably `persist_event` applied the message to the
    // projection — confirm.
    let projection = state.projection.lock().await;
    let parts = projection
        .sessions
        .get(&session_id)
        .and_then(|session| {
            session
                .messages
                .iter()
                .find(|message| {
                    message.info.get("id").and_then(Value::as_str)
                        == Some(assistant_message_id.as_str())
                })
                .map(|message| message.parts.clone())
        })
        .unwrap_or_default();

    (
        StatusCode::OK,
        Json(json!({"info": assistant_info, "parts": parts})),
    )
        .into_response()
}
2543
2544async fn oc_session_message_get(
2545    State(state): State<Arc<AdapterState>>,
2546    Path((session_id, message_id)): Path<(String, String)>,
2547) -> Response {
2548    if let Err(err) = state.ensure_initialized().await {
2549        return internal_error(err);
2550    }
2551
2552    let projection = state.projection.lock().await;
2553    let Some(session) = projection.sessions.get(&session_id) else {
2554        return not_found("Session not found");
2555    };
2556
2557    let Some(record) = session.messages.iter().find(|message| {
2558        message.info.get("id").and_then(Value::as_str) == Some(message_id.as_str())
2559    }) else {
2560        return not_found("Message not found");
2561    };
2562
2563    (
2564        StatusCode::OK,
2565        Json(json!({
2566            "id": message_id,
2567            "info": record.info,
2568            "parts": record.parts,
2569        })),
2570    )
2571        .into_response()
2572}
2573
2574async fn oc_part_update(
2575    State(state): State<Arc<AdapterState>>,
2576    Path((session_id, message_id, part_id)): Path<(String, String, String)>,
2577    Json(mut part): Json<Value>,
2578) -> Response {
2579    if let Err(err) = state.ensure_initialized().await {
2580        return internal_error(err);
2581    }
2582
2583    if let Some(obj) = part.as_object_mut() {
2584        obj.insert("id".to_string(), json!(part_id.clone()));
2585        obj.insert("sessionID".to_string(), json!(session_id.clone()));
2586        obj.insert("messageID".to_string(), json!(message_id.clone()));
2587    }
2588
2589    {
2590        let mut projection = state.projection.lock().await;
2591        if let Some(session) = projection.sessions.get_mut(&session_id) {
2592            if let Some(message) = session.messages.iter_mut().find(|record| {
2593                record.info.get("id").and_then(Value::as_str) == Some(message_id.as_str())
2594            }) {
2595                if let Some(existing) = message.parts.iter_mut().find(|candidate| {
2596                    candidate.get("id").and_then(Value::as_str) == Some(part_id.as_str())
2597                }) {
2598                    *existing = part.clone();
2599                } else {
2600                    message.parts.push(part.clone());
2601                }
2602            }
2603        }
2604    }
2605
2606    state.emit_event(json!({
2607        "type":"message.part.updated",
2608        "properties":{
2609            "sessionID": session_id,
2610            "messageID": message_id,
2611            "part": part.clone()
2612        }
2613    }));
2614
2615    (StatusCode::OK, Json(part)).into_response()
2616}
2617
2618async fn oc_part_delete(
2619    State(state): State<Arc<AdapterState>>,
2620    Path((session_id, message_id, part_id)): Path<(String, String, String)>,
2621) -> Response {
2622    if let Err(err) = state.ensure_initialized().await {
2623        return internal_error(err);
2624    }
2625
2626    {
2627        let mut projection = state.projection.lock().await;
2628        if let Some(session) = projection.sessions.get_mut(&session_id) {
2629            if let Some(message) = session.messages.iter_mut().find(|record| {
2630                record.info.get("id").and_then(Value::as_str) == Some(message_id.as_str())
2631            }) {
2632                message.parts.retain(|part| {
2633                    part.get("id").and_then(Value::as_str) != Some(part_id.as_str())
2634                });
2635            }
2636        }
2637    }
2638
2639    state.emit_event(json!({
2640        "type":"message.part.removed",
2641        "properties": {"sessionID": session_id, "messageID": message_id, "partID": part_id}
2642    }));
2643
2644    (StatusCode::OK, Json(json!(true))).into_response()
2645}
2646
2647async fn oc_session_prompt_async(
2648    State(state): State<Arc<AdapterState>>,
2649    Path(session_id): Path<String>,
2650    headers: HeaderMap,
2651    query: Query<DirectoryQuery>,
2652    Json(body): Json<PromptBody>,
2653) -> Response {
2654    let _ = oc_session_prompt(State(state), Path(session_id), headers, query, Json(body)).await;
2655
2656    StatusCode::NO_CONTENT.into_response()
2657}
2658
2659async fn oc_permission_respond(
2660    State(state): State<Arc<AdapterState>>,
2661    Path((session_id, permission_id)): Path<(String, String)>,
2662    Json(body): Json<PermissionRespondBody>,
2663) -> Response {
2664    if let Err(err) = state.ensure_initialized().await {
2665        return internal_error(err);
2666    }
2667
2668    let reply = match body.response.as_deref() {
2669        Some("allow") => "once",
2670        Some("deny") => "reject",
2671        Some("always") => "always",
2672        _ => "once",
2673    };
2674
2675    if let Err(err) = resolve_permission_inner(&state, &session_id, &permission_id, reply).await {
2676        return internal_error(err);
2677    }
2678
2679    (StatusCode::OK, Json(json!(true))).into_response()
2680}
2681
2682async fn oc_permission_reply(
2683    State(state): State<Arc<AdapterState>>,
2684    Path(request_id): Path<String>,
2685    Json(body): Json<PermissionReplyBody>,
2686) -> Response {
2687    if let Err(err) = state.ensure_initialized().await {
2688        return internal_error(err);
2689    }
2690
2691    let reply = body.reply.unwrap_or_else(|| "once".to_string());
2692    let session_id = {
2693        let projection = state.projection.lock().await;
2694        projection
2695            .permissions
2696            .get(&request_id)
2697            .and_then(|value| value.get("sessionID"))
2698            .and_then(Value::as_str)
2699            .map(ToOwned::to_owned)
2700    };
2701
2702    let Some(session_id) = session_id else {
2703        return not_found("Permission request not found");
2704    };
2705
2706    if let Err(err) = resolve_permission_inner(&state, &session_id, &request_id, &reply).await {
2707        return internal_error(err);
2708    }
2709
2710    (StatusCode::OK, Json(json!(true))).into_response()
2711}
2712
2713async fn oc_permission_list(State(state): State<Arc<AdapterState>>) -> Response {
2714    if let Err(err) = state.ensure_initialized().await {
2715        return internal_error(err);
2716    }
2717
2718    let projection = state.projection.lock().await;
2719    let mut values = projection.permissions.values().cloned().collect::<Vec<_>>();
2720    values.sort_by(|a, b| {
2721        let a_id = a.get("id").and_then(Value::as_str).unwrap_or_default();
2722        let b_id = b.get("id").and_then(Value::as_str).unwrap_or_default();
2723        a_id.cmp(b_id)
2724    });
2725    (StatusCode::OK, Json(values)).into_response()
2726}
2727
2728async fn oc_question_list(State(state): State<Arc<AdapterState>>) -> Response {
2729    if let Err(err) = state.ensure_initialized().await {
2730        return internal_error(err);
2731    }
2732
2733    let projection = state.projection.lock().await;
2734    let mut values = projection.questions.values().cloned().collect::<Vec<_>>();
2735    values.sort_by(|a, b| {
2736        let a_id = a.get("id").and_then(Value::as_str).unwrap_or_default();
2737        let b_id = b.get("id").and_then(Value::as_str).unwrap_or_default();
2738        a_id.cmp(b_id)
2739    });
2740    (StatusCode::OK, Json(values)).into_response()
2741}
2742
2743async fn oc_question_reply(
2744    State(state): State<Arc<AdapterState>>,
2745    Path(request_id): Path<String>,
2746    Json(body): Json<QuestionReplyBody>,
2747) -> Response {
2748    if let Err(err) = state.ensure_initialized().await {
2749        return internal_error(err);
2750    }
2751
2752    let session_id = {
2753        let projection = state.projection.lock().await;
2754        projection
2755            .questions
2756            .get(&request_id)
2757            .and_then(|value| value.get("sessionID"))
2758            .and_then(Value::as_str)
2759            .map(ToOwned::to_owned)
2760    };
2761
2762    let Some(session_id) = session_id else {
2763        return not_found("Question request not found");
2764    };
2765
2766    let answers = body.answers.unwrap_or_default();
2767
2768    // Forward the answer to the ACP agent if there's a pending request.
2769    let pending = state.acp_request_ids.lock().await.remove(&request_id);
2770
2771    if let Some(pending) = &pending {
2772        if let Some(dispatch) = state.config.acp_dispatch.as_ref() {
2773            let agent_session_id = {
2774                let projection = state.projection.lock().await;
2775                projection
2776                    .sessions
2777                    .get(&session_id)
2778                    .map(|s| s.meta.agent_session_id.clone())
2779            };
2780            if let Some(server_id) = agent_session_id {
2781                let response = json!({
2782                    "jsonrpc": "2.0",
2783                    "id": pending.jsonrpc_id,
2784                    "result": {
2785                        "outcome": "selected",
2786                        "_meta": {
2787                            "sandboxagent.dev": {
2788                                "answers": answers
2789                            }
2790                        }
2791                    }
2792                });
2793                if let Err(err) = dispatch.post(&server_id, None, response).await {
2794                    warn!(?err, "failed to forward question response to ACP agent");
2795                }
2796            }
2797        }
2798    }
2799
2800    let envelope = json!({
2801        "jsonrpc":"2.0",
2802        "method":"_sandboxagent/opencode/question_replied",
2803        "params":{"requestID": request_id, "answers": answers}
2804    });
2805    if let Err(err) = state.persist_event(&session_id, "agent", &envelope).await {
2806        return internal_error(err);
2807    }
2808
2809    state.emit_event(json!({
2810        "type":"question.replied",
2811        "properties": {
2812            "sessionID": session_id,
2813            "requestID": request_id,
2814            "answers": answers,
2815        }
2816    }));
2817
2818    if let Err(err) = set_session_status(&state, &session_id, "idle").await {
2819        return internal_error(err);
2820    }
2821
2822    (StatusCode::OK, Json(json!(true))).into_response()
2823}
2824
2825async fn oc_question_reject(
2826    State(state): State<Arc<AdapterState>>,
2827    Path(request_id): Path<String>,
2828) -> Response {
2829    if let Err(err) = state.ensure_initialized().await {
2830        return internal_error(err);
2831    }
2832
2833    let session_id = {
2834        let projection = state.projection.lock().await;
2835        projection
2836            .questions
2837            .get(&request_id)
2838            .and_then(|value| value.get("sessionID"))
2839            .and_then(Value::as_str)
2840            .map(ToOwned::to_owned)
2841    };
2842
2843    let Some(session_id) = session_id else {
2844        return not_found("Question request not found");
2845    };
2846
2847    // Forward rejection to the ACP agent if there's a pending request.
2848    let pending = state.acp_request_ids.lock().await.remove(&request_id);
2849
2850    if let Some(pending) = &pending {
2851        if let Some(dispatch) = state.config.acp_dispatch.as_ref() {
2852            let agent_session_id = {
2853                let projection = state.projection.lock().await;
2854                projection
2855                    .sessions
2856                    .get(&session_id)
2857                    .map(|s| s.meta.agent_session_id.clone())
2858            };
2859            if let Some(server_id) = agent_session_id {
2860                let response = json!({
2861                    "jsonrpc": "2.0",
2862                    "id": pending.jsonrpc_id,
2863                    "result": {
2864                        "outcome": "rejected"
2865                    }
2866                });
2867                if let Err(err) = dispatch.post(&server_id, None, response).await {
2868                    warn!(?err, "failed to forward question rejection to ACP agent");
2869                }
2870            }
2871        }
2872    }
2873
2874    let envelope = json!({
2875        "jsonrpc":"2.0",
2876        "method":"_sandboxagent/opencode/question_rejected",
2877        "params":{"requestID": request_id}
2878    });
2879    if let Err(err) = state.persist_event(&session_id, "agent", &envelope).await {
2880        return internal_error(err);
2881    }
2882
2883    state.emit_event(json!({
2884        "type":"question.rejected",
2885        "properties": {
2886            "sessionID": session_id,
2887            "requestID": request_id,
2888        }
2889    }));
2890
2891    if let Err(err) = set_session_status(&state, &session_id, "idle").await {
2892        return internal_error(err);
2893    }
2894
2895    (StatusCode::OK, Json(json!(true))).into_response()
2896}
2897
2898async fn oc_provider_list(State(state): State<Arc<AdapterState>>) -> Response {
2899    if let Err(err) = state.ensure_initialized().await {
2900        return internal_error(err);
2901    }
2902    (StatusCode::OK, Json(provider_payload(&state))).into_response()
2903}
2904
2905async fn oc_provider_auth(State(state): State<Arc<AdapterState>>) -> Response {
2906    if let Err(err) = state.ensure_initialized().await {
2907        return internal_error(err);
2908    }
2909    (StatusCode::OK, Json(json!({"mock": [], "amp": []}))).into_response()
2910}
2911
2912async fn oc_provider_oauth_authorize(Path(provider_id): Path<String>) -> Response {
2913    (
2914        StatusCode::OK,
2915        Json(json!({
2916            "url": format!("https://auth.local/{provider_id}/authorize"),
2917            "method": "auto",
2918            "instructions": "stub",
2919        })),
2920    )
2921        .into_response()
2922}
2923
2924async fn oc_provider_oauth_callback() -> Response {
2925    (StatusCode::OK, Json(json!(true))).into_response()
2926}
2927
2928async fn resolve_permission_inner(
2929    state: &Arc<AdapterState>,
2930    session_id: &str,
2931    permission_id: &str,
2932    reply: &str,
2933) -> Result<(), String> {
2934    // If there's a pending ACP request for this permission, forward the
2935    // response to the agent process.
2936    let pending = state.acp_request_ids.lock().await.remove(permission_id);
2937
2938    if let Some(pending) = &pending {
2939        if let Some(dispatch) = state.config.acp_dispatch.as_ref() {
2940            let agent_session_id = {
2941                let projection = state.projection.lock().await;
2942                projection
2943                    .sessions
2944                    .get(session_id)
2945                    .map(|s| s.meta.agent_session_id.clone())
2946            };
2947            if let Some(server_id) = agent_session_id {
2948                let option_kind = match reply {
2949                    "always" => "allow_always",
2950                    "reject" | "deny" => "reject_once",
2951                    _ => "allow_once",
2952                };
2953                let response = json!({
2954                    "jsonrpc": "2.0",
2955                    "id": pending.jsonrpc_id,
2956                    "result": {
2957                        "outcome": "selected",
2958                        "selectedOption": {
2959                            "kind": option_kind
2960                        }
2961                    }
2962                });
2963                if let Err(err) = dispatch.post(&server_id, None, response).await {
2964                    warn!(?err, "failed to forward permission response to ACP agent");
2965                }
2966            }
2967        }
2968    }
2969
2970    let envelope = json!({
2971        "jsonrpc":"2.0",
2972        "method":"_sandboxagent/opencode/permission_replied",
2973        "params": {
2974            "requestID": permission_id,
2975            "reply": reply,
2976        }
2977    });
2978    state.persist_event(session_id, "agent", &envelope).await?;
2979
2980    state.emit_event(json!({
2981        "type":"permission.replied",
2982        "properties": {
2983            "sessionID": session_id,
2984            "requestID": permission_id,
2985            "reply": reply,
2986        }
2987    }));
2988
2989    if reply == "always" {
2990        let mut projection = state.projection.lock().await;
2991        if let Some(session) = projection.sessions.get_mut(session_id) {
2992            session.always_permissions.insert("execute".to_string());
2993        }
2994    }
2995
2996    set_session_status(state, session_id, "idle").await
2997}
2998
2999async fn set_session_status(
3000    state: &Arc<AdapterState>,
3001    session_id: &str,
3002    status: &str,
3003) -> Result<(), String> {
3004    let updated_meta = {
3005        let mut projection = state.projection.lock().await;
3006        let Some(session) = projection.sessions.get_mut(session_id) else {
3007            return Err(format!("session '{session_id}' not found"));
3008        };
3009        session.status = status.to_string();
3010        session.meta.updated_at = now_ms();
3011        session.meta.clone()
3012    };
3013    state.persist_session(&updated_meta).await?;
3014
3015    let env = json!({
3016        "jsonrpc":"2.0",
3017        "method":"_sandboxagent/opencode/status",
3018        "params":{"status": status}
3019    });
3020    state.persist_event(session_id, "agent", &env).await?;
3021
3022    state.emit_event(json!({
3023        "type":"session.status",
3024        "properties": {
3025            "sessionID": session_id,
3026            "status": {"type": status},
3027        }
3028    }));
3029
3030    if status == "idle" {
3031        state.emit_event(json!({
3032            "type":"session.idle",
3033            "properties": {"sessionID": session_id}
3034        }));
3035    }
3036
3037    Ok(())
3038}
3039
3040fn apply_envelope(projection: &mut Projection, session_id: &str, _sender: &str, payload: &Value) {
3041    let Some(method) = payload.get("method").and_then(Value::as_str) else {
3042        return;
3043    };
3044
3045    match method {
3046        "session/prompt" => {
3047            if let Some(message) = payload
3048                .get("params")
3049                .and_then(|params| params.get("message"))
3050                .and_then(Value::as_object)
3051            {
3052                let info = message.get("info").cloned().unwrap_or_else(|| json!({}));
3053                let parts = message
3054                    .get("parts")
3055                    .and_then(Value::as_array)
3056                    .cloned()
3057                    .unwrap_or_default();
3058                if let Some(session) = projection.sessions.get_mut(session_id) {
3059                    upsert_message(session, info, parts);
3060                    session.status = "busy".to_string();
3061                }
3062            }
3063        }
3064        "_sandboxagent/opencode/message" => {
3065            if let Some(message) = payload
3066                .get("params")
3067                .and_then(|params| params.get("message"))
3068                .and_then(Value::as_object)
3069            {
3070                let info = message.get("info").cloned().unwrap_or_else(|| json!({}));
3071                let parts = message
3072                    .get("parts")
3073                    .and_then(Value::as_array)
3074                    .cloned()
3075                    .unwrap_or_default();
3076                if let Some(session) = projection.sessions.get_mut(session_id) {
3077                    upsert_message(session, info, parts);
3078                }
3079            }
3080        }
3081        "_sandboxagent/opencode/status" => {
3082            let status = payload
3083                .get("params")
3084                .and_then(|params| params.get("status"))
3085                .and_then(Value::as_str)
3086                .unwrap_or("idle")
3087                .to_string();
3088            if let Some(session) = projection.sessions.get_mut(session_id) {
3089                session.status = status;
3090            }
3091        }
3092        "_sandboxagent/opencode/permission_asked" => {
3093            if let Some(request) = payload
3094                .get("params")
3095                .and_then(|params| params.get("request"))
3096                .cloned()
3097            {
3098                if let Some(id) = request.get("id").and_then(Value::as_str) {
3099                    projection.permissions.insert(id.to_string(), request);
3100                }
3101                if let Some(session) = projection.sessions.get_mut(session_id) {
3102                    session.status = "busy".to_string();
3103                }
3104            }
3105        }
3106        "_sandboxagent/opencode/permission_replied" => {
3107            if let Some(request_id) = payload
3108                .get("params")
3109                .and_then(|params| params.get("requestID"))
3110                .and_then(Value::as_str)
3111            {
3112                let reply = payload
3113                    .get("params")
3114                    .and_then(|params| params.get("reply"))
3115                    .and_then(Value::as_str)
3116                    .unwrap_or("once");
3117                projection.permissions.remove(request_id);
3118                if reply == "always" {
3119                    if let Some(session) = projection.sessions.get_mut(session_id) {
3120                        session.always_permissions.insert("execute".to_string());
3121                    }
3122                }
3123            }
3124        }
3125        "_sandboxagent/opencode/question_asked" => {
3126            if let Some(request) = payload
3127                .get("params")
3128                .and_then(|params| params.get("request"))
3129                .cloned()
3130            {
3131                if let Some(id) = request.get("id").and_then(Value::as_str) {
3132                    projection.questions.insert(id.to_string(), request);
3133                }
3134                if let Some(session) = projection.sessions.get_mut(session_id) {
3135                    session.status = "busy".to_string();
3136                }
3137            }
3138        }
3139        "_sandboxagent/opencode/question_replied" => {
3140            if let Some(request_id) = payload
3141                .get("params")
3142                .and_then(|params| params.get("requestID"))
3143                .and_then(Value::as_str)
3144            {
3145                projection.questions.remove(request_id);
3146            }
3147        }
3148        "_sandboxagent/opencode/question_rejected" => {
3149            if let Some(request_id) = payload
3150                .get("params")
3151                .and_then(|params| params.get("requestID"))
3152                .and_then(Value::as_str)
3153            {
3154                projection.questions.remove(request_id);
3155            }
3156        }
3157        _ => {}
3158    }
3159}
3160
3161fn upsert_message(session: &mut SessionState, info: Value, parts: Vec<Value>) {
3162    let message_id = info.get("id").and_then(Value::as_str).unwrap_or_default();
3163    if let Some(existing) = session
3164        .messages
3165        .iter_mut()
3166        .find(|message| message.info.get("id").and_then(Value::as_str) == Some(message_id))
3167    {
3168        // Merge new info fields into existing info rather than replacing.
3169        // This prevents partial info (e.g. just {"id":"..."}) from overwriting
3170        // a complete record with role, parentID, etc.
3171        if let (Some(existing_obj), Some(new_obj)) =
3172            (existing.info.as_object_mut(), info.as_object())
3173        {
3174            for (key, value) in new_obj {
3175                existing_obj.insert(key.clone(), value.clone());
3176            }
3177        } else {
3178            existing.info = info;
3179        }
3180        for part in parts {
3181            let part_id = part.get("id").and_then(Value::as_str).unwrap_or_default();
3182            if let Some(existing_part) = existing
3183                .parts
3184                .iter_mut()
3185                .find(|candidate| candidate.get("id").and_then(Value::as_str) == Some(part_id))
3186            {
3187                *existing_part = part;
3188            } else {
3189                existing.parts.push(part);
3190            }
3191        }
3192        return;
3193    }
3194
3195    session.messages.push(MessageRecord { info, parts });
3196}
3197
3198fn provider_payload(state: &Arc<AdapterState>) -> Value {
3199    // Use pre-built provider data from config when available (built from
3200    // real agent config options in router.rs).
3201    if let Some(payload) = state.config.provider_payload.as_ref() {
3202        return payload.clone();
3203    }
3204
3205    // Fallback: hardcoded mock/amp/claude/codex list for standalone testing.
3206    let mock_model = model_entry("mock", "Mock", "Mock", true, true, true, true, 8192, 4096);
3207    let amp_model = model_entry(
3208        "smart", "Smart", "Amp", false, false, true, true, 8192, 4096,
3209    );
3210    let claude_default = model_entry(
3211        "default",
3212        "Default (recommended)",
3213        "Claude",
3214        false,
3215        false,
3216        true,
3217        true,
3218        200_000,
3219        8_192,
3220    );
3221    let claude_sonnet = model_entry(
3222        "sonnet", "Sonnet", "Claude", false, false, true, true, 200_000, 8_192,
3223    );
3224    let codex_default = model_entry(
3225        "gpt-5", "GPT-5", "Codex", true, true, true, true, 200_000, 16_384,
3226    );
3227
3228    json!({
3229        "all": [
3230            {
3231                "id": "mock",
3232                "name": "Mock",
3233                "env": [],
3234                "models": { "mock": mock_model },
3235            },
3236            {
3237                "id": "amp",
3238                "name": "Amp",
3239                "env": [],
3240                "models": { "smart": amp_model },
3241            }
3242            ,
3243            {
3244                "id": "claude",
3245                "name": "Claude",
3246                "env": [],
3247                "models": {
3248                    "default": claude_default,
3249                    "sonnet": claude_sonnet,
3250                },
3251            },
3252            {
3253                "id": "codex",
3254                "name": "Codex",
3255                "env": [],
3256                "models": { "gpt-5": codex_default },
3257            }
3258        ],
3259        "default": {
3260            "mock": "mock",
3261            "amp": "smart",
3262            "claude": "default",
3263            "codex": "gpt-5",
3264        },
3265        "connected": ["mock", "amp", "claude", "codex"],
3266    })
3267}
3268
3269fn model_entry(
3270    id: &str,
3271    name: &str,
3272    family: &str,
3273    attachment: bool,
3274    reasoning: bool,
3275    temperature: bool,
3276    tool_call: bool,
3277    context: i64,
3278    output: i64,
3279) -> Value {
3280    json!({
3281        "id": id,
3282        "name": name,
3283        "family": family,
3284        "release_date": "1970-01-01",
3285        "attachment": attachment,
3286        "reasoning": reasoning,
3287        "temperature": temperature,
3288        "tool_call": tool_call,
3289        "limit": {
3290            "context": context,
3291            "output": output,
3292        },
3293        "options": {},
3294    })
3295}
3296
3297fn build_user_message(
3298    session_id: &str,
3299    message_id: &str,
3300    now: i64,
3301    agent: &str,
3302    provider_id: &str,
3303    model_id: &str,
3304    system: Option<&str>,
3305) -> Value {
3306    let mut value = json!({
3307        "id": message_id,
3308        "sessionID": session_id,
3309        "role": "user",
3310        "time": {"created": now, "completed": now},
3311        "agent": agent,
3312        "model": {
3313            "providerID": provider_id,
3314            "modelID": model_id,
3315        },
3316    });
3317
3318    if let Some(system) = system {
3319        if let Some(obj) = value.as_object_mut() {
3320            obj.insert("system".to_string(), json!(system));
3321        }
3322    }
3323
3324    value
3325}
3326
3327fn build_assistant_message(
3328    session_id: &str,
3329    message_id: &str,
3330    parent_id: &str,
3331    now: i64,
3332    directory: &str,
3333    agent: &str,
3334    provider_id: &str,
3335    model_id: &str,
3336) -> Value {
3337    json!({
3338        "id": message_id,
3339        "sessionID": session_id,
3340        "role": "assistant",
3341        "time": {"created": now},
3342        "parentID": parent_id,
3343        "modelID": model_id,
3344        "providerID": provider_id,
3345        "mode": "default",
3346        "agent": agent,
3347        "finish": "stop",
3348        "path": {
3349            "cwd": directory,
3350            "root": directory,
3351        },
3352        "cost": 0,
3353        "tokens": {
3354            "input": 0,
3355            "output": 0,
3356            "reasoning": 0,
3357            "cache": {"read": 0, "write": 0},
3358        },
3359    })
3360}
3361
3362/// Build a finalized assistant message with `time.completed` set.
3363fn build_completed_assistant_message(
3364    session_id: &str,
3365    message_id: &str,
3366    parent_id: &str,
3367    now: i64,
3368    directory: &str,
3369    agent: &str,
3370    provider_id: &str,
3371    model_id: &str,
3372) -> Value {
3373    json!({
3374        "id": message_id,
3375        "sessionID": session_id,
3376        "role": "assistant",
3377        "time": {"created": now, "completed": now},
3378        "parentID": parent_id,
3379        "modelID": model_id,
3380        "providerID": provider_id,
3381        "mode": "default",
3382        "agent": agent,
3383        "finish": "stop",
3384        "path": {
3385            "cwd": directory,
3386            "root": directory,
3387        },
3388        "cost": 0,
3389        "tokens": {
3390            "input": 0,
3391            "output": 0,
3392            "reasoning": 0,
3393            "cache": {"read": 0, "write": 0},
3394        },
3395    })
3396}
3397
3398/// Wrap a message info Value into a `message.updated` SSE event, matching
3399/// the reference OpenCode format which includes `sessionID` at the
3400/// `properties` level alongside `info`.
3401fn message_event(event_type: &str, message: &Value) -> Value {
3402    let session_id = message
3403        .get("sessionID")
3404        .and_then(Value::as_str)
3405        .map(|v| v.to_string());
3406    let mut props = serde_json::Map::new();
3407    props.insert("info".to_string(), message.clone());
3408    if let Some(session_id) = session_id {
3409        props.insert("sessionID".to_string(), json!(session_id));
3410    }
3411    json!({
3412        "type": event_type,
3413        "properties": Value::Object(props),
3414    })
3415}
3416
3417fn normalize_parts(session_id: &str, message_id: &str, input: &[Value]) -> Vec<Value> {
3418    input
3419        .iter()
3420        .enumerate()
3421        .map(|(index, part)| {
3422            let id = part
3423                .get("id")
3424                .and_then(Value::as_str)
3425                .map(ToOwned::to_owned)
3426                .unwrap_or_else(|| format!("part_{}_{}", message_id, index));
3427
3428            if let Some(text) = part.get("text").and_then(Value::as_str) {
3429                json!({
3430                    "id": id,
3431                    "sessionID": session_id,
3432                    "messageID": message_id,
3433                    "type": "text",
3434                    "text": text,
3435                })
3436            } else {
3437                let mut cloned = part.clone();
3438                if let Some(obj) = cloned.as_object_mut() {
3439                    obj.insert("id".to_string(), json!(id));
3440                    obj.insert("sessionID".to_string(), json!(session_id));
3441                    obj.insert("messageID".to_string(), json!(message_id));
3442                }
3443                cloned
3444            }
3445        })
3446        .collect()
3447}
3448
3449fn session_to_value(meta: &SessionMeta) -> Value {
3450    let mut value = json!({
3451        "id": meta.id,
3452        "slug": meta.slug,
3453        "projectID": meta.project_id,
3454        "directory": meta.directory,
3455        "title": meta.title,
3456        "version": meta.version,
3457        "time": {
3458            "created": meta.created_at,
3459            "updated": meta.updated_at,
3460        },
3461        // Compatibility extras used by tests and bridge logic.
3462        "agent": meta.agent,
3463        "model": meta.model_id,
3464        "providerID": meta.provider_id,
3465    });
3466
3467    if let Some(parent_id) = &meta.parent_id {
3468        if let Some(obj) = value.as_object_mut() {
3469            obj.insert("parentID".to_string(), json!(parent_id));
3470        }
3471    }
3472
3473    if let Some(share_url) = &meta.share_url {
3474        if let Some(obj) = value.as_object_mut() {
3475            obj.insert("share".to_string(), json!({"url": share_url}));
3476        }
3477    }
3478
3479    if let Some(permission_mode) = &meta.permission_mode {
3480        if let Some(obj) = value.as_object_mut() {
3481            obj.insert("permissionMode".to_string(), json!(permission_mode));
3482        }
3483    }
3484
3485    value
3486}
3487
/// Maps a provider id to the agent name that serves it.
///
/// `amp`, `codex`, `claude` and `opencode` map to themselves; every other
/// provider id (including `mock` and the empty string) maps to `"mock"`.
fn provider_to_agent(provider_id: &str) -> String {
    match provider_id {
        known @ ("amp" | "codex" | "claude" | "opencode") => known.to_owned(),
        _ => String::from("mock"),
    }
}
3497
/// Provider, model and agent resolved from a prompt request; built by
/// `resolve_selection_from_prompt`.
#[derive(Clone, Debug)]
struct RequestedSelection {
    /// Provider id, e.g. `"claude"`.
    provider_id: String,
    /// Model id within the provider, e.g. `"sonnet"`.
    model_id: String,
    /// Agent name derived from the provider id.
    agent: String,
}
3504
3505fn prompt_has_explicit_model_selection(body: &PromptBody) -> bool {
3506    body.model.is_some() || body.provider_id.is_some() || body.model_id.is_some()
3507}
3508
3509fn resolve_selection_from_prompt(body: &PromptBody) -> Option<RequestedSelection> {
3510    let mut provider_id = body.provider_id.clone().or_else(|| {
3511        body.model
3512            .as_ref()
3513            .and_then(|model| model.provider_id.clone())
3514    });
3515    let mut model_id = body
3516        .model_id
3517        .clone()
3518        .or_else(|| body.model.as_ref().and_then(|model| model.model_id.clone()));
3519
3520    if provider_id.is_none() {
3521        if let Some(agent) = body.agent.as_deref() {
3522            if let Some((default_provider, default_model)) = default_for_agent(agent) {
3523                provider_id = Some(default_provider.to_string());
3524                if model_id.is_none() {
3525                    model_id = Some(default_model.to_string());
3526                }
3527            }
3528        }
3529    }
3530
3531    if provider_id.is_none() {
3532        if let Some(model) = model_id.as_deref() {
3533            provider_id = provider_for_model(model).map(ToOwned::to_owned);
3534        }
3535    }
3536
3537    if model_id.is_none() {
3538        if let Some(provider) = provider_id.as_deref() {
3539            model_id = default_model_for_provider(provider).map(ToOwned::to_owned);
3540        }
3541    }
3542
3543    let provider_id = provider_id?;
3544    let model_id = model_id?;
3545    Some(RequestedSelection {
3546        agent: provider_to_agent(&provider_id),
3547        provider_id,
3548        model_id,
3549    })
3550}
3551
/// Looks up the model used when a provider is chosen without a model.
///
/// Returns `None` for providers that have no entry in the table.
fn default_model_for_provider(provider_id: &str) -> Option<&'static str> {
    const DEFAULT_MODELS: [(&str, &str); 4] = [
        ("mock", "mock"),
        ("amp", "smart"),
        ("claude", "default"),
        ("codex", "gpt-5"),
    ];

    DEFAULT_MODELS
        .iter()
        .find(|(provider, _)| *provider == provider_id)
        .map(|(_, model)| *model)
}
3561
/// Infers the provider that owns `model_id`, or `None` when the model is not
/// recognised.
///
/// Arms are tried top to bottom, so the exact names and the `amp-`,
/// `claude-` and `gpt-` prefixes take precedence over the generic
/// `provider/model` rule (e.g. `amp-x/y` resolves to `amp`).
fn provider_for_model(model_id: &str) -> Option<&'static str> {
    match model_id {
        "mock" => Some("mock"),
        "smart" | "rush" | "deep" | "free" => Some("amp"),
        _ if model_id.starts_with("amp-") => Some("amp"),
        "default" | "sonnet" | "haiku" | "opus" => Some("claude"),
        _ if model_id.starts_with("claude-") => Some("claude"),
        _ if model_id.starts_with("gpt-") => Some("codex"),
        // Any identifier containing `/` is routed to OpenCode. This already
        // covers the `opencode/` prefix, so no separate arm is needed for it.
        _ if model_id.contains('/') => Some("opencode"),
        _ => None,
    }
}
3575
/// Returns the `(provider, model)` pair used by default for `agent`, or
/// `None` when the agent has no defaults.
fn default_for_agent(agent: &str) -> Option<(&'static str, &'static str)> {
    let defaults = match agent {
        "mock" => ("mock", "mock"),
        "amp" => ("amp", "smart"),
        "claude" => ("claude", "default"),
        "codex" => ("codex", "gpt-5"),
        _ => return None,
    };
    Some(defaults)
}
3585
3586fn build_replay_text(events: &[Value], max_chars: usize) -> Option<String> {
3587    if events.is_empty() {
3588        return None;
3589    }
3590
3591    let prefix = "Previous session history is replayed below as JSON-RPC envelopes. Use it as context before responding to the latest user prompt.\n";
3592    let mut text = prefix.to_string();
3593
3594    for event in events {
3595        let line = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string());
3596        if text.len() + line.len() + 1 > max_chars {
3597            text.push_str("\n[history truncated]");
3598            break;
3599        }
3600        text.push_str(&line);
3601        text.push('\n');
3602    }
3603
3604    Some(text)
3605}
3606
3607fn parse_last_event_id(headers: &HeaderMap) -> Option<u64> {
3608    headers
3609        .get("last-event-id")
3610        .and_then(|value| value.to_str().ok())
3611        .and_then(|value| value.trim().parse::<u64>().ok())
3612}
3613
3614fn resolve_directory(headers: &HeaderMap, query_directory: Option<&String>) -> String {
3615    if let Some(value) = query_directory {
3616        return value.clone();
3617    }
3618
3619    if let Ok(value) = std::env::var("OPENCODE_COMPAT_DIRECTORY") {
3620        if !value.trim().is_empty() {
3621            return value;
3622        }
3623    }
3624
3625    if let Some(value) = headers
3626        .get("x-opencode-directory")
3627        .and_then(|v| v.to_str().ok())
3628    {
3629        if !value.trim().is_empty() {
3630            return value.to_string();
3631        }
3632    }
3633
3634    std::env::current_dir()
3635        .ok()
3636        .and_then(|path| path.to_str().map(ToOwned::to_owned))
3637        .unwrap_or_else(|| "/".to_string())
3638}
3639
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
3646
/// Produces a seed that differs between processes and between start times.
///
/// The low bits come from the current time in nanoseconds since the Unix
/// epoch (truncated to 64 bits, `0` if the clock is before the epoch); the
/// process ID is XOR-ed into the upper 32 bits.
fn runtime_unique_seed() -> u64 {
    let pid_bits = (std::process::id() as u64) << 32;
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    };
    nanos ^ pid_bits
}
3654
3655// ---------------------------------------------------------------------------
3656// ACP SSE event translation — reads the raw ACP SSE stream from the agent
3657// process and emits translated OpenCode-compatible events.
3658// ---------------------------------------------------------------------------
3659
/// Consumes raw ACP JSON-RPC payloads from `stream` and translates them into
/// OpenCode-compatible events for `session_id`.
///
/// Each payload is dispatched on its `method` field:
/// - `session/update` is handed to `translate_session_update`, after lazily
///   assigning an assistant message ID for the current turn.
/// - `session/request_permission` and `_sandboxagent/session/request_question`
///   record the pending JSON-RPC id in `state.acp_request_ids`, persist an
///   "asked" envelope and emit `permission.asked` / `question.asked`.
/// - `_sandboxagent/session/ended` emits `session.error`, sets the session
///   status to "idle" and stops the loop.
/// - A payload with no `method` that carries `result` or `error` is treated as
///   the end of the turn, but only once an assistant message ID exists.
///
/// Everything else is logged and ignored. The function returns when the
/// stream is exhausted or the session-ended notification arrives.
3660async fn acp_sse_translation_task(
3661    state: Arc<AdapterState>,
3662    mut stream: AcpPayloadStream,
3663    session_id: String,
3664    directory: String,
3665    agent: String,
3666    provider_id: String,
3667    model_id: String,
3668) {
3669    tracing::info!(session_id = %session_id, agent = %agent, "ACP SSE translation task started");
3670
    // Per-turn state. These four variables are shared with
    // `translate_session_update` by mutable reference.
3671    // Running assistant message ID (set on first update, used to group parts).
3672    let mut assistant_message_id: Option<String> = None;
3673    let mut part_counter: u64 = 0;
3674    // Accumulated text for the current streaming text part.
3675    let mut text_accum = String::new();
3676    let mut text_part_id: Option<String> = None;
3677
3678    while let Some(payload) = stream.next().await {
3679        // Determine whether this is a notification (no `id`) or a response.
3680        let method = payload.get("method").and_then(Value::as_str);
3681        let has_result = payload.get("result").is_some();
3682        let has_error = payload.get("error").is_some();
        // Cloned so it can be moved into the pending-request table below.
3683        let jsonrpc_id = payload.get("id").cloned();
3684
3685        tracing::debug!(
3686            session_id = %session_id,
3687            method = ?method,
3688            has_result,
3689            has_error,
3690            "ACP SSE event received"
3691        );
3692
3693        match method {
3694            // --- Text / tool streaming updates ---
3695            Some("session/update") => {
3696                // Lazily assign an assistant_message_id for grouping parts.
3697                // Only set it here (not for every event) so that response
3698                // events for initialize/session/new don't accidentally set
3699                // it and trigger the turn-completion guard.
3700                if assistant_message_id.is_none() {
3701                    // Derive from the user message ID so that lexicographic
3702                    // sorting in the TUI places the assistant AFTER the user.
3703                    let user_id = state
3704                        .last_user_message_id
3705                        .lock()
3706                        .await
3707                        .get(&*session_id)
3708                        .cloned()
3709                        .unwrap_or_else(|| state.next_id("msg_"));
3710                    assistant_message_id = Some(format!("{user_id}_assistant"));
3711                }
                // Cannot panic: the ID was set just above if it was missing.
3712                let msg_id = assistant_message_id.as_deref().unwrap();
                // Missing `params` is treated as an empty object.
3713                let params = payload.get("params").cloned().unwrap_or(json!({}));
3714                translate_session_update(
3715                    &state,
3716                    &session_id,
3717                    msg_id,
3718                    &mut part_counter,
3719                    &mut text_accum,
3720                    &mut text_part_id,
3721                    &directory,
3722                    &agent,
3723                    &provider_id,
3724                    &model_id,
3725                    &params,
3726                )
3727                .await;
3728            }
3729
3730            // --- Permission request from agent ---
3731            Some("session/request_permission") => {
3732                let request_id = state.next_id("perm_");
3733                let params = payload.get("params").cloned().unwrap_or(json!({}));
                // Fields the agent omits fall back to "execute", ["*"] and {}.
3734                let permission_request = json!({
3735                    "id": request_id,
3736                    "sessionID": session_id,
3737                    "permission": params.get("permission").and_then(Value::as_str).unwrap_or("execute"),
3738                    "patterns": params.get("patterns").cloned().unwrap_or(json!(["*"])),
3739                    "metadata": params.get("metadata").cloned().unwrap_or(json!({})),
3740                    "always": [],
3741                });
3742
3743                // Save the mapping so we can respond to the agent when the user replies.
                // When the payload has no JSON-RPC `id`, nothing is stored; the
                // request is still persisted and emitted below.
3744                if let Some(jrpc_id) = jsonrpc_id {
3745                    state.acp_request_ids.lock().await.insert(
3746                        request_id.clone(),
3747                        AcpPendingRequest {
3748                            opencode_session_id: session_id.clone(),
3749                            jsonrpc_id: jrpc_id,
3750                            kind: AcpPendingKind::Permission,
3751                        },
3752                    );
3753                }
3754
3755                let asked = json!({
3756                    "jsonrpc":"2.0",
3757                    "method":"_sandboxagent/opencode/permission_asked",
3758                    "params":{"request": permission_request}
3759                });
                // A persistence failure is logged only; the event is emitted regardless.
3760                if let Err(err) = state.persist_event(&session_id, "agent", &asked).await {
3761                    warn!(?err, "failed to persist permission_asked event");
3762                }
3763                state
3764                    .emit_event(json!({"type":"permission.asked","properties":permission_request}));
3765            }
3766
3767            // --- Question request from agent ---
            // Same flow as the permission branch, with a different ID prefix,
            // pending kind and event names.
3768            Some("_sandboxagent/session/request_question") => {
3769                let request_id = state.next_id("q_");
3770                let params = payload.get("params").cloned().unwrap_or(json!({}));
3771                let question_request = json!({
3772                    "id": request_id,
3773                    "sessionID": session_id,
3774                    "questions": params.get("questions").cloned().unwrap_or(json!([])),
3775                });
3776
3777                if let Some(jrpc_id) = jsonrpc_id {
3778                    state.acp_request_ids.lock().await.insert(
3779                        request_id.clone(),
3780                        AcpPendingRequest {
3781                            opencode_session_id: session_id.clone(),
3782                            jsonrpc_id: jrpc_id,
3783                            kind: AcpPendingKind::Question,
3784                        },
3785                    );
3786                }
3787
3788                let asked = json!({
3789                    "jsonrpc":"2.0",
3790                    "method":"_sandboxagent/opencode/question_asked",
3791                    "params":{"request": question_request}
3792                });
3793                if let Err(err) = state.persist_event(&session_id, "agent", &asked).await {
3794                    warn!(?err, "failed to persist question_asked event");
3795                }
3796                state.emit_event(json!({"type":"question.asked","properties":question_request}));
3797            }
3798
3799            // --- Session ended notification ---
3800            Some("_sandboxagent/session/ended") => {
3801                let params = payload.get("params").cloned().unwrap_or(json!({}));
3802                let reason = params
3803                    .get("reason")
3804                    .and_then(Value::as_str)
3805                    .unwrap_or("unknown");
                // The reason doubles as the message when no `message` is given.
3806                let error_message = params
3807                    .get("message")
3808                    .and_then(Value::as_str)
3809                    .unwrap_or(reason);
3810
3811                state.emit_event(json!({
3812                    "type":"session.error",
3813                    "properties":{
3814                        "sessionID": session_id,
3815                        "error": {"name":"AgentError","data":{"message": error_message}}
3816                    }
3817                }));
                // The result of the status update is discarded.
3818                let _ = set_session_status(&state, &session_id, "idle").await;
                // NOTE(review): text still buffered in `text_accum` is not
                // persisted on this path, and no completed assistant message is
                // emitted. Confirm that dropping it is intended.
3819                break;
3820            }
3821
3822            // --- Not a notification: might be a response to session/prompt ---
3823            // Responses to initialize/session/new are also broadcast (they
3824            // arrive in order before prompt responses).  Only treat it as a
3825            // turn completion when we've already received content events
3826            // (assistant_message_id is set by session/update handling).
3827            None if (has_result || has_error) && assistant_message_id.is_some() => {
3828                // The session/prompt response signals turn completion.
3829                if has_error {
3830                    let error_msg = payload
3831                        .pointer("/error/message")
3832                        .and_then(Value::as_str)
3833                        .unwrap_or("agent error");
3834                    state.emit_event(json!({
3835                        "type":"session.error",
3836                        "properties":{
3837                            "sessionID": session_id,
3838                            "error": {"name":"AgentError","data":{"message": error_msg}}
3839                        }
3840                    }));
3841                }
3842
3843                // Persist any remaining accumulated text part.
3844                if let Some(tid) = text_part_id.take() {
                    // The match guard guarantees the ID is set, so the ""
                    // fallback is not expected to be used.
3845                    let msg_id = assistant_message_id.as_deref().unwrap_or("");
3846                    let part = json!({
3847                        "id": tid,
3848                        "sessionID": session_id,
3849                        "messageID": msg_id,
3850                        "type": "text",
3851                        "text": text_accum,
3852                    });
3853                    let env = json!({
3854                        "jsonrpc":"2.0",
3855                        "method":"_sandboxagent/opencode/message",
3856                        "params":{"message":{"info":{"id": msg_id},"parts":[part]}}
3857                    });
3858                    if let Err(err) = state.persist_event(&session_id, "agent", &env).await {
3859                        warn!(?err, "failed to persist ACP text part at turn end");
3860                    }
3861                    text_accum.clear();
3862                }
3863
3864                // Finalize the assistant message.
3865                if let Some(msg_id) = assistant_message_id.as_ref() {
                    // An unknown user message yields an empty parent ID.
3866                    let parent_id = state
3867                        .last_user_message_id
3868                        .lock()
3869                        .await
3870                        .get(&*session_id)
3871                        .cloned()
3872                        .unwrap_or_default();
3873                    let now = now_ms();
3874                    let info = build_completed_assistant_message(
3875                        &session_id,
3876                        msg_id,
3877                        &parent_id,
3878                        now,
3879                        &directory,
3880                        &agent,
3881                        &provider_id,
3882                        &model_id,
3883                    );
                    // The completed info is emitted here but not persisted in this block.
3884                    state.emit_event(message_event("message.updated", &info));
3885                }
3886
3887                let _ = set_session_status(&state, &session_id, "idle").await;
3888
3889                // Reset for next turn (if the SSE stream stays open).
3890                assistant_message_id = None;
3891                part_counter = 0;
3892            }
3893
            // Catch-all: unknown methods, plus result/error responses that
            // arrive before any `session/update` (the guard above fails).
            // NOTE(review): a `session/prompt` error that arrives before any
            // content lands here and is only logged; no `session.error` is
            // emitted and the status is not reset. Confirm this is intended.
3894            _ => {
3895                tracing::info!(
3896                    session_id = %session_id,
3897                    method = ?method,
3898                    "ACP SSE: unhandled event"
3899                );
3900            }
3901        }
3902    }
3903}
3904
3905/// Translate an ACP `session/update` notification into OpenCode SSE events.
3906///
3907/// ACP `session/update` params use a discriminator field `sessionUpdate` to
3908/// indicate the kind of update.  The content structure depends on the kind:
3909///   - `agent_message_chunk` / `agent_thought_chunk`:  `{ content: ContentBlock }`
3910///   - `tool_call`:  ToolCall fields at top level (`toolCallId`, `title`, …)
3911///   - `tool_call_update`:  ToolCallUpdate fields at top level
///
/// `part_counter`, `text_accum` and `text_part_id` are caller-owned state that
/// carries over between calls for the same assistant message:
///   - `part_counter` numbers the parts and, while it is 0, marks that the
///     assistant message info has not been announced yet.
///   - `text_accum` holds the text streamed so far for the open text part.
///   - `text_part_id` is the ID of the open text part, or `None` if no text
///     part is open.
///
/// `directory`, `agent`, `provider_id` and `model_id` are only used to build
/// the assistant message info. Persistence failures are logged and otherwise
/// ignored.
3912async fn translate_session_update(
3913    state: &Arc<AdapterState>,
3914    session_id: &str,
3915    message_id: &str,
3916    part_counter: &mut u64,
3917    text_accum: &mut String,
3918    text_part_id: &mut Option<String>,
3919    directory: &str,
3920    agent: &str,
3921    provider_id: &str,
3922    model_id: &str,
3923    params: &Value,
3924) {
3925    // ACP session/update params: { sessionId, update: { sessionUpdate, content, ... } }
    // When there is no `update` key, `params` itself is treated as the update.
3926    let update = params.get("update").unwrap_or(params);
    // A missing discriminator becomes "", which falls through to the
    // catch-all arm at the bottom.
3927    let kind = update
3928        .get("sessionUpdate")
3929        .and_then(Value::as_str)
3930        .unwrap_or("");
3931
3932    // Emit AND persist the assistant message info on the first content update.
    // NOTE(review): `part_counter` only advances when a text part ID is
    // allocated or a tool call is handled. A first chunk with empty text
    // returns early below and leaves it at 0, so this block runs again on the
    // next update and the info is emitted/persisted twice. Confirm whether
    // that is acceptable.
3933    if *part_counter == 0
3934        && matches!(
3935            kind,
3936            "agent_message_chunk" | "agent_thought_chunk" | "tool_call"
3937        )
3938    {
3939        let parent_id = state
3940            .last_user_message_id
3941            .lock()
3942            .await
3943            .get(session_id)
3944            .cloned()
3945            .unwrap_or_default();
3946        let now = now_ms();
3947        let info = build_assistant_message(
3948            session_id,
3949            message_id,
3950            &parent_id,
3951            now,
3952            directory,
3953            agent,
3954            provider_id,
3955            model_id,
3956        );
3957        state.emit_event(message_event("message.updated", &info));
3958        // Persist so the projection has the correct info (role, parentID, etc.)
3959        // for this assistant message when the session is replayed.
3960        let env = json!({
3961            "jsonrpc":"2.0",
3962            "method":"_sandboxagent/opencode/message",
3963            "params":{"message":{"info": info, "parts":[]}}
3964        });
3965        if let Err(err) = state.persist_event(session_id, "agent", &env).await {
3966            warn!(?err, "failed to persist assistant message info");
3967        }
3968    }
3969
3970    match kind {
3971        // ── Text / thought chunk ───────────────────────────────────────
        // Thought chunks are handled exactly like message chunks: both are
        // appended to the same open part and emitted with type "text".
3972        "agent_message_chunk" | "agent_thought_chunk" => {
3973            // ContentChunk.content is a ContentBlock; for text it has { type: "text", text: "…" }
3974            let chunk = update
3975                .pointer("/content/text")
3976                .and_then(Value::as_str)
3977                .unwrap_or("");
            // Content without a string at `/content/text`, or with empty text,
            // produces no event.
3978            if chunk.is_empty() {
3979                return;
3980            }
3981
3982            // Accumulate into a single part — reuse the same part ID so the
3983            // UI updates in-place instead of creating a new line per chunk.
3984            text_accum.push_str(chunk);
            // Allocates a part ID (and advances the counter) only when no text
            // part is currently open.
3985            let part_id = text_part_id.get_or_insert_with(|| {
3986                let id = format!("part_{message_id}_{part_counter}");
3987                *part_counter += 1;
3988                id
3989            });
            // `text` carries everything accumulated so far; `delta` below
            // carries only this chunk.
3990            let part = json!({
3991                "id": *part_id,
3992                "sessionID": session_id,
3993                "messageID": message_id,
3994                "type": "text",
3995                "text": *text_accum,
3996            });
            // Streaming chunks are emitted here but not persisted in this
            // branch; the text part is persisted when a tool call closes it
            // (see the `tool_call` branch).
3997            state.emit_event(json!({
3998                "type":"message.part.updated",
3999                "properties":{
4000                    "sessionID": session_id,
4001                    "messageID": message_id,
4002                    "part": part,
4003                    "delta": chunk
4004                }
4005            }));
4006        }
4007
4008        // ── Tool call initiation ───────────────────────────────────────
4009        "tool_call" => {
4010            // Finalize any accumulated text part before switching to tool.
            // `take()` closes the text part, so the next text chunk opens a
            // new part with a fresh ID.
4011            if let Some(tid) = text_part_id.take() {
4012                let part = json!({
4013                    "id": tid,
4014                    "sessionID": session_id,
4015                    "messageID": message_id,
4016                    "type": "text",
4017                    "text": *text_accum,
4018                });
4019                let env = json!({
4020                    "jsonrpc":"2.0",
4021                    "method":"_sandboxagent/opencode/message",
4022                    "params":{"message":{"info":{"id": message_id},"parts":[part]}}
4023                });
4024                if let Err(err) = state.persist_event(session_id, "agent", &env).await {
4025                    warn!(?err, "failed to persist ACP text part");
4026                }
4027                text_accum.clear();
4028            }
            // Missing `toolCallId` / `title` both fall back to "unknown".
4029            let call_id = update
4030                .get("toolCallId")
4031                .and_then(Value::as_str)
4032                .unwrap_or("unknown");
4033            let tool_title = update
4034                .get("title")
4035                .and_then(Value::as_str)
4036                .unwrap_or("unknown");
4037            let part_id = format!("part_{message_id}_{part_counter}");
4038            *part_counter += 1;
4039            let now = now_ms();
            // The title is used for both the `tool` name and `state.title`.
4040            let part = json!({
4041                "id": part_id,
4042                "sessionID": session_id,
4043                "messageID": message_id,
4044                "type": "tool",
4045                "callID": call_id,
4046                "tool": tool_title,
4047                "state": {
4048                    "status": "running",
4049                    "input": update.get("rawInput").cloned().unwrap_or(json!({})),
4050                    "title": tool_title,
4051                    "metadata": {},
4052                    "time": {"start": now}
4053                }
4054            });
            // Unlike text chunks, the tool part is persisted and then emitted.
4055            let env = json!({
4056                "jsonrpc":"2.0",
4057                "method":"_sandboxagent/opencode/message",
4058                "params":{"message":{"info":{"id": message_id},"parts":[part.clone()]}}
4059            });
4060            if let Err(err) = state.persist_event(session_id, "agent", &env).await {
4061                warn!(?err, "failed to persist ACP tool call event");
4062            }
4063            state.emit_event(json!({
4064                "type":"message.part.updated",
4065                "properties":{
4066                    "sessionID": session_id,
4067                    "messageID": message_id,
4068                    "part": part
4069                }
4070            }));
4071        }
4072
4073        // ── Tool call status update ────────────────────────────────────
4074        "tool_call_update" => {
4075            let call_id = update
4076                .get("toolCallId")
4077                .and_then(Value::as_str)
4078                .unwrap_or("unknown");
            // A missing status is reported as "completed".
4079            let status = update
4080                .get("status")
4081                .and_then(Value::as_str)
4082                .unwrap_or("completed");
            // Only the first `content` entry that has a string `text` field is
            // used as the output; otherwise the output is empty.
4083            let output = update
4084                .get("content")
4085                .and_then(|v| v.as_array())
4086                .and_then(|arr| {
4087                    arr.iter()
4088                        .filter_map(|c| c.get("text").and_then(Value::as_str))
4089                        .next()
4090                })
4091                .unwrap_or("");
4092            let now = now_ms();
            // NOTE(review): this part ID (`part_tc_{call_id}`) differs from the
            // ID given to the same call in the `tool_call` branch
            // (`part_{message_id}_{n}`), the part omits `tool`/`input`, and the
            // update is emitted without being persisted. Confirm that
            // consumers correlate on `callID`; otherwise the original tool
            // part may never leave "running".
4093            let part = json!({
4094                "id": format!("part_tc_{call_id}"),
4095                "sessionID": session_id,
4096                "messageID": message_id,
4097                "type": "tool",
4098                "callID": call_id,
4099                "state": {
4100                    "status": status,
4101                    "output": output,
4102                    "time": {"end": now}
4103                }
4104            });
4105            state.emit_event(json!({
4106                "type":"message.part.updated",
4107                "properties":{
4108                    "sessionID": session_id,
4109                    "messageID": message_id,
4110                    "part": part
4111                }
4112            }));
4113        }
4114
        // Any other (or missing) update kind is logged and ignored.
4115        _ => {
4116            tracing::debug!(
4117                session_id = %session_id,
4118                kind = %kind,
4119                "translate_session_update: unhandled sessionUpdate kind"
4120            );
4121        }
4122    }
4123}
4124
/// Cleans up a configured proxy base URL.
///
/// Surrounding whitespace and any trailing `/` characters are removed. The
/// result is returned only if it starts with `http://` or `https://`
/// (case-sensitive); blank input and any other scheme give `None`.
fn normalize_proxy_base_url(value: String) -> Option<String> {
    let candidate = value.trim().trim_end_matches('/');
    let has_http_scheme = ["http://", "https://"]
        .iter()
        .any(|scheme| candidate.starts_with(*scheme));
    // An empty candidate can never carry a scheme, so it is rejected here too.
    has_http_scheme.then(|| candidate.to_string())
}
4137
4138async fn resolve_proxy_base_url(state: &Arc<AdapterState>, path: &str) -> Option<String> {
4139    if let Some(base_url) = state.config.native_proxy_base_url.as_ref() {
4140        return Some(base_url.clone());
4141    }
4142
4143    let manager = state.config.native_proxy_manager.as_ref()?;
4144    match manager.ensure_server().await {
4145        Ok(base_url) => Some(base_url),
4146        Err(err) => {
4147            warn!(path, error = ?err, "failed to lazily start native OpenCode sidecar");
4148            None
4149        }
4150    }
4151}
4152
4153async fn proxy_native_opencode(
4154    state: &Arc<AdapterState>,
4155    method: reqwest::Method,
4156    path: &str,
4157    headers: &HeaderMap,
4158    body: Option<Value>,
4159) -> Option<Response> {
4160    let base_url = resolve_proxy_base_url(state, path).await?;
4161
4162    let mut request = state
4163        .proxy_http_client
4164        .request(method, format!("{base_url}{path}"));
4165
4166    for header_name in [
4167        header::AUTHORIZATION,
4168        header::ACCEPT,
4169        HeaderName::from_static("x-opencode-directory"),
4170    ] {
4171        if let Some(value) = headers.get(&header_name) {
4172            request = request.header(header_name.as_str(), value.as_bytes());
4173        }
4174    }
4175
4176    if let Some(body) = body {
4177        request = request.json(&body);
4178    }
4179
4180    let response = match request.send().await {
4181        Ok(response) => response,
4182        Err(err) => {
4183            warn!(path, error = ?err, "failed proxy request to native OpenCode; falling back to adapter response");
4184            // Return None so the caller can use its own fallback response
4185            // instead of showing a BAD_GATEWAY error to the client.
4186            return None;
4187        }
4188    };
4189
4190    let status =
4191        StatusCode::from_u16(response.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
4192    let content_type = response
4193        .headers()
4194        .get(reqwest::header::CONTENT_TYPE)
4195        .and_then(|value| value.to_str().ok())
4196        .map(|value| value.to_string());
4197    let body_bytes = match response.bytes().await {
4198        Ok(bytes) => bytes,
4199        Err(err) => {
4200            warn!(path, error = ?err, "failed to read proxied response body");
4201            return Some(
4202                (
4203                    StatusCode::BAD_GATEWAY,
4204                    Json(json!({
4205                        "data": {},
4206                        "errors": [{"message": format!("failed to read proxied response: {err}")}],
4207                        "success": false,
4208                    })),
4209                )
4210                    .into_response(),
4211            );
4212        }
4213    };
4214
4215    let mut proxied = Response::new(Body::from(body_bytes));
4216    *proxied.status_mut() = status;
4217    if let Some(content_type) = content_type {
4218        if let Ok(header_value) = HeaderValue::from_str(&content_type) {
4219            proxied
4220                .headers_mut()
4221                .insert(header::CONTENT_TYPE, header_value);
4222        }
4223    }
4224
4225    Some(proxied)
4226}
4227
4228async fn proxy_native_opencode_json(
4229    state: &Arc<AdapterState>,
4230    method: reqwest::Method,
4231    path: &str,
4232    headers: &HeaderMap,
4233    body: Option<Value>,
4234) -> Option<Result<(StatusCode, Value), Response>> {
4235    let base_url = resolve_proxy_base_url(state, path).await?;
4236
4237    let mut request = state
4238        .proxy_http_client
4239        .request(method, format!("{base_url}{path}"));
4240
4241    for header_name in [
4242        header::AUTHORIZATION,
4243        header::ACCEPT,
4244        HeaderName::from_static("x-opencode-directory"),
4245    ] {
4246        if let Some(value) = headers.get(&header_name) {
4247            request = request.header(header_name.as_str(), value.as_bytes());
4248        }
4249    }
4250
4251    if let Some(body) = body {
4252        request = request.json(&body);
4253    }
4254
4255    let response = match request.send().await {
4256        Ok(response) => response,
4257        Err(err) => {
4258            warn!(path, error = ?err, "failed proxy request to native OpenCode");
4259            return Some(Err((
4260                StatusCode::BAD_GATEWAY,
4261                Json(json!({
4262                    "data": {},
4263                    "errors": [{"message": format!("failed to proxy to native OpenCode: {err}")}],
4264                    "success": false,
4265                })),
4266            )
4267                .into_response()));
4268        }
4269    };
4270
4271    let status =
4272        StatusCode::from_u16(response.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
4273    let content_type = response
4274        .headers()
4275        .get(reqwest::header::CONTENT_TYPE)
4276        .and_then(|value| value.to_str().ok())
4277        .map(|value| value.to_string());
4278    let body_bytes = match response.bytes().await {
4279        Ok(bytes) => bytes,
4280        Err(err) => {
4281            warn!(path, error = ?err, "failed to read proxied response body");
4282            return Some(Err((
4283                StatusCode::BAD_GATEWAY,
4284                Json(json!({
4285                    "data": {},
4286                    "errors": [{"message": format!("failed to read proxied response: {err}")}],
4287                    "success": false,
4288                })),
4289            )
4290                .into_response()));
4291        }
4292    };
4293
4294    if !status.is_success() {
4295        let mut proxied = Response::new(Body::from(body_bytes));
4296        *proxied.status_mut() = status;
4297        if let Some(content_type) = content_type {
4298            if let Ok(header_value) = HeaderValue::from_str(&content_type) {
4299                proxied
4300                    .headers_mut()
4301                    .insert(header::CONTENT_TYPE, header_value);
4302            }
4303        }
4304        return Some(Err(proxied));
4305    }
4306
4307    if body_bytes.is_empty() {
4308        warn!(
4309            path,
4310            "native OpenCode prompt proxy returned an empty success body; falling back to local compat"
4311        );
4312        return None;
4313    }
4314
4315    let payload = match serde_json::from_slice::<Value>(&body_bytes) {
4316        Ok(payload) => payload,
4317        Err(err) => {
4318            warn!(path, error = ?err, "failed to parse proxied JSON response");
4319            return Some(Err(
4320                (
4321                    StatusCode::BAD_GATEWAY,
4322                    Json(json!({
4323                        "data": {},
4324                        "errors": [{"message": format!("failed to parse proxied response as JSON: {err}")}],
4325                        "success": false,
4326                    })),
4327                )
4328                    .into_response(),
4329            ));
4330        }
4331    };
4332
4333    Some(Ok((status, payload)))
4334}
4335
4336fn bool_ok(value: bool) -> (StatusCode, Json<Value>) {
4337    (StatusCode::OK, Json(json!(value)))
4338}
4339
4340fn bad_request(message: &str) -> Response {
4341    (
4342        StatusCode::BAD_REQUEST,
4343        Json(json!({"errors":[{"message": message}]})),
4344    )
4345        .into_response()
4346}
4347
4348fn not_found(message: &str) -> Response {
4349    (
4350        StatusCode::NOT_FOUND,
4351        Json(json!({"errors":[{"message": message}]})),
4352    )
4353        .into_response()
4354}
4355
4356fn internal_error(message: String) -> Response {
4357    warn!(?message, "opencode adapter internal error");
4358    (
4359        StatusCode::INTERNAL_SERVER_ERROR,
4360        Json(json!({"errors":[{"message": message}]})),
4361    )
4362        .into_response()
4363}