1use std::collections::{HashMap, HashSet, VecDeque};
2use std::convert::Infallible;
3use std::future::Future;
4use std::path::PathBuf;
5use std::pin::Pin;
6use std::str::FromStr;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Mutex as StdMutex};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use axum::body::Body;
12use axum::extract::{Path, Query, State};
13use axum::http::{header, HeaderMap, HeaderName, HeaderValue, Request, StatusCode};
14use axum::middleware::Next;
15use axum::response::sse::{Event, KeepAlive};
16use axum::response::{IntoResponse, Response, Sse};
17use axum::routing::{get, patch, post};
18use axum::{Json, Router};
19use futures::stream;
20use futures::{Stream, StreamExt};
21use sandbox_agent_opencode_server_manager::OpenCodeServerManager;
22use serde::{Deserialize, Serialize};
23use serde_json::{json, Value};
24use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
25use sqlx::{Row, SqlitePool};
26use tokio::sync::{broadcast, Mutex, OnceCell};
27use tokio::time::interval;
28use tracing::warn;
29
/// Default value of `OpenCodeAdapterConfig::replay_max_events`.
const DEFAULT_REPLAY_MAX_EVENTS: usize = 50;
/// Default value of `OpenCodeAdapterConfig::replay_max_chars`.
const DEFAULT_REPLAY_MAX_CHARS: usize = 12_000;
/// Maximum number of emitted events retained in `AdapterState::event_log`.
const EVENT_LOG_SIZE: usize = 4096;
/// Capacity handed to `broadcast::channel` when the event broadcaster is created.
const EVENT_CHANNEL_SIZE: usize = 2048;
/// Error text about changing a session's model; not referenced in this part of the file.
const MODEL_CHANGE_ERROR: &str = "OpenCode compatibility currently does not support changing the model after creating a session. Export with /export and load in to a new session.";
35
/// Boxed, pinned, `Send` stream of JSON payloads; the success type of
/// [`AcpDispatch::notification_stream`].
pub type AcpPayloadStream = Pin<Box<dyn Stream<Item = Value> + Send>>;
43
/// Successful outcome of [`AcpDispatch::post`].
#[derive(Debug)]
pub enum AcpDispatchResult {
    /// The call produced a JSON payload.
    Response(Value),
    /// The call was accepted and carries no payload.
    Accepted,
}
49
/// Object-safe async interface stored in `OpenCodeAdapterConfig::acp_dispatch`.
///
/// Every method returns a boxed future whose error type is a plain `String`.
/// NOTE(review): no implementer or caller is visible in this part of the file, so the
/// per-method descriptions below are inferred from the signatures only — confirm.
pub trait AcpDispatch: Send + Sync + 'static {
    /// Sends `payload` for `server_id`; `bootstrap_agent` is optional.
    /// Resolves to [`AcpDispatchResult`] on success.
    fn post(
        &self,
        server_id: &str,
        bootstrap_agent: Option<&str>,
        payload: Value,
    ) -> Pin<Box<dyn Future<Output = Result<AcpDispatchResult, String>> + Send + '_>>;

    /// Resolves to a stream of JSON payloads for `server_id`.
    /// `last_event_id` presumably selects a resume point — TODO confirm.
    fn notification_stream(
        &self,
        server_id: &str,
        last_event_id: Option<u64>,
    ) -> Pin<Box<dyn Future<Output = Result<AcpPayloadStream, String>> + Send + '_>>;

    /// Deletes whatever is associated with `server_id`.
    fn delete(
        &self,
        server_id: &str,
    ) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + '_>>;
}
80
/// Configuration consumed by `build_opencode_router`.
pub struct OpenCodeAdapterConfig {
    /// When set, the router is wrapped in the `require_token` middleware and requests
    /// must present this value as a bearer token.
    pub auth_token: Option<String>,
    /// SQLite database path. When `None`, `build_opencode_router` falls back to the
    /// `OPENCODE_COMPAT_DB_PATH` / `OPENCODE_COMPAT_STATE` environment variables and
    /// finally to `/tmp/sandbox-agent-opencode.db`.
    pub sqlite_path: Option<String>,
    /// Maximum number of stored events passed to `collect_replay_events` when a session
    /// is restored.
    pub replay_max_events: usize,
    /// Character budget passed to `build_replay_text` when a session is restored.
    pub replay_max_chars: usize,
    /// Base URL of a native OpenCode server. When `None`, `build_opencode_router`
    /// reads `OPENCODE_COMPAT_PROXY_URL`; the value is run through
    /// `normalize_proxy_base_url` either way.
    pub native_proxy_base_url: Option<String>,
    /// Optional server manager; not used in this part of the file.
    pub native_proxy_manager: Option<Arc<OpenCodeServerManager>>,
    /// Optional ACP dispatcher; not used in this part of the file.
    pub acp_dispatch: Option<Arc<dyn AcpDispatch>>,
    /// Optional provider payload; presumably consumed by `provider_payload` — TODO confirm.
    pub provider_payload: Option<Value>,
}
95
96impl Default for OpenCodeAdapterConfig {
97 fn default() -> Self {
98 Self {
99 auth_token: None,
100 sqlite_path: None,
101 replay_max_events: DEFAULT_REPLAY_MAX_EVENTS,
102 replay_max_chars: DEFAULT_REPLAY_MAX_CHARS,
103 native_proxy_base_url: None,
104 native_proxy_manager: None,
105 acp_dispatch: None,
106 provider_payload: None,
107 }
108 }
109}
110
/// One event published by `AdapterState::emit_event`.
#[derive(Clone, Debug)]
struct OpenCodeStreamEvent {
    /// Value taken from `AdapterState::next_event_id`; used as the SSE event id.
    id: u64,
    /// JSON body sent to subscribers.
    payload: Value,
}
116
/// In-memory state of one session inside [`Projection`].
#[derive(Clone, Debug)]
struct SessionState {
    /// Persisted metadata of the session.
    meta: SessionMeta,
    /// Message records of the session; starts empty when the session is loaded or created.
    messages: Vec<MessageRecord>,
    /// Status string; initialised to `"idle"`.
    status: String,
    /// Set of permission strings; starts empty. Presumably permissions the user chose
    /// to always allow — TODO confirm against the code that fills it.
    always_permissions: HashSet<String>,
}
124
/// A message held in `SessionState::messages`, kept as raw JSON.
#[derive(Clone, Debug)]
struct MessageRecord {
    /// JSON value describing the message itself.
    info: Value,
    /// JSON values for the message's parts.
    parts: Vec<Value>,
}
130
/// Session metadata.
///
/// The whole struct is serialised to `opencode_session_metadata.metadata_json`. The
/// fields marked "column" are additionally stored in the `sessions` table and overwrite
/// the JSON copy when the projection is rebuilt.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct SessionMeta {
    /// Session id (column).
    id: String,
    /// Slug; `ensure_session` sets it to `session-{id}`.
    slug: String,
    /// Project id; `ensure_session` copies `AdapterState::project_id`.
    project_id: String,
    /// Directory the session was created for.
    directory: String,
    /// Optional parent session id.
    parent_id: Option<String>,
    /// Title; `ensure_session` sets it to `Session {id}`.
    title: String,
    /// Version string; `ensure_session` sets it to `"0"`.
    version: String,
    /// Creation timestamp from `now_ms()` (column).
    created_at: i64,
    /// Last-update timestamp from `now_ms()`.
    updated_at: i64,
    /// Optional share URL.
    share_url: Option<String>,
    /// Optional permission mode.
    permission_mode: Option<String>,
    /// Agent name (column).
    agent: String,
    /// Provider id.
    provider_id: String,
    /// Model id.
    model_id: String,
    /// Agent-side session id (column); replaced by `maybe_restore_session`.
    agent_session_id: String,
    /// Connection id the session was last bound to (column).
    last_connection_id: String,
    /// Optional JSON used to initialise the session (column, stored as text).
    session_init_json: Option<Value>,
    /// Optional destruction timestamp (column); cleared by `maybe_restore_session`.
    destroyed_at: Option<i64>,
}
152
/// In-memory view rebuilt from the database by `AdapterState::rebuild_projection` and
/// updated through `apply_envelope`.
#[derive(Debug, Clone, Default)]
struct Projection {
    /// Session state keyed by session id.
    sessions: HashMap<String, SessionState>,
    /// JSON values keyed by string; presumably pending permission requests by id — TODO confirm.
    permissions: HashMap<String, Value>,
    /// JSON values keyed by string; presumably pending questions by id — TODO confirm.
    questions: HashMap<String, Value>,
}
159
/// Value type of `AdapterState::acp_request_ids`.
/// NOTE(review): not populated in this part of the file; field meanings are inferred
/// from their names — confirm.
#[derive(Debug, Clone)]
struct AcpPendingRequest {
    /// OpenCode-side session id the request belongs to.
    opencode_session_id: String,
    /// JSON-RPC `id` of the request.
    jsonrpc_id: Value,
    /// Whether the request is a permission prompt or a question.
    kind: AcpPendingKind,
}
167
/// Discriminates the two kinds of [`AcpPendingRequest`].
#[derive(Debug, Clone)]
enum AcpPendingKind {
    /// The pending request is a permission prompt.
    Permission,
    /// The pending request is a question.
    Question,
}
173
/// Shared state behind every route; built once in `build_opencode_router` and handed
/// to handlers as `Arc<AdapterState>`.
struct AdapterState {
    /// Adapter configuration, with the proxy base URL already resolved.
    config: OpenCodeAdapterConfig,
    /// Resolved SQLite file path; `pool()` creates its parent directory.
    sqlite_path: String,
    /// Connection options used by `pool()`.
    sqlite_connect_options: SqliteConnectOptions,
    /// HTTP client built with a 10 second timeout; presumably used for proxying to a
    /// native OpenCode server — TODO confirm.
    proxy_http_client: reqwest::Client,
    /// Lazily created connection pool.
    pool: OnceCell<SqlitePool>,
    /// Set once `ensure_initialized` has completed successfully.
    initialized: OnceCell<()>,
    /// `proj_{now_ms()}` computed at construction time.
    project_id: String,
    /// In-memory view of sessions, permissions and questions.
    projection: Mutex<Projection>,
    /// Session id -> replay text stored by `maybe_restore_session`.
    pending_replay: Mutex<HashMap<String, String>>,
    /// Agent name -> connection id, filled lazily by `current_connection_for_agent`.
    agent_connections: Mutex<HashMap<String, String>>,
    /// Broadcast sender every emitted event is sent on.
    event_broadcaster: broadcast::Sender<OpenCodeStreamEvent>,
    /// Bounded log of recent events (at most `EVENT_LOG_SIZE`), read when replaying.
    event_log: StdMutex<VecDeque<OpenCodeStreamEvent>>,
    /// Counter for event ids; starts at 1.
    next_event_id: AtomicU64,
    /// Counter behind `next_id`; seeded from `runtime_unique_seed()`.
    next_id: AtomicU64,
    /// Not used in this part of the file.
    acp_initialized: Mutex<HashMap<String, String>>,
    /// Not used in this part of the file.
    acp_request_ids: Mutex<HashMap<String, AcpPendingRequest>>,
    /// Not used in this part of the file.
    last_user_message_id: Mutex<HashMap<String, String>>,
}
199
200impl AdapterState {
201 async fn ensure_initialized(&self) -> Result<(), String> {
202 self.initialized
203 .get_or_try_init(|| async {
204 let pool = self.pool().await?;
205 sqlx::query("PRAGMA journal_mode=WAL;")
206 .execute(pool)
207 .await
208 .map_err(|err| err.to_string())?;
209 sqlx::query("PRAGMA synchronous=NORMAL;")
210 .execute(pool)
211 .await
212 .map_err(|err| err.to_string())?;
213
214 sqlx::query(include_str!("../migrations/0001_init.sql"))
216 .execute(pool)
217 .await
218 .map_err(|err| err.to_string())?;
219
220 self.rebuild_projection().await?;
221 Ok(())
222 })
223 .await
224 .map(|_| ())
225 }
226
227 async fn rebuild_projection(&self) -> Result<(), String> {
228 let mut projection = Projection::default();
229 let pool = self.pool().await?;
230
231 let rows = sqlx::query(
232 r#"SELECT s.id, s.agent, s.agent_session_id, s.last_connection_id, s.created_at, s.destroyed_at, s.session_init_json,
233 m.metadata_json
234 FROM sessions s
235 JOIN opencode_session_metadata m ON m.session_id = s.id
236 ORDER BY s.created_at ASC, s.id ASC"#,
237 )
238 .fetch_all(pool)
239 .await
240 .map_err(|err| err.to_string())?;
241
242 for row in rows {
243 let id: String = row.try_get("id").map_err(|err| err.to_string())?;
244 let agent: String = row.try_get("agent").map_err(|err| err.to_string())?;
245 let agent_session_id: String = row
246 .try_get("agent_session_id")
247 .map_err(|err| err.to_string())?;
248 let last_connection_id: String = row
249 .try_get("last_connection_id")
250 .map_err(|err| err.to_string())?;
251 let created_at: i64 = row.try_get("created_at").map_err(|err| err.to_string())?;
252 let destroyed_at: Option<i64> =
253 row.try_get("destroyed_at").map_err(|err| err.to_string())?;
254 let session_init_json: Option<String> = row
255 .try_get("session_init_json")
256 .map_err(|err| err.to_string())?;
257 let metadata_json: String = row
258 .try_get("metadata_json")
259 .map_err(|err| err.to_string())?;
260
261 let mut meta: SessionMeta =
262 serde_json::from_str(&metadata_json).map_err(|err| err.to_string())?;
263 meta.id = id.clone();
264 meta.agent = agent;
265 meta.agent_session_id = agent_session_id;
266 meta.last_connection_id = last_connection_id;
267 meta.created_at = created_at;
268 meta.destroyed_at = destroyed_at;
269 meta.session_init_json = session_init_json
270 .as_deref()
271 .and_then(|raw| serde_json::from_str(raw).ok());
272
273 projection.sessions.insert(
274 id,
275 SessionState {
276 meta,
277 messages: Vec::new(),
278 status: "idle".to_string(),
279 always_permissions: HashSet::new(),
280 },
281 );
282 }
283
284 let event_rows = sqlx::query(
285 r#"SELECT session_id, sender, payload_json
286 FROM events
287 ORDER BY created_at ASC, id ASC"#,
288 )
289 .fetch_all(pool)
290 .await
291 .map_err(|err| err.to_string())?;
292
293 for row in event_rows {
294 let session_id: String = row.try_get("session_id").map_err(|err| err.to_string())?;
295 let sender: String = row.try_get("sender").map_err(|err| err.to_string())?;
296 let payload_json: String =
297 row.try_get("payload_json").map_err(|err| err.to_string())?;
298 let payload: Value =
299 serde_json::from_str(&payload_json).map_err(|err| err.to_string())?;
300 apply_envelope(&mut projection, &session_id, &sender, &payload);
301 }
302
303 let mut guard = self.projection.lock().await;
304 *guard = projection;
305 Ok(())
306 }
307
308 fn emit_event(&self, payload: Value) {
309 let event = OpenCodeStreamEvent {
310 id: self.next_event_id.fetch_add(1, Ordering::Relaxed),
311 payload,
312 };
313
314 if let Ok(mut guard) = self.event_log.lock() {
315 guard.push_back(event.clone());
316 while guard.len() > EVENT_LOG_SIZE {
317 guard.pop_front();
318 }
319 }
320
321 let _ = self.event_broadcaster.send(event);
322 }
323
324 fn buffered_events_after(&self, last_event_id: Option<u64>) -> Vec<OpenCodeStreamEvent> {
325 let Some(last_event_id) = last_event_id else {
326 return Vec::new();
327 };
328 let Ok(guard) = self.event_log.lock() else {
329 return Vec::new();
330 };
331 guard
332 .iter()
333 .filter(|entry| entry.id > last_event_id)
334 .cloned()
335 .collect()
336 }
337
338 fn subscribe(&self) -> broadcast::Receiver<OpenCodeStreamEvent> {
339 self.event_broadcaster.subscribe()
340 }
341
342 fn next_id(&self, prefix: &str) -> String {
343 let value = self.next_id.fetch_add(1, Ordering::Relaxed);
344 format!("{prefix}{value}")
345 }
346
347 async fn current_connection_for_agent(&self, agent: &str) -> String {
348 let mut guard = self.agent_connections.lock().await;
349 guard
350 .entry(agent.to_string())
351 .or_insert_with(|| format!("conn_{}_{}", agent, now_ms()))
352 .clone()
353 }
354
355 async fn pool(&self) -> Result<&SqlitePool, String> {
356 self.pool
357 .get_or_try_init(|| async {
358 if let Some(parent) = PathBuf::from(&self.sqlite_path).parent() {
359 if !parent.as_os_str().is_empty() {
360 std::fs::create_dir_all(parent).map_err(|err| err.to_string())?;
361 }
362 }
363 SqlitePoolOptions::new()
364 .max_connections(1)
365 .connect_with(self.sqlite_connect_options.clone())
366 .await
367 .map_err(|err| err.to_string())
368 })
369 .await
370 }
371
372 async fn persist_session(&self, meta: &SessionMeta) -> Result<(), String> {
373 let pool = self.pool().await?;
374 let session_init_json = meta
375 .session_init_json
376 .as_ref()
377 .map(|value| serde_json::to_string(value).unwrap_or_else(|_| "{}".to_string()));
378
379 sqlx::query(
380 r#"INSERT INTO sessions (
381 id, agent, agent_session_id, last_connection_id, created_at, destroyed_at, session_init_json
382 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
383 ON CONFLICT(id) DO UPDATE SET
384 agent = excluded.agent,
385 agent_session_id = excluded.agent_session_id,
386 last_connection_id = excluded.last_connection_id,
387 created_at = excluded.created_at,
388 destroyed_at = excluded.destroyed_at,
389 session_init_json = excluded.session_init_json"#,
390 )
391 .bind(&meta.id)
392 .bind(&meta.agent)
393 .bind(&meta.agent_session_id)
394 .bind(&meta.last_connection_id)
395 .bind(meta.created_at)
396 .bind(meta.destroyed_at)
397 .bind(session_init_json)
398 .execute(pool)
399 .await
400 .map_err(|err| err.to_string())?;
401
402 let metadata_json = serde_json::to_string(meta).map_err(|err| err.to_string())?;
403 sqlx::query(
404 r#"INSERT INTO opencode_session_metadata (session_id, metadata_json)
405 VALUES (?1, ?2)
406 ON CONFLICT(session_id) DO UPDATE SET
407 metadata_json = excluded.metadata_json"#,
408 )
409 .bind(&meta.id)
410 .bind(metadata_json)
411 .execute(pool)
412 .await
413 .map_err(|err| err.to_string())?;
414
415 Ok(())
416 }
417
418 async fn delete_session(&self, session_id: &str) -> Result<(), String> {
419 let pool = self.pool().await?;
420 sqlx::query("DELETE FROM events WHERE session_id = ?1")
421 .bind(session_id)
422 .execute(pool)
423 .await
424 .map_err(|err| err.to_string())?;
425 sqlx::query("DELETE FROM opencode_session_metadata WHERE session_id = ?1")
426 .bind(session_id)
427 .execute(pool)
428 .await
429 .map_err(|err| err.to_string())?;
430 sqlx::query("DELETE FROM sessions WHERE id = ?1")
431 .bind(session_id)
432 .execute(pool)
433 .await
434 .map_err(|err| err.to_string())?;
435 Ok(())
436 }
437
438 async fn persist_event(
439 &self,
440 session_id: &str,
441 sender: &str,
442 payload: &Value,
443 ) -> Result<(), String> {
444 let pool = self.pool().await?;
445 let id = format!("evt_{}", self.next_id(""));
446 let created_at = now_ms();
447 let connection_id = {
448 let projection = self.projection.lock().await;
449 projection
450 .sessions
451 .get(session_id)
452 .map(|state| state.meta.last_connection_id.clone())
453 .unwrap_or_else(|| "conn_unknown".to_string())
454 };
455 sqlx::query(
456 r#"INSERT INTO events (id, session_id, created_at, connection_id, sender, payload_json)
457 VALUES (?1, ?2, ?3, ?4, ?5, ?6)"#,
458 )
459 .bind(id)
460 .bind(session_id)
461 .bind(created_at)
462 .bind(connection_id)
463 .bind(sender)
464 .bind(serde_json::to_string(payload).map_err(|err| err.to_string())?)
465 .execute(pool)
466 .await
467 .map_err(|err| err.to_string())?;
468
469 let mut projection = self.projection.lock().await;
470 apply_envelope(&mut projection, session_id, sender, payload);
471
472 Ok(())
473 }
474
475 async fn collect_replay_events(
476 &self,
477 session_id: &str,
478 max_events: usize,
479 ) -> Result<Vec<Value>, String> {
480 let pool = self.pool().await?;
481 let rows = sqlx::query(
482 r#"SELECT created_at, sender, payload_json
483 FROM events
484 WHERE session_id = ?1
485 ORDER BY created_at ASC, id ASC"#,
486 )
487 .bind(session_id)
488 .fetch_all(pool)
489 .await
490 .map_err(|err| err.to_string())?;
491
492 let mut values = Vec::new();
493 for row in rows {
494 let created_at: i64 = row.try_get("created_at").map_err(|err| err.to_string())?;
495 let sender: String = row.try_get("sender").map_err(|err| err.to_string())?;
496 let payload_json: String =
497 row.try_get("payload_json").map_err(|err| err.to_string())?;
498 let payload: Value =
499 serde_json::from_str(&payload_json).map_err(|err| err.to_string())?;
500 values.push(json!({
501 "createdAt": created_at,
502 "sender": sender,
503 "payload": payload,
504 }));
505 }
506
507 if values.len() > max_events {
508 Ok(values.split_off(values.len() - max_events))
509 } else {
510 Ok(values)
511 }
512 }
513
514 async fn maybe_restore_session(&self, session_id: &str) -> Result<(), String> {
515 let (agent, stale) = {
516 let projection = self.projection.lock().await;
517 let Some(state) = projection.sessions.get(session_id) else {
518 return Ok(());
519 };
520 (
521 state.meta.agent.clone(),
522 state.meta.last_connection_id.clone(),
523 )
524 };
525
526 let current = self.current_connection_for_agent(&agent).await;
527 if stale == current {
528 return Ok(());
529 }
530
531 let replay_source = self
532 .collect_replay_events(session_id, self.config.replay_max_events)
533 .await?;
534 let replay_text = build_replay_text(&replay_source, self.config.replay_max_chars);
535
536 let request_id = self.next_id("oc_req_");
537 let new_agent_session_id = format!("acp_{}", self.next_id("ses_"));
538 let new_request = json!({
539 "jsonrpc": "2.0",
540 "id": request_id,
541 "method": "session/new",
542 "params": {
543 "cwd": "/",
544 "mcpServers": [],
545 }
546 });
547 self.persist_event(session_id, "client", &new_request)
548 .await?;
549
550 let new_response = json!({
551 "jsonrpc": "2.0",
552 "id": request_id,
553 "result": {
554 "sessionId": new_agent_session_id,
555 }
556 });
557 self.persist_event(session_id, "agent", &new_response)
558 .await?;
559
560 let mut updated_meta = None;
561 {
562 let mut projection = self.projection.lock().await;
563 if let Some(session) = projection.sessions.get_mut(session_id) {
564 session.meta.agent_session_id = new_agent_session_id;
565 session.meta.last_connection_id = current;
566 session.meta.destroyed_at = None;
567 updated_meta = Some(session.meta.clone());
568 }
569 }
570 if let Some(meta) = updated_meta {
571 self.persist_session(&meta).await?;
572 }
573
574 if let Some(text) = replay_text {
575 self.pending_replay
576 .lock()
577 .await
578 .insert(session_id.to_string(), text);
579 }
580
581 Ok(())
582 }
583
584 async fn ensure_session(
585 &self,
586 session_id: &str,
587 directory: String,
588 ) -> Result<SessionMeta, String> {
589 {
590 let projection = self.projection.lock().await;
591 if let Some(existing) = projection.sessions.get(session_id) {
592 return Ok(existing.meta.clone());
593 }
594 }
595
596 let now = now_ms();
597 let connection_id = self.current_connection_for_agent("mock").await;
598 let meta = SessionMeta {
599 id: session_id.to_string(),
600 slug: format!("session-{session_id}"),
601 project_id: self.project_id.clone(),
602 directory,
603 parent_id: None,
604 title: format!("Session {session_id}"),
605 version: "0".to_string(),
606 created_at: now,
607 updated_at: now,
608 share_url: None,
609 permission_mode: None,
610 agent: "mock".to_string(),
611 provider_id: "mock".to_string(),
612 model_id: "mock".to_string(),
613 agent_session_id: format!("acp_{}", self.next_id("ses_")),
614 last_connection_id: connection_id,
615 session_init_json: Some(json!({"cwd": "/", "mcpServers": []})),
616 destroyed_at: None,
617 };
618
619 self.persist_session(&meta).await?;
620
621 let session_value = session_to_value(&meta);
622 {
623 let mut projection = self.projection.lock().await;
624 projection.sessions.insert(
625 session_id.to_string(),
626 SessionState {
627 meta: meta.clone(),
628 messages: Vec::new(),
629 status: "idle".to_string(),
630 always_permissions: HashSet::new(),
631 },
632 );
633 }
634
635 self.emit_event(json!({
636 "type": "session.created",
637 "properties": { "info": session_value }
638 }));
639
640 Ok(meta)
641 }
642}
643
644pub fn build_opencode_router(config: OpenCodeAdapterConfig) -> Result<Router, String> {
645 let proxy_base_url = config
646 .native_proxy_base_url
647 .clone()
648 .or_else(|| std::env::var("OPENCODE_COMPAT_PROXY_URL").ok())
649 .and_then(normalize_proxy_base_url);
650 let config = OpenCodeAdapterConfig {
651 native_proxy_base_url: proxy_base_url,
652 ..config
653 };
654
655 let sqlite_path = config
656 .sqlite_path
657 .clone()
658 .or_else(|| std::env::var("OPENCODE_COMPAT_DB_PATH").ok())
659 .or_else(|| {
660 std::env::var("OPENCODE_COMPAT_STATE")
661 .ok()
662 .map(|base| format!("{base}/opencode-sessions.db"))
663 })
664 .unwrap_or_else(|| "/tmp/sandbox-agent-opencode.db".to_string());
665
666 let connect = SqliteConnectOptions::from_str(&format!("sqlite://{sqlite_path}"))
667 .map_err(|err| err.to_string())?
668 .create_if_missing(true)
669 .journal_mode(SqliteJournalMode::Wal)
670 .foreign_keys(true);
671
672 let (event_broadcaster, _) = broadcast::channel(EVENT_CHANNEL_SIZE);
673
674 let state = Arc::new(AdapterState {
675 config,
676 sqlite_path,
677 sqlite_connect_options: connect,
678 proxy_http_client: reqwest::Client::builder()
679 .timeout(Duration::from_secs(10))
680 .build()
681 .unwrap_or_else(|_| reqwest::Client::new()),
682 pool: OnceCell::new(),
683 initialized: OnceCell::new(),
684 project_id: format!("proj_{}", now_ms()),
685 projection: Mutex::new(Projection::default()),
686 pending_replay: Mutex::new(HashMap::new()),
687 agent_connections: Mutex::new(HashMap::new()),
688 event_broadcaster,
689 event_log: StdMutex::new(VecDeque::new()),
690 next_event_id: AtomicU64::new(1),
691 next_id: AtomicU64::new(runtime_unique_seed()),
692 acp_initialized: Mutex::new(HashMap::new()),
693 acp_request_ids: Mutex::new(HashMap::new()),
694 last_user_message_id: Mutex::new(HashMap::new()),
695 });
696
697 let mut router = Router::new()
698 .route("/agent", get(oc_agent_list))
699 .route("/command", get(oc_command_list))
700 .route("/config", get(oc_config_get).patch(oc_config_patch))
701 .route("/config/providers", get(oc_config_providers))
702 .route("/event", get(oc_event_subscribe))
703 .route("/global/event", get(oc_global_event))
704 .route("/global/health", get(oc_global_health))
705 .route(
706 "/global/config",
707 get(oc_global_config_get).patch(oc_global_config_patch),
708 )
709 .route("/global/dispose", post(oc_global_dispose))
710 .route("/instance/dispose", post(oc_instance_dispose))
711 .route("/path", get(oc_path))
712 .route("/vcs", get(oc_vcs))
713 .route("/mcp", get(oc_mcp_status))
714 .route("/lsp", get(oc_lsp_status))
715 .route("/formatter", get(oc_formatter_status))
716 .route("/experimental/resource", get(oc_experimental_resource))
717 .route("/skill", get(oc_skill_list))
718 .route("/tui/control/next", get(oc_tui_next))
719 .route("/tui/control/response", post(oc_tui_response))
720 .route("/tui/append-prompt", post(oc_tui_append_prompt))
721 .route("/tui/open-help", post(oc_tui_open_help))
722 .route("/tui/open-sessions", post(oc_tui_open_sessions))
723 .route("/tui/open-themes", post(oc_tui_open_themes))
724 .route("/tui/open-models", post(oc_tui_open_models))
725 .route("/tui/submit-prompt", post(oc_tui_submit_prompt))
726 .route("/tui/clear-prompt", post(oc_tui_clear_prompt))
727 .route("/tui/execute-command", post(oc_tui_execute_command))
728 .route("/tui/show-toast", post(oc_tui_show_toast))
729 .route("/tui/publish", post(oc_tui_publish))
730 .route("/project", get(oc_project_list).post(oc_project_current))
731 .route("/project/current", get(oc_project_current))
732 .route("/session", post(oc_session_create).get(oc_session_list))
733 .route("/session/status", get(oc_session_status))
734 .route(
735 "/session/:sessionID",
736 get(oc_session_get)
737 .patch(oc_session_update)
738 .delete(oc_session_delete),
739 )
740 .route("/session/:sessionID/abort", post(oc_session_abort))
741 .route("/session/:sessionID/children", get(oc_session_children))
742 .route("/session/:sessionID/init", post(oc_session_init))
743 .route("/session/:sessionID/fork", post(oc_session_fork))
744 .route("/session/:sessionID/diff", get(oc_session_diff))
745 .route("/session/:sessionID/todo", get(oc_session_todo))
746 .route("/session/:sessionID/summarize", post(oc_session_summarize))
747 .route(
748 "/session/:sessionID/message",
749 get(oc_session_messages).post(oc_session_prompt),
750 )
751 .route(
752 "/session/:sessionID/message/:messageID",
753 get(oc_session_message_get),
754 )
755 .route(
756 "/session/:sessionID/message/:messageID/part/:partID",
757 patch(oc_part_update).delete(oc_part_delete),
758 )
759 .route(
760 "/session/:sessionID/prompt_async",
761 post(oc_session_prompt_async),
762 )
763 .route(
764 "/session/:sessionID/permissions/:permissionID",
765 post(oc_permission_respond),
766 )
767 .route("/permission", get(oc_permission_list))
768 .route("/permission/:requestID/reply", post(oc_permission_reply))
769 .route("/question", get(oc_question_list))
770 .route("/question/:requestID/reply", post(oc_question_reply))
771 .route("/question/:requestID/reject", post(oc_question_reject))
772 .route("/provider", get(oc_provider_list))
773 .route("/provider/auth", get(oc_provider_auth))
774 .route(
775 "/provider/:providerID/oauth/authorize",
776 post(oc_provider_oauth_authorize),
777 )
778 .route(
779 "/provider/:providerID/oauth/callback",
780 post(oc_provider_oauth_callback),
781 )
782 .with_state(state.clone());
783
784 if state.config.auth_token.is_some() {
785 router = router.layer(axum::middleware::from_fn_with_state(state, require_token));
786 }
787
788 Ok(router)
789}
790
791async fn require_token(
792 State(state): State<Arc<AdapterState>>,
793 request: Request<Body>,
794 next: Next,
795) -> Result<Response, Response> {
796 let Some(expected) = state.config.auth_token.as_deref() else {
797 return Ok(next.run(request).await);
798 };
799
800 let bearer = request
801 .headers()
802 .get(header::AUTHORIZATION)
803 .and_then(|value| value.to_str().ok())
804 .and_then(|value| value.strip_prefix("Bearer "));
805
806 if bearer == Some(expected) {
807 return Ok(next.run(request).await);
808 }
809
810 Err((
811 StatusCode::UNAUTHORIZED,
812 Json(json!({"errors":[{"message":"missing or invalid bearer token"}]})),
813 )
814 .into_response())
815}
816
/// Query string accepted by handlers that resolve a working directory.
#[derive(Debug, Deserialize)]
struct DirectoryQuery {
    /// Optional `directory` query parameter, passed to `resolve_directory`.
    directory: Option<String>,
}
821
/// JSON request body for session creation; keys are camelCase unless renamed.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SessionCreateBody {
    /// Optional session title.
    title: Option<String>,
    /// Optional parent session id, read from the `parentID` key.
    #[serde(rename = "parentID")]
    parent_id: Option<String>,
    /// Optional free-form permission value.
    permission: Option<Value>,
    /// Optional permission mode; read from `permissionMode` or `permission_mode`.
    #[serde(alias = "permission_mode")]
    permission_mode: Option<String>,
}
832
/// JSON request body for session updates.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SessionUpdateBody {
    /// Optional new title.
    title: Option<String>,
    /// Optional free-form model value.
    model: Option<Value>,
    /// Provider id; read from `providerID`, `provider_id` or `providerId`.
    #[serde(rename = "providerID", alias = "provider_id", alias = "providerId")]
    provider_id: Option<String>,
    /// Model id; read from `modelID`, `model_id` or `modelId`.
    #[serde(rename = "modelID", alias = "model_id", alias = "modelId")]
    model_id: Option<String>,
}
843
/// JSON request body for session initialisation.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SessionInitBody {
    /// Provider id, read from the `providerID` key.
    #[serde(rename = "providerID")]
    provider_id: Option<String>,
    /// Model id, read from the `modelID` key.
    #[serde(rename = "modelID")]
    model_id: Option<String>,
    /// Message id, read from the `messageID` key.
    #[serde(rename = "messageID")]
    message_id: Option<String>,
}
854
/// JSON request body for prompt submission.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PromptBody {
    /// Message id, read from the `messageID` key.
    #[serde(rename = "messageID")]
    message_id: Option<String>,
    /// Optional nested provider/model selection.
    model: Option<ModelSelection>,
    /// Provider id; read from `providerID`, `provider_id` or `providerId`.
    #[serde(rename = "providerID", alias = "provider_id", alias = "providerId")]
    provider_id: Option<String>,
    /// Model id; read from `modelID`, `model_id` or `modelId`.
    #[serde(rename = "modelID", alias = "model_id", alias = "modelId")]
    model_id: Option<String>,
    /// Optional agent name.
    agent: Option<String>,
    /// Optional system text.
    system: Option<String>,
    /// Optional variant name.
    variant: Option<String>,
    /// Optional message parts as raw JSON values.
    parts: Option<Vec<Value>>,
}
870
/// Provider/model pair nested under `PromptBody::model`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ModelSelection {
    /// Provider id; read from `providerID`, `provider_id` or `providerId`.
    #[serde(rename = "providerID", alias = "provider_id", alias = "providerId")]
    provider_id: Option<String>,
    /// Model id; read from `modelID`, `model_id` or `modelId`.
    #[serde(rename = "modelID", alias = "model_id", alias = "modelId")]
    model_id: Option<String>,
}
879
/// JSON request body carrying a permission response.
#[derive(Debug, Deserialize)]
struct PermissionRespondBody {
    /// Optional `response` string.
    response: Option<String>,
}
884
/// JSON request body carrying a permission reply.
#[derive(Debug, Deserialize)]
struct PermissionReplyBody {
    /// Optional `reply` string.
    reply: Option<String>,
    /// Optional accompanying `message` string.
    message: Option<String>,
}
890
/// JSON request body carrying answers to a question request.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct QuestionReplyBody {
    /// Optional list of answer lists; presumably one inner list per question — TODO confirm.
    answers: Option<Vec<Vec<String>>>,
}
896
897async fn oc_agent_list(State(state): State<Arc<AdapterState>>) -> Response {
898 if let Err(err) = state.ensure_initialized().await {
899 return internal_error(err);
900 }
901 (
902 StatusCode::OK,
903 Json(json!([
904 {
905 "name": "Sandbox Agent",
906 "description": "Sandbox Agent compatibility layer",
907 "mode": "all",
908 "native": false,
909 "hidden": false,
910 "permission": [],
911 "options": {},
912 }
913 ])),
914 )
915 .into_response()
916}
917
918async fn oc_command_list(State(state): State<Arc<AdapterState>>, headers: HeaderMap) -> Response {
919 if let Err(err) = state.ensure_initialized().await {
920 return internal_error(err);
921 }
922 if let Some(response) =
923 proxy_native_opencode(&state, reqwest::Method::GET, "/command", &headers, None).await
924 {
925 return response;
926 }
927 (StatusCode::OK, Json(json!([]))).into_response()
928}
929
930async fn oc_config_get(State(state): State<Arc<AdapterState>>, headers: HeaderMap) -> Response {
931 if let Err(err) = state.ensure_initialized().await {
932 return internal_error(err);
933 }
934 if let Some(response) =
935 proxy_native_opencode(&state, reqwest::Method::GET, "/config", &headers, None).await
936 {
937 return response;
938 }
939 (
940 StatusCode::OK,
941 Json(json!({
942 "mcp": {},
943 "agent": {},
944 "provider": {},
945 })),
946 )
947 .into_response()
948}
949
950async fn oc_config_patch(
951 State(state): State<Arc<AdapterState>>,
952 headers: HeaderMap,
953 Json(body): Json<Value>,
954) -> Response {
955 if let Err(err) = state.ensure_initialized().await {
956 return internal_error(err);
957 }
958 if let Some(response) = proxy_native_opencode(
959 &state,
960 reqwest::Method::PATCH,
961 "/config",
962 &headers,
963 Some(body.clone()),
964 )
965 .await
966 {
967 return response;
968 }
969 (StatusCode::OK, Json(body)).into_response()
970}
971
972async fn oc_config_providers(State(state): State<Arc<AdapterState>>) -> Response {
973 if let Err(err) = state.ensure_initialized().await {
974 return internal_error(err);
975 }
976 let providers = provider_payload(&state);
977 let mut payload = providers.clone();
978 if let Some(obj) = payload.as_object_mut() {
979 obj.insert("providers".to_string(), providers["all"].clone());
980 }
981 (StatusCode::OK, Json(payload)).into_response()
982}
983
984async fn oc_event_subscribe(
985 State(state): State<Arc<AdapterState>>,
986 headers: HeaderMap,
987 Query(query): Query<DirectoryQuery>,
988) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
989 let _ = state.ensure_initialized().await;
990
991 let directory = resolve_directory(&headers, query.directory.as_ref());
992 let replay = state.buffered_events_after(parse_last_event_id(&headers));
993 let receiver = state.subscribe();
994
995 state.emit_event(json!({"type":"server.connected","properties":{}}));
996 state.emit_event(
997 json!({"type":"worktree.ready","properties":{"name": directory, "branch": "main"}}),
998 );
999
1000 let stream = stream::unfold(
1001 (
1002 receiver,
1003 VecDeque::from(replay),
1004 interval(Duration::from_secs(30)),
1005 ),
1006 |(mut rx, mut replay, mut ticker)| async move {
1007 if let Some(item) = replay.pop_front() {
1008 let evt = Event::default()
1009 .id(item.id.to_string())
1010 .json_data(item.payload)
1011 .unwrap_or_else(|_| Event::default().data("{}"));
1012 return Some((Ok(evt), (rx, replay, ticker)));
1013 }
1014
1015 loop {
1016 tokio::select! {
1017 _ = ticker.tick() => {
1018 let evt = Event::default().json_data(json!({"type":"server.heartbeat","properties":{}}))
1019 .unwrap_or_else(|_| Event::default().data("{}"));
1020 return Some((Ok(evt), (rx, replay, ticker)));
1021 }
1022 item = rx.recv() => {
1023 match item {
1024 Ok(payload) => {
1025 let evt = Event::default()
1026 .id(payload.id.to_string())
1027 .json_data(payload.payload)
1028 .unwrap_or_else(|_| Event::default().data("{}"));
1029 return Some((Ok(evt), (rx, replay, ticker)));
1030 }
1031 Err(broadcast::error::RecvError::Lagged(_)) => continue,
1032 Err(broadcast::error::RecvError::Closed) => return None,
1033 }
1034 }
1035 }
1036 }
1037 },
1038 );
1039
1040 Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
1041}
1042
1043async fn oc_global_event(
1044 State(state): State<Arc<AdapterState>>,
1045 headers: HeaderMap,
1046 Query(query): Query<DirectoryQuery>,
1047) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
1048 oc_event_subscribe(State(state), headers, Query(query)).await
1049}
1050
1051async fn oc_global_health() -> Response {
1052 (
1053 StatusCode::OK,
1054 Json(json!({
1055 "healthy": true,
1056 "version": env!("CARGO_PKG_VERSION"),
1057 })),
1058 )
1059 .into_response()
1060}
1061
1062async fn oc_global_config_get(
1063 State(state): State<Arc<AdapterState>>,
1064 headers: HeaderMap,
1065) -> Response {
1066 if let Some(response) = proxy_native_opencode(
1067 &state,
1068 reqwest::Method::GET,
1069 "/global/config",
1070 &headers,
1071 None,
1072 )
1073 .await
1074 {
1075 return response;
1076 }
1077 oc_config_get(State(state), headers).await
1078}
1079
1080async fn oc_global_config_patch(
1081 State(state): State<Arc<AdapterState>>,
1082 headers: HeaderMap,
1083 Json(body): Json<Value>,
1084) -> Response {
1085 if let Some(response) = proxy_native_opencode(
1086 &state,
1087 reqwest::Method::PATCH,
1088 "/global/config",
1089 &headers,
1090 Some(body.clone()),
1091 )
1092 .await
1093 {
1094 return response;
1095 }
1096 oc_config_patch(State(state), headers, Json(body)).await
1097}
1098
1099async fn oc_global_dispose() -> Response {
1100 bool_ok(true).into_response()
1101}
1102
1103async fn oc_instance_dispose() -> Response {
1104 bool_ok(true).into_response()
1105}
1106
1107async fn oc_path(
1108 State(state): State<Arc<AdapterState>>,
1109 headers: HeaderMap,
1110 Query(query): Query<DirectoryQuery>,
1111) -> Response {
1112 if let Err(err) = state.ensure_initialized().await {
1113 return internal_error(err);
1114 }
1115
1116 let directory = resolve_directory(&headers, query.directory.as_ref());
1117 (
1118 StatusCode::OK,
1119 Json(json!({
1120 "home": std::env::var("HOME").unwrap_or_else(|_| "/".to_string()),
1121 "state": std::env::var("OPENCODE_COMPAT_STATE").unwrap_or_else(|_| "/tmp".to_string()),
1122 "config": std::env::var("OPENCODE_COMPAT_CONFIG").unwrap_or_else(|_| "/tmp".to_string()),
1123 "worktree": directory,
1124 "directory": resolve_directory(&headers, query.directory.as_ref()),
1125 })),
1126 )
1127 .into_response()
1128}
1129
1130async fn oc_vcs(State(state): State<Arc<AdapterState>>) -> Response {
1131 if let Err(err) = state.ensure_initialized().await {
1132 return internal_error(err);
1133 }
1134 (StatusCode::OK, Json(json!({"branch":"main"}))).into_response()
1135}
1136
1137async fn oc_mcp_status(State(state): State<Arc<AdapterState>>) -> Response {
1138 if let Err(err) = state.ensure_initialized().await {
1139 return internal_error(err);
1140 }
1141 (StatusCode::OK, Json(json!({}))).into_response()
1142}
1143
1144async fn oc_lsp_status(State(state): State<Arc<AdapterState>>) -> Response {
1145 if let Err(err) = state.ensure_initialized().await {
1146 return internal_error(err);
1147 }
1148 (StatusCode::OK, Json(json!([]))).into_response()
1149}
1150
1151async fn oc_formatter_status(State(state): State<Arc<AdapterState>>) -> Response {
1152 if let Err(err) = state.ensure_initialized().await {
1153 return internal_error(err);
1154 }
1155 (StatusCode::OK, Json(json!([]))).into_response()
1156}
1157
1158async fn oc_experimental_resource(State(state): State<Arc<AdapterState>>) -> Response {
1159 if let Err(err) = state.ensure_initialized().await {
1160 return internal_error(err);
1161 }
1162 (StatusCode::OK, Json(json!([]))).into_response()
1163}
1164
1165async fn oc_skill_list() -> Response {
1166 (StatusCode::OK, Json(json!([]))).into_response()
1167}
1168
1169async fn oc_tui_next(State(state): State<Arc<AdapterState>>, headers: HeaderMap) -> Response {
1170 if let Err(err) = state.ensure_initialized().await {
1171 return internal_error(err);
1172 }
1173 if let Some(response) = proxy_native_opencode(
1174 &state,
1175 reqwest::Method::GET,
1176 "/tui/control/next",
1177 &headers,
1178 None,
1179 )
1180 .await
1181 {
1182 return response;
1183 }
1184 (StatusCode::OK, Json(json!({"path": "", "body": {}}))).into_response()
1185}
1186
1187async fn oc_tui_response(
1188 State(state): State<Arc<AdapterState>>,
1189 headers: HeaderMap,
1190 body: Option<Json<Value>>,
1191) -> Response {
1192 if let Err(err) = state.ensure_initialized().await {
1193 return internal_error(err);
1194 }
1195 if let Some(response) = proxy_native_opencode(
1196 &state,
1197 reqwest::Method::POST,
1198 "/tui/control/response",
1199 &headers,
1200 body.map(|json| json.0),
1201 )
1202 .await
1203 {
1204 return response;
1205 }
1206 bool_ok(true).into_response()
1207}
1208
1209async fn oc_tui_append_prompt(
1210 State(state): State<Arc<AdapterState>>,
1211 headers: HeaderMap,
1212 body: Option<Json<Value>>,
1213) -> Response {
1214 if let Err(err) = state.ensure_initialized().await {
1215 return internal_error(err);
1216 }
1217 if let Some(response) = proxy_native_opencode(
1218 &state,
1219 reqwest::Method::POST,
1220 "/tui/append-prompt",
1221 &headers,
1222 body.map(|json| json.0),
1223 )
1224 .await
1225 {
1226 return response;
1227 }
1228 bool_ok(true).into_response()
1229}
1230
1231async fn oc_tui_open_help(State(state): State<Arc<AdapterState>>, headers: HeaderMap) -> Response {
1232 if let Err(err) = state.ensure_initialized().await {
1233 return internal_error(err);
1234 }
1235 if let Some(response) = proxy_native_opencode(
1236 &state,
1237 reqwest::Method::POST,
1238 "/tui/open-help",
1239 &headers,
1240 None,
1241 )
1242 .await
1243 {
1244 return response;
1245 }
1246 bool_ok(true).into_response()
1247}
1248
1249async fn oc_tui_open_sessions() -> Response {
1250 bool_ok(true).into_response()
1251}
1252
1253async fn oc_tui_open_themes(
1254 State(state): State<Arc<AdapterState>>,
1255 headers: HeaderMap,
1256) -> Response {
1257 if let Err(err) = state.ensure_initialized().await {
1258 return internal_error(err);
1259 }
1260 if let Some(response) = proxy_native_opencode(
1261 &state,
1262 reqwest::Method::POST,
1263 "/tui/open-themes",
1264 &headers,
1265 None,
1266 )
1267 .await
1268 {
1269 return response;
1270 }
1271 bool_ok(true).into_response()
1272}
1273
1274async fn oc_tui_open_models(
1275 State(state): State<Arc<AdapterState>>,
1276 headers: HeaderMap,
1277) -> Response {
1278 if let Err(err) = state.ensure_initialized().await {
1279 return internal_error(err);
1280 }
1281 if let Some(response) = proxy_native_opencode(
1282 &state,
1283 reqwest::Method::POST,
1284 "/tui/open-models",
1285 &headers,
1286 None,
1287 )
1288 .await
1289 {
1290 return response;
1291 }
1292 bool_ok(true).into_response()
1293}
1294
1295async fn oc_tui_submit_prompt(
1296 State(state): State<Arc<AdapterState>>,
1297 headers: HeaderMap,
1298 body: Option<Json<Value>>,
1299) -> Response {
1300 if let Err(err) = state.ensure_initialized().await {
1301 return internal_error(err);
1302 }
1303 if let Some(response) = proxy_native_opencode(
1304 &state,
1305 reqwest::Method::POST,
1306 "/tui/submit-prompt",
1307 &headers,
1308 body.map(|json| json.0),
1309 )
1310 .await
1311 {
1312 return response;
1313 }
1314 bool_ok(true).into_response()
1315}
1316
1317async fn oc_tui_clear_prompt(
1318 State(state): State<Arc<AdapterState>>,
1319 headers: HeaderMap,
1320) -> Response {
1321 if let Err(err) = state.ensure_initialized().await {
1322 return internal_error(err);
1323 }
1324 if let Some(response) = proxy_native_opencode(
1325 &state,
1326 reqwest::Method::POST,
1327 "/tui/clear-prompt",
1328 &headers,
1329 None,
1330 )
1331 .await
1332 {
1333 return response;
1334 }
1335 bool_ok(true).into_response()
1336}
1337
1338async fn oc_tui_execute_command(
1339 State(state): State<Arc<AdapterState>>,
1340 headers: HeaderMap,
1341 body: Option<Json<Value>>,
1342) -> Response {
1343 if let Err(err) = state.ensure_initialized().await {
1344 return internal_error(err);
1345 }
1346 if let Some(response) = proxy_native_opencode(
1347 &state,
1348 reqwest::Method::POST,
1349 "/tui/execute-command",
1350 &headers,
1351 body.map(|json| json.0),
1352 )
1353 .await
1354 {
1355 return response;
1356 }
1357 bool_ok(true).into_response()
1358}
1359
1360async fn oc_tui_show_toast(
1361 State(state): State<Arc<AdapterState>>,
1362 headers: HeaderMap,
1363 body: Option<Json<Value>>,
1364) -> Response {
1365 if let Err(err) = state.ensure_initialized().await {
1366 return internal_error(err);
1367 }
1368 if let Some(response) = proxy_native_opencode(
1369 &state,
1370 reqwest::Method::POST,
1371 "/tui/show-toast",
1372 &headers,
1373 body.map(|json| json.0),
1374 )
1375 .await
1376 {
1377 return response;
1378 }
1379 bool_ok(true).into_response()
1380}
1381
1382async fn oc_tui_publish(
1383 State(state): State<Arc<AdapterState>>,
1384 headers: HeaderMap,
1385 body: Option<Json<Value>>,
1386) -> Response {
1387 if let Err(err) = state.ensure_initialized().await {
1388 return internal_error(err);
1389 }
1390 if let Some(response) = proxy_native_opencode(
1391 &state,
1392 reqwest::Method::POST,
1393 "/tui/publish",
1394 &headers,
1395 body.map(|json| json.0),
1396 )
1397 .await
1398 {
1399 return response;
1400 }
1401 bool_ok(true).into_response()
1402}
1403
1404async fn oc_project_list(
1405 State(state): State<Arc<AdapterState>>,
1406 headers: HeaderMap,
1407 Query(query): Query<DirectoryQuery>,
1408) -> Response {
1409 if let Err(err) = state.ensure_initialized().await {
1410 return internal_error(err);
1411 }
1412 let directory = resolve_directory(&headers, query.directory.as_ref());
1413 let now = now_ms();
1414 (
1415 StatusCode::OK,
1416 Json(json!([{
1417 "id": state.project_id,
1418 "worktree": directory,
1419 "vcs": "git",
1420 "name": "sandbox-agent",
1421 "time": {"created": now, "updated": now},
1422 }])),
1423 )
1424 .into_response()
1425}
1426
1427async fn oc_project_current(
1428 State(state): State<Arc<AdapterState>>,
1429 headers: HeaderMap,
1430 Query(query): Query<DirectoryQuery>,
1431) -> Response {
1432 if let Err(err) = state.ensure_initialized().await {
1433 return internal_error(err);
1434 }
1435 let directory = resolve_directory(&headers, query.directory.as_ref());
1436 let now = now_ms();
1437 (
1438 StatusCode::OK,
1439 Json(json!({
1440 "id": state.project_id,
1441 "worktree": directory,
1442 "vcs": "git",
1443 "name": "sandbox-agent",
1444 "time": {"created": now, "updated": now},
1445 })),
1446 )
1447 .into_response()
1448}
1449
1450async fn oc_session_create(
1451 State(state): State<Arc<AdapterState>>,
1452 headers: HeaderMap,
1453 Query(query): Query<DirectoryQuery>,
1454 body: Option<Json<SessionCreateBody>>,
1455) -> Response {
1456 if let Err(err) = state.ensure_initialized().await {
1457 return internal_error(err);
1458 }
1459
1460 let body = body.map(|value| value.0).unwrap_or(SessionCreateBody {
1461 title: None,
1462 parent_id: None,
1463 permission: None,
1464 permission_mode: None,
1465 });
1466
1467 let id = state.next_id("ses_");
1468 let now = now_ms();
1469 let directory = resolve_directory(&headers, query.directory.as_ref());
1470
1471 let default_agent = "mock";
1472 let connection_id = state.current_connection_for_agent(default_agent).await;
1473 let meta = SessionMeta {
1474 id: id.clone(),
1475 slug: format!("session-{id}"),
1476 project_id: state.project_id.clone(),
1477 directory,
1478 parent_id: body.parent_id,
1479 title: body.title.unwrap_or_else(|| format!("Session {id}")),
1480 version: "0".to_string(),
1481 created_at: now,
1482 updated_at: now,
1483 share_url: None,
1484 permission_mode: body.permission_mode,
1485 agent: default_agent.to_string(),
1486 provider_id: default_agent.to_string(),
1487 model_id: default_model_for_provider(default_agent)
1488 .unwrap_or("default")
1489 .to_string(),
1490 agent_session_id: format!("acp_{}", state.next_id("ses_")),
1491 last_connection_id: connection_id,
1492 session_init_json: Some(json!({"cwd": "/", "mcpServers": []})),
1493 destroyed_at: None,
1494 };
1495
1496 if let Err(err) = state.persist_session(&meta).await {
1497 return internal_error(err);
1498 }
1499
1500 {
1501 let mut projection = state.projection.lock().await;
1502 projection.sessions.insert(
1503 id,
1504 SessionState {
1505 meta: meta.clone(),
1506 messages: Vec::new(),
1507 status: "idle".to_string(),
1508 always_permissions: HashSet::new(),
1509 },
1510 );
1511 }
1512
1513 let value = session_to_value(&meta);
1514 state.emit_event(json!({"type":"session.created","properties":{"info":value}}));
1515
1516 (StatusCode::OK, Json(value)).into_response()
1517}
1518
1519async fn oc_session_list(State(state): State<Arc<AdapterState>>) -> Response {
1520 if let Err(err) = state.ensure_initialized().await {
1521 return internal_error(err);
1522 }
1523
1524 let projection = state.projection.lock().await;
1525 let mut values = projection
1526 .sessions
1527 .values()
1528 .map(|session| session_to_value(&session.meta))
1529 .collect::<Vec<_>>();
1530 values.sort_by(|a, b| {
1531 let a_id = a.get("id").and_then(Value::as_str).unwrap_or_default();
1532 let b_id = b.get("id").and_then(Value::as_str).unwrap_or_default();
1533 a_id.cmp(b_id)
1534 });
1535
1536 (StatusCode::OK, Json(values)).into_response()
1537}
1538
1539async fn oc_session_get(
1540 State(state): State<Arc<AdapterState>>,
1541 Path(session_id): Path<String>,
1542) -> Response {
1543 if let Err(err) = state.ensure_initialized().await {
1544 return internal_error(err);
1545 }
1546
1547 let projection = state.projection.lock().await;
1548 let Some(session) = projection.sessions.get(&session_id) else {
1549 return not_found("Session not found");
1550 };
1551
1552 (StatusCode::OK, Json(session_to_value(&session.meta))).into_response()
1553}
1554
1555async fn oc_session_update(
1556 State(state): State<Arc<AdapterState>>,
1557 Path(session_id): Path<String>,
1558 Json(body): Json<SessionUpdateBody>,
1559) -> Response {
1560 if let Err(err) = state.ensure_initialized().await {
1561 return internal_error(err);
1562 }
1563
1564 if body.model.is_some() || body.provider_id.is_some() || body.model_id.is_some() {
1565 return bad_request(MODEL_CHANGE_ERROR);
1566 }
1567
1568 let meta = {
1569 let mut projection = state.projection.lock().await;
1570 let Some(session) = projection.sessions.get_mut(&session_id) else {
1571 return not_found("Session not found");
1572 };
1573
1574 if let Some(title) = body.title {
1575 session.meta.title = title;
1576 session.meta.updated_at = now_ms();
1577 }
1578
1579 session.meta.clone()
1580 };
1581
1582 if let Err(err) = state.persist_session(&meta).await {
1583 return internal_error(err);
1584 }
1585
1586 let value = session_to_value(&meta);
1587 state.emit_event(json!({"type":"session.updated","properties":{"info":value}}));
1588 (StatusCode::OK, Json(value)).into_response()
1589}
1590
1591async fn oc_session_delete(
1592 State(state): State<Arc<AdapterState>>,
1593 Path(session_id): Path<String>,
1594) -> Response {
1595 if let Err(err) = state.ensure_initialized().await {
1596 return internal_error(err);
1597 }
1598
1599 let removed = {
1600 let mut projection = state.projection.lock().await;
1601 projection.permissions.retain(|_, value| {
1602 value
1603 .get("sessionID")
1604 .and_then(Value::as_str)
1605 .map(|id| id != session_id)
1606 .unwrap_or(true)
1607 });
1608 projection.questions.retain(|_, value| {
1609 value
1610 .get("sessionID")
1611 .and_then(Value::as_str)
1612 .map(|id| id != session_id)
1613 .unwrap_or(true)
1614 });
1615 projection.sessions.remove(&session_id)
1616 };
1617
1618 let Some(session) = removed else {
1619 return not_found("Session not found");
1620 };
1621
1622 if let Err(err) = state.delete_session(&session_id).await {
1623 return internal_error(err);
1624 }
1625
1626 let server_id = session.meta.agent_session_id.clone();
1628 if state
1629 .acp_initialized
1630 .lock()
1631 .await
1632 .remove(&server_id)
1633 .is_some()
1634 {
1635 if let Some(dispatch) = state.config.acp_dispatch.as_ref() {
1636 if let Err(err) = dispatch.delete(&server_id).await {
1637 warn!(
1638 ?err,
1639 "failed to delete ACP server instance on session delete"
1640 );
1641 }
1642 }
1643 }
1644
1645 state
1647 .acp_request_ids
1648 .lock()
1649 .await
1650 .retain(|_, req| req.opencode_session_id != session_id);
1651
1652 let value = session_to_value(&session.meta);
1653 state.emit_event(json!({"type":"session.deleted","properties":{"info":value}}));
1654
1655 (StatusCode::OK, Json(json!(true))).into_response()
1656}
1657
1658async fn oc_session_status(State(state): State<Arc<AdapterState>>) -> Response {
1659 if let Err(err) = state.ensure_initialized().await {
1660 return internal_error(err);
1661 }
1662 let projection = state.projection.lock().await;
1663 let mut map = serde_json::Map::new();
1664 for (id, session) in &projection.sessions {
1665 map.insert(id.clone(), json!({"type": session.status}));
1666 }
1667 (StatusCode::OK, Json(Value::Object(map))).into_response()
1668}
1669
1670async fn oc_session_abort(
1671 State(state): State<Arc<AdapterState>>,
1672 Path(session_id): Path<String>,
1673) -> Response {
1674 if let Err(err) = state.ensure_initialized().await {
1675 return internal_error(err);
1676 }
1677
1678 let mut should_emit_idle = false;
1679 {
1680 let mut projection = state.projection.lock().await;
1681 let Some(session) = projection.sessions.get_mut(&session_id) else {
1682 return not_found("Session not found");
1683 };
1684 if session.status != "idle" {
1685 session.status = "idle".to_string();
1686 should_emit_idle = true;
1687 }
1688 projection.permissions.retain(|_, value| {
1689 value.get("sessionID").and_then(Value::as_str) != Some(session_id.as_str())
1690 });
1691 projection.questions.retain(|_, value| {
1692 value.get("sessionID").and_then(Value::as_str) != Some(session_id.as_str())
1693 });
1694 }
1695
1696 if should_emit_idle {
1697 let payload = json!({"jsonrpc":"2.0","method":"_sandboxagent/opencode/status","params":{"status":"idle"}});
1698 if let Err(err) = state.persist_event(&session_id, "agent", &payload).await {
1699 warn!(?err, "failed to persist abort idle status envelope");
1700 }
1701 state.emit_event(json!({"type":"session.idle","properties":{"sessionID":session_id}}));
1702 }
1703
1704 if let Some(dispatch) = state.config.acp_dispatch.as_ref() {
1706 let agent_session_id = {
1707 let projection = state.projection.lock().await;
1708 projection
1709 .sessions
1710 .get(&session_id)
1711 .map(|s| s.meta.agent_session_id.clone())
1712 };
1713 if let Some(server_id) = agent_session_id {
1714 let acp_session_id = state.acp_initialized.lock().await.get(&server_id).cloned();
1715 if let Some(acp_sid) = acp_session_id {
1716 let cancel_id = state.next_id("oc_rpc_");
1717 let cancel_payload = json!({
1718 "jsonrpc": "2.0",
1719 "id": cancel_id,
1720 "method": "session/cancel",
1721 "params": {
1722 "sessionId": acp_sid,
1723 }
1724 });
1725 if let Err(err) = dispatch.post(&server_id, None, cancel_payload).await {
1726 warn!(?err, "failed to send session/cancel to ACP agent");
1727 }
1728 }
1729 }
1730 }
1731
1732 (StatusCode::OK, Json(json!(true))).into_response()
1733}
1734
1735async fn oc_session_children() -> Response {
1736 (StatusCode::OK, Json(json!([]))).into_response()
1737}
1738
1739async fn oc_session_init(
1740 State(state): State<Arc<AdapterState>>,
1741 Path(session_id): Path<String>,
1742 headers: HeaderMap,
1743 Query(query): Query<DirectoryQuery>,
1744 body: Option<Json<SessionInitBody>>,
1745) -> Response {
1746 if let Err(err) = state.ensure_initialized().await {
1747 return internal_error(err);
1748 }
1749
1750 let directory = resolve_directory(&headers, query.directory.as_ref());
1751 if let Err(err) = state.ensure_session(&session_id, directory).await {
1752 return internal_error(err);
1753 }
1754
1755 let body = body.map(|json| json.0).unwrap_or(SessionInitBody {
1756 provider_id: None,
1757 model_id: None,
1758 message_id: None,
1759 });
1760
1761 if body.provider_id.is_none() && body.model_id.is_none() {
1762 return (StatusCode::OK, Json(json!(true))).into_response();
1763 }
1764
1765 if body.provider_id.is_none() || body.model_id.is_none() {
1766 return bad_request("providerID and modelID are required when selecting a model");
1767 }
1768
1769 let provider_id = body.provider_id.unwrap_or_else(|| "mock".to_string());
1770 let model_id = body.model_id.unwrap_or_else(|| "mock".to_string());
1771
1772 let meta = {
1773 let mut projection = state.projection.lock().await;
1774 let Some(session) = projection.sessions.get_mut(&session_id) else {
1775 return not_found("Session not found");
1776 };
1777 let has_messages = !session.messages.is_empty();
1778 let selection_changed =
1779 session.meta.provider_id != provider_id || session.meta.model_id != model_id;
1780 if has_messages && selection_changed {
1781 return bad_request(MODEL_CHANGE_ERROR);
1782 }
1783 session.meta.provider_id = provider_id.clone();
1784 session.meta.model_id = model_id.clone();
1785 session.meta.agent = provider_to_agent(&provider_id);
1786 session.meta.updated_at = now_ms();
1787 session.meta.clone()
1788 };
1789
1790 if let Err(err) = state.persist_session(&meta).await {
1791 return internal_error(err);
1792 }
1793
1794 (StatusCode::OK, Json(json!(true))).into_response()
1795}
1796
1797async fn oc_session_fork(
1798 State(state): State<Arc<AdapterState>>,
1799 Path(session_id): Path<String>,
1800 headers: HeaderMap,
1801 Query(query): Query<DirectoryQuery>,
1802) -> Response {
1803 if let Err(err) = state.ensure_initialized().await {
1804 return internal_error(err);
1805 }
1806
1807 let parent = {
1808 let projection = state.projection.lock().await;
1809 projection.sessions.get(&session_id).cloned()
1810 };
1811 let Some(parent) = parent else {
1812 return not_found("Session not found");
1813 };
1814
1815 let id = state.next_id("ses_");
1816 let now = now_ms();
1817 let directory = resolve_directory(&headers, query.directory.as_ref());
1818 let connection_id = state.current_connection_for_agent(&parent.meta.agent).await;
1819
1820 let meta = SessionMeta {
1821 id: id.clone(),
1822 slug: format!("session-{id}"),
1823 project_id: state.project_id.clone(),
1824 directory,
1825 parent_id: Some(session_id),
1826 title: format!("Fork of {}", parent.meta.title),
1827 version: "0".to_string(),
1828 created_at: now,
1829 updated_at: now,
1830 share_url: None,
1831 permission_mode: parent.meta.permission_mode.clone(),
1832 agent: parent.meta.agent.clone(),
1833 provider_id: parent.meta.provider_id.clone(),
1834 model_id: parent.meta.model_id.clone(),
1835 agent_session_id: format!("acp_{}", state.next_id("ses_")),
1836 last_connection_id: connection_id,
1837 session_init_json: parent.meta.session_init_json.clone(),
1838 destroyed_at: None,
1839 };
1840
1841 if let Err(err) = state.persist_session(&meta).await {
1842 return internal_error(err);
1843 }
1844
1845 {
1846 let mut projection = state.projection.lock().await;
1847 projection.sessions.insert(
1848 id.clone(),
1849 SessionState {
1850 meta: meta.clone(),
1851 messages: Vec::new(),
1852 status: "idle".to_string(),
1853 always_permissions: HashSet::new(),
1854 },
1855 );
1856 }
1857
1858 let value = session_to_value(&meta);
1859 state.emit_event(json!({"type":"session.created","properties":{"info":value}}));
1860
1861 (StatusCode::OK, Json(value)).into_response()
1862}
1863
1864async fn oc_session_diff() -> Response {
1865 (StatusCode::OK, Json(json!([]))).into_response()
1866}
1867
1868async fn oc_session_todo() -> Response {
1869 (StatusCode::OK, Json(json!([]))).into_response()
1870}
1871
1872async fn oc_session_summarize(Json(body): Json<Value>) -> Response {
1873 if body.get("providerID").is_none() || body.get("modelID").is_none() {
1874 return bad_request("providerID and modelID are required");
1875 }
1876 (StatusCode::OK, Json(json!(true))).into_response()
1877}
1878
1879async fn oc_session_messages(
1880 State(state): State<Arc<AdapterState>>,
1881 Path(session_id): Path<String>,
1882) -> Response {
1883 if let Err(err) = state.ensure_initialized().await {
1884 return internal_error(err);
1885 }
1886
1887 let projection = state.projection.lock().await;
1888 let Some(session) = projection.sessions.get(&session_id) else {
1889 return not_found("Session not found");
1890 };
1891
1892 let values = session
1893 .messages
1894 .iter()
1895 .map(|record| json!({"info": record.info, "parts": record.parts}))
1896 .collect::<Vec<_>>();
1897
1898 (StatusCode::OK, Json(values)).into_response()
1899}
1900
1901async fn oc_session_prompt(
1902 State(state): State<Arc<AdapterState>>,
1903 Path(session_id): Path<String>,
1904 headers: HeaderMap,
1905 Query(query): Query<DirectoryQuery>,
1906 Json(body): Json<PromptBody>,
1907) -> Response {
1908 if let Err(err) = state.ensure_initialized().await {
1909 return internal_error(err);
1910 }
1911
1912 let directory = resolve_directory(&headers, query.directory.as_ref());
1913 let mut meta = match state.ensure_session(&session_id, directory.clone()).await {
1914 Ok(meta) => meta,
1915 Err(err) => return internal_error(err),
1916 };
1917
1918 let explicit_model_selection = prompt_has_explicit_model_selection(&body);
1919 let requested_selection = resolve_selection_from_prompt(&body);
1920 if explicit_model_selection && requested_selection.is_none() {
1921 return bad_request("providerID and modelID are required when selecting a model");
1922 }
1923
1924 let has_messages = {
1925 let projection = state.projection.lock().await;
1926 projection
1927 .sessions
1928 .get(&session_id)
1929 .map(|session| !session.messages.is_empty())
1930 .unwrap_or(false)
1931 };
1932
1933 if let Some(selection) = requested_selection.as_ref() {
1934 let selection_changed =
1935 meta.provider_id != selection.provider_id || meta.model_id != selection.model_id;
1936 if has_messages && selection_changed {
1937 return bad_request(MODEL_CHANGE_ERROR);
1938 }
1939 meta.provider_id = selection.provider_id.clone();
1940 meta.model_id = selection.model_id.clone();
1941 meta.agent = selection.agent.clone();
1942 } else if let Some(agent) = body.agent.as_ref() {
1943 if has_messages && meta.agent != *agent {
1944 return bad_request(MODEL_CHANGE_ERROR);
1945 }
1946 meta.agent = agent.clone();
1947 }
1948
1949 let parts_input = body.parts.unwrap_or_default();
1950 if parts_input.is_empty() {
1951 return bad_request("parts are required");
1952 }
1953
1954 if let Some(session_mode) = {
1955 let projection = state.projection.lock().await;
1956 projection
1957 .sessions
1958 .get(&session_id)
1959 .and_then(|session| session.meta.permission_mode.clone())
1960 } {
1961 meta.permission_mode = Some(session_mode);
1962 }
1963
1964 {
1965 let mut projection = state.projection.lock().await;
1966 if let Some(session) = projection.sessions.get_mut(&session_id) {
1967 session.meta.agent = meta.agent.clone();
1968 session.meta.provider_id = meta.provider_id.clone();
1969 session.meta.model_id = meta.model_id.clone();
1970 session.meta.updated_at = now_ms();
1971 meta = session.meta.clone();
1972 }
1973 }
1974
1975 if let Err(err) = state.persist_session(&meta).await {
1976 return internal_error(err);
1977 }
1978
1979 if let Err(err) = state.maybe_restore_session(&session_id).await {
1980 return internal_error(err);
1981 }
1982
1983 {
1987 let projection = state.projection.lock().await;
1988 if let Some(session) = projection.sessions.get(&session_id) {
1989 meta = session.meta.clone();
1990 }
1991 }
1992
1993 let user_message_id = body
1994 .message_id
1995 .clone()
1996 .unwrap_or_else(|| state.next_id("msg_"));
1997 let now = now_ms();
1998
1999 let user_info = build_user_message(
2000 &session_id,
2001 &user_message_id,
2002 now,
2003 &meta.agent,
2004 &meta.provider_id,
2005 &meta.model_id,
2006 body.system.as_deref(),
2007 );
2008 let user_parts = normalize_parts(&session_id, &user_message_id, &parts_input);
2009
2010 let replay_injected = state.pending_replay.lock().await.remove(&session_id);
2011 let outbound_prompt_parts = if let Some(replay_text) = replay_injected {
2012 let mut prompt = vec![json!({"type":"text", "text": replay_text})];
2013 prompt.extend(parts_input.clone());
2014 prompt
2015 } else {
2016 parts_input.clone()
2017 };
2018
2019 let prompt_envelope = json!({
2020 "jsonrpc": "2.0",
2021 "id": state.next_id("oc_req_"),
2022 "method": "session/prompt",
2023 "params": {
2024 "sessionId": meta.agent_session_id,
2025 "prompt": outbound_prompt_parts,
2026 "sessionID": session_id,
2027 "message": {
2028 "info": user_info,
2029 "parts": user_parts,
2030 }
2031 }
2032 });
2033 if let Err(err) = state
2034 .persist_event(&session_id, "client", &prompt_envelope)
2035 .await
2036 {
2037 return internal_error(err);
2038 }
2039
2040 state.emit_event(message_event("message.updated", &user_info));
2041 for part in &user_parts {
2042 state.emit_event(json!({
2043 "type":"message.part.updated",
2044 "properties":{
2045 "sessionID": session_id,
2046 "messageID": user_message_id,
2047 "part": part
2048 }
2049 }));
2050 }
2051
2052 state
2055 .last_user_message_id
2056 .lock()
2057 .await
2058 .insert(session_id.clone(), user_message_id.clone());
2059
2060 if let Err(err) = set_session_status(&state, &session_id, "busy").await {
2061 return internal_error(err);
2062 }
2063
2064 tracing::info!(
2069 session_id = %session_id,
2070 agent = %meta.agent,
2071 provider_id = %meta.provider_id,
2072 model_id = %meta.model_id,
2073 has_acp_dispatch = state.config.acp_dispatch.is_some(),
2074 "prompt dispatch decision"
2075 );
2076 if let Some(dispatch) = state.config.acp_dispatch.as_ref() {
2077 if meta.agent != "mock" {
2078 let server_id = meta.agent_session_id.clone();
2079
2080 tracing::info!(server_id = %server_id, agent = %meta.agent, "entering ACP dispatch path");
2081
2082 let needs_init = !state.acp_initialized.lock().await.contains_key(&server_id);
2084 if needs_init {
2085 tracing::info!(server_id = %server_id, "bootstrapping ACP session (initialize + session/new)");
2086 let init_id = state.next_id("oc_rpc_");
2088 let init_payload = json!({
2089 "jsonrpc": "2.0",
2090 "id": init_id,
2091 "method": "initialize",
2092 "params": {
2093 "protocolVersion": 1,
2094 "capabilities": {},
2095 "clientInfo": {
2096 "name": "sandbox-agent-opencode-adapter",
2097 "version": "0.1.0"
2098 },
2099 "_meta": {
2100 "sandboxagent.dev": {
2101 "agent": meta.agent.clone()
2102 }
2103 }
2104 }
2105 });
2106 match dispatch
2107 .post(&server_id, Some(&meta.agent), init_payload)
2108 .await
2109 {
2110 Ok(AcpDispatchResult::Response(ref resp)) => {
2111 if let Some(err) = resp.get("error") {
2112 tracing::error!(server_id = %server_id, error = %err, "ACP initialize returned JSON-RPC error");
2113 let _ = set_session_status(&state, &session_id, "idle").await;
2114 return internal_error(format!("ACP initialize error: {err}"));
2115 }
2116 tracing::info!(server_id = %server_id, "ACP initialize succeeded");
2117 }
2118 Ok(AcpDispatchResult::Accepted) => {
2119 tracing::info!(server_id = %server_id, "ACP initialize accepted");
2120 }
2121 Err(err) => {
2122 let _ = set_session_status(&state, &session_id, "idle").await;
2123 return internal_error(format!("ACP initialize failed: {err}"));
2124 }
2125 }
2126
2127 let new_id = state.next_id("oc_rpc_");
2129 let new_payload = json!({
2130 "jsonrpc": "2.0",
2131 "id": new_id,
2132 "method": "session/new",
2133 "params": {
2134 "cwd": directory,
2135 "mcpServers": [],
2136 "_meta": {
2137 "sandboxagent.dev": {
2138 "model": meta.model_id.clone()
2139 }
2140 }
2141 }
2142 });
2143 let acp_session_id = match dispatch.post(&server_id, None, new_payload).await {
2144 Ok(AcpDispatchResult::Response(ref resp)) => {
2145 if let Some(err) = resp.get("error") {
2146 tracing::error!(server_id = %server_id, error = %err, "ACP session/new returned JSON-RPC error");
2147 let _ = set_session_status(&state, &session_id, "idle").await;
2148 return internal_error(format!("ACP session/new error: {err}"));
2149 }
2150 let sid = resp
2151 .pointer("/result/sessionId")
2152 .and_then(Value::as_str)
2153 .unwrap_or("")
2154 .to_string();
2155 tracing::info!(server_id = %server_id, acp_session_id = %sid, "ACP session/new succeeded");
2156 sid
2157 }
2158 Ok(AcpDispatchResult::Accepted) => {
2159 tracing::info!(server_id = %server_id, "ACP session/new accepted");
2160 String::new()
2161 }
2162 Err(err) => {
2163 let _ = set_session_status(&state, &session_id, "idle").await;
2164 return internal_error(format!("ACP session/new failed: {err}"));
2165 }
2166 };
2167
2168 match dispatch.notification_stream(&server_id, None).await {
2170 Ok(stream) => {
2171 let state_for_task = state.clone();
2172 let session_id_for_task = session_id.clone();
2173 let directory_for_task = directory.clone();
2174 let agent_for_task = meta.agent.clone();
2175 let provider_for_task = meta.provider_id.clone();
2176 let model_for_task = meta.model_id.clone();
2177 tokio::spawn(acp_sse_translation_task(
2178 state_for_task,
2179 stream,
2180 session_id_for_task,
2181 directory_for_task,
2182 agent_for_task,
2183 provider_for_task,
2184 model_for_task,
2185 ));
2186 }
2187 Err(err) => {
2188 warn!(
2189 ?err,
2190 "failed to open ACP SSE stream; events will not be translated"
2191 );
2192 }
2193 }
2194
2195 state
2196 .acp_initialized
2197 .lock()
2198 .await
2199 .insert(server_id.clone(), acp_session_id);
2200 }
2201
2202 let acp_session_id = state
2204 .acp_initialized
2205 .lock()
2206 .await
2207 .get(&server_id)
2208 .cloned()
2209 .unwrap_or_default();
2210 let prompt_id = state.next_id("oc_rpc_");
2211 let prompt_payload = json!({
2212 "jsonrpc": "2.0",
2213 "id": prompt_id,
2214 "method": "session/prompt",
2215 "params": {
2216 "sessionId": acp_session_id,
2217 "prompt": outbound_prompt_parts,
2218 }
2219 });
2220 match dispatch.post(&server_id, None, prompt_payload).await {
2225 Ok(AcpDispatchResult::Response(ref resp)) => {
2226 if let Some(err) = resp.get("error") {
2227 tracing::error!(server_id = %server_id, error = %err, "ACP session/prompt returned JSON-RPC error");
2228 let _ = set_session_status(&state, &session_id, "idle").await;
2229 return internal_error(format!("ACP session/prompt error: {err}"));
2230 }
2231 tracing::info!(server_id = %server_id, "ACP session/prompt response received (turn completion delegated to SSE task)");
2232 }
2233 Ok(AcpDispatchResult::Accepted) => {
2234 tracing::info!(server_id = %server_id, "ACP session/prompt accepted (streaming)");
2235 }
2236 Err(err) => {
2237 let _ = set_session_status(&state, &session_id, "idle").await;
2238 return internal_error(format!("ACP session/prompt failed: {err}"));
2239 }
2240 };
2241
2242 let assistant_message = build_assistant_message(
2246 &session_id,
2247 &format!("{user_message_id}_pending"),
2248 &user_message_id,
2249 now,
2250 &directory,
2251 &meta.agent,
2252 &meta.provider_id,
2253 &meta.model_id,
2254 );
2255 return (
2256 StatusCode::OK,
2257 Json(json!({
2258 "info": assistant_message,
2259 "parts": [],
2260 })),
2261 )
2262 .into_response();
2263 }
2264 }
2265
2266 let prompt_text = parts_input
2267 .iter()
2268 .find_map(|part| part.get("text").and_then(Value::as_str))
2269 .unwrap_or("")
2270 .to_string();
2271
2272 let auto_allow = {
2273 let projection = state.projection.lock().await;
2274 projection
2275 .sessions
2276 .get(&session_id)
2277 .map(|session| session.always_permissions.contains("execute"))
2278 .unwrap_or(false)
2279 };
2280
2281 if prompt_text.to_ascii_lowercase().contains("permission") {
2282 let request_id = state.next_id("perm_");
2283 let permission_request = json!({
2284 "id": request_id,
2285 "sessionID": session_id,
2286 "permission": "execute",
2287 "patterns": ["*"],
2288 "metadata": {},
2289 "always": [],
2290 });
2291 let asked = json!({
2292 "jsonrpc":"2.0",
2293 "method":"_sandboxagent/opencode/permission_asked",
2294 "params":{"request": permission_request}
2295 });
2296 if let Err(err) = state.persist_event(&session_id, "agent", &asked).await {
2297 return internal_error(err);
2298 }
2299 state.emit_event(json!({"type":"permission.asked","properties":permission_request}));
2300
2301 if auto_allow {
2302 if let Err(err) =
2303 resolve_permission_inner(&state, &session_id, &request_id, "always").await
2304 {
2305 return internal_error(err);
2306 }
2307 }
2308
2309 let assistant_info = build_assistant_message(
2310 &session_id,
2311 &format!("{user_message_id}_pending"),
2312 &user_message_id,
2313 now,
2314 &directory,
2315 &meta.agent,
2316 &meta.provider_id,
2317 &meta.model_id,
2318 );
2319
2320 return (
2321 StatusCode::OK,
2322 Json(json!({"info": assistant_info, "parts": []})),
2323 )
2324 .into_response();
2325 }
2326
2327 if prompt_text.to_ascii_lowercase().contains("question") {
2328 let request_id = state.next_id("q_");
2329 let question_request = json!({
2330 "id": request_id,
2331 "sessionID": session_id,
2332 "questions": [{
2333 "question": "Choose one option",
2334 "header": "Question",
2335 "options": [
2336 {"label":"Yes","description":"Accept"},
2337 {"label":"No","description":"Reject"}
2338 ],
2339 "multiple": false,
2340 "custom": true
2341 }]
2342 });
2343 let asked = json!({
2344 "jsonrpc":"2.0",
2345 "method":"_sandboxagent/opencode/question_asked",
2346 "params":{"request": question_request}
2347 });
2348 if let Err(err) = state.persist_event(&session_id, "agent", &asked).await {
2349 return internal_error(err);
2350 }
2351 state.emit_event(json!({"type":"question.asked","properties":question_request}));
2352
2353 let assistant_info = build_assistant_message(
2354 &session_id,
2355 &format!("{user_message_id}_pending"),
2356 &user_message_id,
2357 now,
2358 &directory,
2359 &meta.agent,
2360 &meta.provider_id,
2361 &meta.model_id,
2362 );
2363
2364 return (
2365 StatusCode::OK,
2366 Json(json!({"info": assistant_info, "parts": []})),
2367 )
2368 .into_response();
2369 }
2370
2371 tokio::time::sleep(Duration::from_millis(120)).await;
2372
2373 if prompt_text.to_ascii_lowercase().contains("error") {
2374 state.emit_event(json!({
2375 "type":"session.error",
2376 "properties":{
2377 "sessionID": session_id,
2378 "error": {"name":"UnknownError","data":{"message":"mock process crashed"}}
2379 }
2380 }));
2381 let err_env = json!({
2382 "jsonrpc":"2.0",
2383 "method":"_sandboxagent/opencode/error",
2384 "params":{"message":"mock process crashed"}
2385 });
2386 if let Err(err) = state.persist_event(&session_id, "agent", &err_env).await {
2387 return internal_error(err);
2388 }
2389 if let Err(err) = set_session_status(&state, &session_id, "idle").await {
2390 return internal_error(err);
2391 }
2392
2393 let assistant_info = build_assistant_message(
2394 &session_id,
2395 &format!("{user_message_id}_error"),
2396 &user_message_id,
2397 now,
2398 &directory,
2399 &meta.agent,
2400 &meta.provider_id,
2401 &meta.model_id,
2402 );
2403
2404 return (
2405 StatusCode::OK,
2406 Json(json!({"info": assistant_info, "parts": []})),
2407 )
2408 .into_response();
2409 }
2410
2411 let assistant_message_id = format!("{user_message_id}_assistant");
2412 let assistant_info = build_completed_assistant_message(
2413 &session_id,
2414 &assistant_message_id,
2415 &user_message_id,
2416 now,
2417 &directory,
2418 &meta.agent,
2419 &meta.provider_id,
2420 &meta.model_id,
2421 );
2422
2423 let mut assistant_parts = Vec::<Value>::new();
2424
2425 if prompt_text.to_ascii_lowercase().contains("tool") {
2426 let tool_part = json!({
2427 "id": state.next_id("part_"),
2428 "sessionID": session_id,
2429 "messageID": assistant_message_id,
2430 "type": "tool",
2431 "callID": state.next_id("call_"),
2432 "tool": "bash",
2433 "state": {
2434 "status": "completed",
2435 "input": {"command": "echo tool"},
2436 "output": "ok",
2437 "title": "bash",
2438 "metadata": {},
2439 "time": {"start": now, "end": now}
2440 }
2441 });
2442 let file_part = json!({
2443 "id": state.next_id("part_"),
2444 "sessionID": session_id,
2445 "messageID": assistant_message_id,
2446 "type": "file",
2447 "mime": "text/plain",
2448 "filename": "README.md",
2449 "url": "file:///README.md",
2450 });
2451
2452 assistant_parts.push(tool_part.clone());
2453 assistant_parts.push(file_part.clone());
2454
2455 state.emit_event(json!({
2456 "type":"message.part.updated",
2457 "properties":{
2458 "sessionID": session_id,
2459 "messageID": assistant_message_id,
2460 "part": tool_part
2461 }
2462 }));
2463 state.emit_event(json!({
2464 "type":"message.part.updated",
2465 "properties":{
2466 "sessionID": session_id,
2467 "messageID": assistant_message_id,
2468 "part": file_part
2469 }
2470 }));
2471 state.emit_event(
2472 json!({"type":"file.edited","properties":{"sessionID":session_id, "path":"README.md"}}),
2473 );
2474 } else {
2475 let response_text = if prompt_text.trim().is_empty() {
2476 "OK".to_string()
2477 } else {
2478 prompt_text.clone()
2479 };
2480 let text_part = json!({
2481 "id": state.next_id("part_"),
2482 "sessionID": session_id,
2483 "messageID": assistant_message_id,
2484 "type": "text",
2485 "text": response_text,
2486 });
2487 assistant_parts.push(text_part.clone());
2488 state.emit_event(json!({
2489 "type":"message.part.updated",
2490 "properties":{
2491 "sessionID": session_id,
2492 "messageID": assistant_message_id,
2493 "part": text_part
2494 }
2495 }));
2496 }
2497
2498 let assistant_env = json!({
2499 "jsonrpc": "2.0",
2500 "method": "_sandboxagent/opencode/message",
2501 "params": {
2502 "message": {
2503 "info": assistant_info,
2504 "parts": assistant_parts,
2505 }
2506 }
2507 });
2508 if let Err(err) = state
2509 .persist_event(&session_id, "agent", &assistant_env)
2510 .await
2511 {
2512 return internal_error(err);
2513 }
2514
2515 state.emit_event(message_event("message.updated", &assistant_info));
2516
2517 if let Err(err) = set_session_status(&state, &session_id, "idle").await {
2518 return internal_error(err);
2519 }
2520
2521 let projection = state.projection.lock().await;
2522 let parts = projection
2523 .sessions
2524 .get(&session_id)
2525 .and_then(|session| {
2526 session
2527 .messages
2528 .iter()
2529 .find(|message| {
2530 message.info.get("id").and_then(Value::as_str)
2531 == Some(assistant_message_id.as_str())
2532 })
2533 .map(|message| message.parts.clone())
2534 })
2535 .unwrap_or_default();
2536
2537 (
2538 StatusCode::OK,
2539 Json(json!({"info": assistant_info, "parts": parts})),
2540 )
2541 .into_response()
2542}
2543
2544async fn oc_session_message_get(
2545 State(state): State<Arc<AdapterState>>,
2546 Path((session_id, message_id)): Path<(String, String)>,
2547) -> Response {
2548 if let Err(err) = state.ensure_initialized().await {
2549 return internal_error(err);
2550 }
2551
2552 let projection = state.projection.lock().await;
2553 let Some(session) = projection.sessions.get(&session_id) else {
2554 return not_found("Session not found");
2555 };
2556
2557 let Some(record) = session.messages.iter().find(|message| {
2558 message.info.get("id").and_then(Value::as_str) == Some(message_id.as_str())
2559 }) else {
2560 return not_found("Message not found");
2561 };
2562
2563 (
2564 StatusCode::OK,
2565 Json(json!({
2566 "id": message_id,
2567 "info": record.info,
2568 "parts": record.parts,
2569 })),
2570 )
2571 .into_response()
2572}
2573
2574async fn oc_part_update(
2575 State(state): State<Arc<AdapterState>>,
2576 Path((session_id, message_id, part_id)): Path<(String, String, String)>,
2577 Json(mut part): Json<Value>,
2578) -> Response {
2579 if let Err(err) = state.ensure_initialized().await {
2580 return internal_error(err);
2581 }
2582
2583 if let Some(obj) = part.as_object_mut() {
2584 obj.insert("id".to_string(), json!(part_id.clone()));
2585 obj.insert("sessionID".to_string(), json!(session_id.clone()));
2586 obj.insert("messageID".to_string(), json!(message_id.clone()));
2587 }
2588
2589 {
2590 let mut projection = state.projection.lock().await;
2591 if let Some(session) = projection.sessions.get_mut(&session_id) {
2592 if let Some(message) = session.messages.iter_mut().find(|record| {
2593 record.info.get("id").and_then(Value::as_str) == Some(message_id.as_str())
2594 }) {
2595 if let Some(existing) = message.parts.iter_mut().find(|candidate| {
2596 candidate.get("id").and_then(Value::as_str) == Some(part_id.as_str())
2597 }) {
2598 *existing = part.clone();
2599 } else {
2600 message.parts.push(part.clone());
2601 }
2602 }
2603 }
2604 }
2605
2606 state.emit_event(json!({
2607 "type":"message.part.updated",
2608 "properties":{
2609 "sessionID": session_id,
2610 "messageID": message_id,
2611 "part": part.clone()
2612 }
2613 }));
2614
2615 (StatusCode::OK, Json(part)).into_response()
2616}
2617
2618async fn oc_part_delete(
2619 State(state): State<Arc<AdapterState>>,
2620 Path((session_id, message_id, part_id)): Path<(String, String, String)>,
2621) -> Response {
2622 if let Err(err) = state.ensure_initialized().await {
2623 return internal_error(err);
2624 }
2625
2626 {
2627 let mut projection = state.projection.lock().await;
2628 if let Some(session) = projection.sessions.get_mut(&session_id) {
2629 if let Some(message) = session.messages.iter_mut().find(|record| {
2630 record.info.get("id").and_then(Value::as_str) == Some(message_id.as_str())
2631 }) {
2632 message.parts.retain(|part| {
2633 part.get("id").and_then(Value::as_str) != Some(part_id.as_str())
2634 });
2635 }
2636 }
2637 }
2638
2639 state.emit_event(json!({
2640 "type":"message.part.removed",
2641 "properties": {"sessionID": session_id, "messageID": message_id, "partID": part_id}
2642 }));
2643
2644 (StatusCode::OK, Json(json!(true))).into_response()
2645}
2646
2647async fn oc_session_prompt_async(
2648 State(state): State<Arc<AdapterState>>,
2649 Path(session_id): Path<String>,
2650 headers: HeaderMap,
2651 query: Query<DirectoryQuery>,
2652 Json(body): Json<PromptBody>,
2653) -> Response {
2654 let _ = oc_session_prompt(State(state), Path(session_id), headers, query, Json(body)).await;
2655
2656 StatusCode::NO_CONTENT.into_response()
2657}
2658
2659async fn oc_permission_respond(
2660 State(state): State<Arc<AdapterState>>,
2661 Path((session_id, permission_id)): Path<(String, String)>,
2662 Json(body): Json<PermissionRespondBody>,
2663) -> Response {
2664 if let Err(err) = state.ensure_initialized().await {
2665 return internal_error(err);
2666 }
2667
2668 let reply = match body.response.as_deref() {
2669 Some("allow") => "once",
2670 Some("deny") => "reject",
2671 Some("always") => "always",
2672 _ => "once",
2673 };
2674
2675 if let Err(err) = resolve_permission_inner(&state, &session_id, &permission_id, reply).await {
2676 return internal_error(err);
2677 }
2678
2679 (StatusCode::OK, Json(json!(true))).into_response()
2680}
2681
2682async fn oc_permission_reply(
2683 State(state): State<Arc<AdapterState>>,
2684 Path(request_id): Path<String>,
2685 Json(body): Json<PermissionReplyBody>,
2686) -> Response {
2687 if let Err(err) = state.ensure_initialized().await {
2688 return internal_error(err);
2689 }
2690
2691 let reply = body.reply.unwrap_or_else(|| "once".to_string());
2692 let session_id = {
2693 let projection = state.projection.lock().await;
2694 projection
2695 .permissions
2696 .get(&request_id)
2697 .and_then(|value| value.get("sessionID"))
2698 .and_then(Value::as_str)
2699 .map(ToOwned::to_owned)
2700 };
2701
2702 let Some(session_id) = session_id else {
2703 return not_found("Permission request not found");
2704 };
2705
2706 if let Err(err) = resolve_permission_inner(&state, &session_id, &request_id, &reply).await {
2707 return internal_error(err);
2708 }
2709
2710 (StatusCode::OK, Json(json!(true))).into_response()
2711}
2712
2713async fn oc_permission_list(State(state): State<Arc<AdapterState>>) -> Response {
2714 if let Err(err) = state.ensure_initialized().await {
2715 return internal_error(err);
2716 }
2717
2718 let projection = state.projection.lock().await;
2719 let mut values = projection.permissions.values().cloned().collect::<Vec<_>>();
2720 values.sort_by(|a, b| {
2721 let a_id = a.get("id").and_then(Value::as_str).unwrap_or_default();
2722 let b_id = b.get("id").and_then(Value::as_str).unwrap_or_default();
2723 a_id.cmp(b_id)
2724 });
2725 (StatusCode::OK, Json(values)).into_response()
2726}
2727
2728async fn oc_question_list(State(state): State<Arc<AdapterState>>) -> Response {
2729 if let Err(err) = state.ensure_initialized().await {
2730 return internal_error(err);
2731 }
2732
2733 let projection = state.projection.lock().await;
2734 let mut values = projection.questions.values().cloned().collect::<Vec<_>>();
2735 values.sort_by(|a, b| {
2736 let a_id = a.get("id").and_then(Value::as_str).unwrap_or_default();
2737 let b_id = b.get("id").and_then(Value::as_str).unwrap_or_default();
2738 a_id.cmp(b_id)
2739 });
2740 (StatusCode::OK, Json(values)).into_response()
2741}
2742
2743async fn oc_question_reply(
2744 State(state): State<Arc<AdapterState>>,
2745 Path(request_id): Path<String>,
2746 Json(body): Json<QuestionReplyBody>,
2747) -> Response {
2748 if let Err(err) = state.ensure_initialized().await {
2749 return internal_error(err);
2750 }
2751
2752 let session_id = {
2753 let projection = state.projection.lock().await;
2754 projection
2755 .questions
2756 .get(&request_id)
2757 .and_then(|value| value.get("sessionID"))
2758 .and_then(Value::as_str)
2759 .map(ToOwned::to_owned)
2760 };
2761
2762 let Some(session_id) = session_id else {
2763 return not_found("Question request not found");
2764 };
2765
2766 let answers = body.answers.unwrap_or_default();
2767
2768 let pending = state.acp_request_ids.lock().await.remove(&request_id);
2770
2771 if let Some(pending) = &pending {
2772 if let Some(dispatch) = state.config.acp_dispatch.as_ref() {
2773 let agent_session_id = {
2774 let projection = state.projection.lock().await;
2775 projection
2776 .sessions
2777 .get(&session_id)
2778 .map(|s| s.meta.agent_session_id.clone())
2779 };
2780 if let Some(server_id) = agent_session_id {
2781 let response = json!({
2782 "jsonrpc": "2.0",
2783 "id": pending.jsonrpc_id,
2784 "result": {
2785 "outcome": "selected",
2786 "_meta": {
2787 "sandboxagent.dev": {
2788 "answers": answers
2789 }
2790 }
2791 }
2792 });
2793 if let Err(err) = dispatch.post(&server_id, None, response).await {
2794 warn!(?err, "failed to forward question response to ACP agent");
2795 }
2796 }
2797 }
2798 }
2799
2800 let envelope = json!({
2801 "jsonrpc":"2.0",
2802 "method":"_sandboxagent/opencode/question_replied",
2803 "params":{"requestID": request_id, "answers": answers}
2804 });
2805 if let Err(err) = state.persist_event(&session_id, "agent", &envelope).await {
2806 return internal_error(err);
2807 }
2808
2809 state.emit_event(json!({
2810 "type":"question.replied",
2811 "properties": {
2812 "sessionID": session_id,
2813 "requestID": request_id,
2814 "answers": answers,
2815 }
2816 }));
2817
2818 if let Err(err) = set_session_status(&state, &session_id, "idle").await {
2819 return internal_error(err);
2820 }
2821
2822 (StatusCode::OK, Json(json!(true))).into_response()
2823}
2824
2825async fn oc_question_reject(
2826 State(state): State<Arc<AdapterState>>,
2827 Path(request_id): Path<String>,
2828) -> Response {
2829 if let Err(err) = state.ensure_initialized().await {
2830 return internal_error(err);
2831 }
2832
2833 let session_id = {
2834 let projection = state.projection.lock().await;
2835 projection
2836 .questions
2837 .get(&request_id)
2838 .and_then(|value| value.get("sessionID"))
2839 .and_then(Value::as_str)
2840 .map(ToOwned::to_owned)
2841 };
2842
2843 let Some(session_id) = session_id else {
2844 return not_found("Question request not found");
2845 };
2846
2847 let pending = state.acp_request_ids.lock().await.remove(&request_id);
2849
2850 if let Some(pending) = &pending {
2851 if let Some(dispatch) = state.config.acp_dispatch.as_ref() {
2852 let agent_session_id = {
2853 let projection = state.projection.lock().await;
2854 projection
2855 .sessions
2856 .get(&session_id)
2857 .map(|s| s.meta.agent_session_id.clone())
2858 };
2859 if let Some(server_id) = agent_session_id {
2860 let response = json!({
2861 "jsonrpc": "2.0",
2862 "id": pending.jsonrpc_id,
2863 "result": {
2864 "outcome": "rejected"
2865 }
2866 });
2867 if let Err(err) = dispatch.post(&server_id, None, response).await {
2868 warn!(?err, "failed to forward question rejection to ACP agent");
2869 }
2870 }
2871 }
2872 }
2873
2874 let envelope = json!({
2875 "jsonrpc":"2.0",
2876 "method":"_sandboxagent/opencode/question_rejected",
2877 "params":{"requestID": request_id}
2878 });
2879 if let Err(err) = state.persist_event(&session_id, "agent", &envelope).await {
2880 return internal_error(err);
2881 }
2882
2883 state.emit_event(json!({
2884 "type":"question.rejected",
2885 "properties": {
2886 "sessionID": session_id,
2887 "requestID": request_id,
2888 }
2889 }));
2890
2891 if let Err(err) = set_session_status(&state, &session_id, "idle").await {
2892 return internal_error(err);
2893 }
2894
2895 (StatusCode::OK, Json(json!(true))).into_response()
2896}
2897
2898async fn oc_provider_list(State(state): State<Arc<AdapterState>>) -> Response {
2899 if let Err(err) = state.ensure_initialized().await {
2900 return internal_error(err);
2901 }
2902 (StatusCode::OK, Json(provider_payload(&state))).into_response()
2903}
2904
2905async fn oc_provider_auth(State(state): State<Arc<AdapterState>>) -> Response {
2906 if let Err(err) = state.ensure_initialized().await {
2907 return internal_error(err);
2908 }
2909 (StatusCode::OK, Json(json!({"mock": [], "amp": []}))).into_response()
2910}
2911
2912async fn oc_provider_oauth_authorize(Path(provider_id): Path<String>) -> Response {
2913 (
2914 StatusCode::OK,
2915 Json(json!({
2916 "url": format!("https://auth.local/{provider_id}/authorize"),
2917 "method": "auto",
2918 "instructions": "stub",
2919 })),
2920 )
2921 .into_response()
2922}
2923
2924async fn oc_provider_oauth_callback() -> Response {
2925 (StatusCode::OK, Json(json!(true))).into_response()
2926}
2927
2928async fn resolve_permission_inner(
2929 state: &Arc<AdapterState>,
2930 session_id: &str,
2931 permission_id: &str,
2932 reply: &str,
2933) -> Result<(), String> {
2934 let pending = state.acp_request_ids.lock().await.remove(permission_id);
2937
2938 if let Some(pending) = &pending {
2939 if let Some(dispatch) = state.config.acp_dispatch.as_ref() {
2940 let agent_session_id = {
2941 let projection = state.projection.lock().await;
2942 projection
2943 .sessions
2944 .get(session_id)
2945 .map(|s| s.meta.agent_session_id.clone())
2946 };
2947 if let Some(server_id) = agent_session_id {
2948 let option_kind = match reply {
2949 "always" => "allow_always",
2950 "reject" | "deny" => "reject_once",
2951 _ => "allow_once",
2952 };
2953 let response = json!({
2954 "jsonrpc": "2.0",
2955 "id": pending.jsonrpc_id,
2956 "result": {
2957 "outcome": "selected",
2958 "selectedOption": {
2959 "kind": option_kind
2960 }
2961 }
2962 });
2963 if let Err(err) = dispatch.post(&server_id, None, response).await {
2964 warn!(?err, "failed to forward permission response to ACP agent");
2965 }
2966 }
2967 }
2968 }
2969
2970 let envelope = json!({
2971 "jsonrpc":"2.0",
2972 "method":"_sandboxagent/opencode/permission_replied",
2973 "params": {
2974 "requestID": permission_id,
2975 "reply": reply,
2976 }
2977 });
2978 state.persist_event(session_id, "agent", &envelope).await?;
2979
2980 state.emit_event(json!({
2981 "type":"permission.replied",
2982 "properties": {
2983 "sessionID": session_id,
2984 "requestID": permission_id,
2985 "reply": reply,
2986 }
2987 }));
2988
2989 if reply == "always" {
2990 let mut projection = state.projection.lock().await;
2991 if let Some(session) = projection.sessions.get_mut(session_id) {
2992 session.always_permissions.insert("execute".to_string());
2993 }
2994 }
2995
2996 set_session_status(state, session_id, "idle").await
2997}
2998
2999async fn set_session_status(
3000 state: &Arc<AdapterState>,
3001 session_id: &str,
3002 status: &str,
3003) -> Result<(), String> {
3004 let updated_meta = {
3005 let mut projection = state.projection.lock().await;
3006 let Some(session) = projection.sessions.get_mut(session_id) else {
3007 return Err(format!("session '{session_id}' not found"));
3008 };
3009 session.status = status.to_string();
3010 session.meta.updated_at = now_ms();
3011 session.meta.clone()
3012 };
3013 state.persist_session(&updated_meta).await?;
3014
3015 let env = json!({
3016 "jsonrpc":"2.0",
3017 "method":"_sandboxagent/opencode/status",
3018 "params":{"status": status}
3019 });
3020 state.persist_event(session_id, "agent", &env).await?;
3021
3022 state.emit_event(json!({
3023 "type":"session.status",
3024 "properties": {
3025 "sessionID": session_id,
3026 "status": {"type": status},
3027 }
3028 }));
3029
3030 if status == "idle" {
3031 state.emit_event(json!({
3032 "type":"session.idle",
3033 "properties": {"sessionID": session_id}
3034 }));
3035 }
3036
3037 Ok(())
3038}
3039
3040fn apply_envelope(projection: &mut Projection, session_id: &str, _sender: &str, payload: &Value) {
3041 let Some(method) = payload.get("method").and_then(Value::as_str) else {
3042 return;
3043 };
3044
3045 match method {
3046 "session/prompt" => {
3047 if let Some(message) = payload
3048 .get("params")
3049 .and_then(|params| params.get("message"))
3050 .and_then(Value::as_object)
3051 {
3052 let info = message.get("info").cloned().unwrap_or_else(|| json!({}));
3053 let parts = message
3054 .get("parts")
3055 .and_then(Value::as_array)
3056 .cloned()
3057 .unwrap_or_default();
3058 if let Some(session) = projection.sessions.get_mut(session_id) {
3059 upsert_message(session, info, parts);
3060 session.status = "busy".to_string();
3061 }
3062 }
3063 }
3064 "_sandboxagent/opencode/message" => {
3065 if let Some(message) = payload
3066 .get("params")
3067 .and_then(|params| params.get("message"))
3068 .and_then(Value::as_object)
3069 {
3070 let info = message.get("info").cloned().unwrap_or_else(|| json!({}));
3071 let parts = message
3072 .get("parts")
3073 .and_then(Value::as_array)
3074 .cloned()
3075 .unwrap_or_default();
3076 if let Some(session) = projection.sessions.get_mut(session_id) {
3077 upsert_message(session, info, parts);
3078 }
3079 }
3080 }
3081 "_sandboxagent/opencode/status" => {
3082 let status = payload
3083 .get("params")
3084 .and_then(|params| params.get("status"))
3085 .and_then(Value::as_str)
3086 .unwrap_or("idle")
3087 .to_string();
3088 if let Some(session) = projection.sessions.get_mut(session_id) {
3089 session.status = status;
3090 }
3091 }
3092 "_sandboxagent/opencode/permission_asked" => {
3093 if let Some(request) = payload
3094 .get("params")
3095 .and_then(|params| params.get("request"))
3096 .cloned()
3097 {
3098 if let Some(id) = request.get("id").and_then(Value::as_str) {
3099 projection.permissions.insert(id.to_string(), request);
3100 }
3101 if let Some(session) = projection.sessions.get_mut(session_id) {
3102 session.status = "busy".to_string();
3103 }
3104 }
3105 }
3106 "_sandboxagent/opencode/permission_replied" => {
3107 if let Some(request_id) = payload
3108 .get("params")
3109 .and_then(|params| params.get("requestID"))
3110 .and_then(Value::as_str)
3111 {
3112 let reply = payload
3113 .get("params")
3114 .and_then(|params| params.get("reply"))
3115 .and_then(Value::as_str)
3116 .unwrap_or("once");
3117 projection.permissions.remove(request_id);
3118 if reply == "always" {
3119 if let Some(session) = projection.sessions.get_mut(session_id) {
3120 session.always_permissions.insert("execute".to_string());
3121 }
3122 }
3123 }
3124 }
3125 "_sandboxagent/opencode/question_asked" => {
3126 if let Some(request) = payload
3127 .get("params")
3128 .and_then(|params| params.get("request"))
3129 .cloned()
3130 {
3131 if let Some(id) = request.get("id").and_then(Value::as_str) {
3132 projection.questions.insert(id.to_string(), request);
3133 }
3134 if let Some(session) = projection.sessions.get_mut(session_id) {
3135 session.status = "busy".to_string();
3136 }
3137 }
3138 }
3139 "_sandboxagent/opencode/question_replied" => {
3140 if let Some(request_id) = payload
3141 .get("params")
3142 .and_then(|params| params.get("requestID"))
3143 .and_then(Value::as_str)
3144 {
3145 projection.questions.remove(request_id);
3146 }
3147 }
3148 "_sandboxagent/opencode/question_rejected" => {
3149 if let Some(request_id) = payload
3150 .get("params")
3151 .and_then(|params| params.get("requestID"))
3152 .and_then(Value::as_str)
3153 {
3154 projection.questions.remove(request_id);
3155 }
3156 }
3157 _ => {}
3158 }
3159}
3160
3161fn upsert_message(session: &mut SessionState, info: Value, parts: Vec<Value>) {
3162 let message_id = info.get("id").and_then(Value::as_str).unwrap_or_default();
3163 if let Some(existing) = session
3164 .messages
3165 .iter_mut()
3166 .find(|message| message.info.get("id").and_then(Value::as_str) == Some(message_id))
3167 {
3168 if let (Some(existing_obj), Some(new_obj)) =
3172 (existing.info.as_object_mut(), info.as_object())
3173 {
3174 for (key, value) in new_obj {
3175 existing_obj.insert(key.clone(), value.clone());
3176 }
3177 } else {
3178 existing.info = info;
3179 }
3180 for part in parts {
3181 let part_id = part.get("id").and_then(Value::as_str).unwrap_or_default();
3182 if let Some(existing_part) = existing
3183 .parts
3184 .iter_mut()
3185 .find(|candidate| candidate.get("id").and_then(Value::as_str) == Some(part_id))
3186 {
3187 *existing_part = part;
3188 } else {
3189 existing.parts.push(part);
3190 }
3191 }
3192 return;
3193 }
3194
3195 session.messages.push(MessageRecord { info, parts });
3196}
3197
3198fn provider_payload(state: &Arc<AdapterState>) -> Value {
3199 if let Some(payload) = state.config.provider_payload.as_ref() {
3202 return payload.clone();
3203 }
3204
3205 let mock_model = model_entry("mock", "Mock", "Mock", true, true, true, true, 8192, 4096);
3207 let amp_model = model_entry(
3208 "smart", "Smart", "Amp", false, false, true, true, 8192, 4096,
3209 );
3210 let claude_default = model_entry(
3211 "default",
3212 "Default (recommended)",
3213 "Claude",
3214 false,
3215 false,
3216 true,
3217 true,
3218 200_000,
3219 8_192,
3220 );
3221 let claude_sonnet = model_entry(
3222 "sonnet", "Sonnet", "Claude", false, false, true, true, 200_000, 8_192,
3223 );
3224 let codex_default = model_entry(
3225 "gpt-5", "GPT-5", "Codex", true, true, true, true, 200_000, 16_384,
3226 );
3227
3228 json!({
3229 "all": [
3230 {
3231 "id": "mock",
3232 "name": "Mock",
3233 "env": [],
3234 "models": { "mock": mock_model },
3235 },
3236 {
3237 "id": "amp",
3238 "name": "Amp",
3239 "env": [],
3240 "models": { "smart": amp_model },
3241 }
3242 ,
3243 {
3244 "id": "claude",
3245 "name": "Claude",
3246 "env": [],
3247 "models": {
3248 "default": claude_default,
3249 "sonnet": claude_sonnet,
3250 },
3251 },
3252 {
3253 "id": "codex",
3254 "name": "Codex",
3255 "env": [],
3256 "models": { "gpt-5": codex_default },
3257 }
3258 ],
3259 "default": {
3260 "mock": "mock",
3261 "amp": "smart",
3262 "claude": "default",
3263 "codex": "gpt-5",
3264 },
3265 "connected": ["mock", "amp", "claude", "codex"],
3266 })
3267}
3268
3269fn model_entry(
3270 id: &str,
3271 name: &str,
3272 family: &str,
3273 attachment: bool,
3274 reasoning: bool,
3275 temperature: bool,
3276 tool_call: bool,
3277 context: i64,
3278 output: i64,
3279) -> Value {
3280 json!({
3281 "id": id,
3282 "name": name,
3283 "family": family,
3284 "release_date": "1970-01-01",
3285 "attachment": attachment,
3286 "reasoning": reasoning,
3287 "temperature": temperature,
3288 "tool_call": tool_call,
3289 "limit": {
3290 "context": context,
3291 "output": output,
3292 },
3293 "options": {},
3294 })
3295}
3296
3297fn build_user_message(
3298 session_id: &str,
3299 message_id: &str,
3300 now: i64,
3301 agent: &str,
3302 provider_id: &str,
3303 model_id: &str,
3304 system: Option<&str>,
3305) -> Value {
3306 let mut value = json!({
3307 "id": message_id,
3308 "sessionID": session_id,
3309 "role": "user",
3310 "time": {"created": now, "completed": now},
3311 "agent": agent,
3312 "model": {
3313 "providerID": provider_id,
3314 "modelID": model_id,
3315 },
3316 });
3317
3318 if let Some(system) = system {
3319 if let Some(obj) = value.as_object_mut() {
3320 obj.insert("system".to_string(), json!(system));
3321 }
3322 }
3323
3324 value
3325}
3326
3327fn build_assistant_message(
3328 session_id: &str,
3329 message_id: &str,
3330 parent_id: &str,
3331 now: i64,
3332 directory: &str,
3333 agent: &str,
3334 provider_id: &str,
3335 model_id: &str,
3336) -> Value {
3337 json!({
3338 "id": message_id,
3339 "sessionID": session_id,
3340 "role": "assistant",
3341 "time": {"created": now},
3342 "parentID": parent_id,
3343 "modelID": model_id,
3344 "providerID": provider_id,
3345 "mode": "default",
3346 "agent": agent,
3347 "finish": "stop",
3348 "path": {
3349 "cwd": directory,
3350 "root": directory,
3351 },
3352 "cost": 0,
3353 "tokens": {
3354 "input": 0,
3355 "output": 0,
3356 "reasoning": 0,
3357 "cache": {"read": 0, "write": 0},
3358 },
3359 })
3360}
3361
3362fn build_completed_assistant_message(
3364 session_id: &str,
3365 message_id: &str,
3366 parent_id: &str,
3367 now: i64,
3368 directory: &str,
3369 agent: &str,
3370 provider_id: &str,
3371 model_id: &str,
3372) -> Value {
3373 json!({
3374 "id": message_id,
3375 "sessionID": session_id,
3376 "role": "assistant",
3377 "time": {"created": now, "completed": now},
3378 "parentID": parent_id,
3379 "modelID": model_id,
3380 "providerID": provider_id,
3381 "mode": "default",
3382 "agent": agent,
3383 "finish": "stop",
3384 "path": {
3385 "cwd": directory,
3386 "root": directory,
3387 },
3388 "cost": 0,
3389 "tokens": {
3390 "input": 0,
3391 "output": 0,
3392 "reasoning": 0,
3393 "cache": {"read": 0, "write": 0},
3394 },
3395 })
3396}
3397
3398fn message_event(event_type: &str, message: &Value) -> Value {
3402 let session_id = message
3403 .get("sessionID")
3404 .and_then(Value::as_str)
3405 .map(|v| v.to_string());
3406 let mut props = serde_json::Map::new();
3407 props.insert("info".to_string(), message.clone());
3408 if let Some(session_id) = session_id {
3409 props.insert("sessionID".to_string(), json!(session_id));
3410 }
3411 json!({
3412 "type": event_type,
3413 "properties": Value::Object(props),
3414 })
3415}
3416
3417fn normalize_parts(session_id: &str, message_id: &str, input: &[Value]) -> Vec<Value> {
3418 input
3419 .iter()
3420 .enumerate()
3421 .map(|(index, part)| {
3422 let id = part
3423 .get("id")
3424 .and_then(Value::as_str)
3425 .map(ToOwned::to_owned)
3426 .unwrap_or_else(|| format!("part_{}_{}", message_id, index));
3427
3428 if let Some(text) = part.get("text").and_then(Value::as_str) {
3429 json!({
3430 "id": id,
3431 "sessionID": session_id,
3432 "messageID": message_id,
3433 "type": "text",
3434 "text": text,
3435 })
3436 } else {
3437 let mut cloned = part.clone();
3438 if let Some(obj) = cloned.as_object_mut() {
3439 obj.insert("id".to_string(), json!(id));
3440 obj.insert("sessionID".to_string(), json!(session_id));
3441 obj.insert("messageID".to_string(), json!(message_id));
3442 }
3443 cloned
3444 }
3445 })
3446 .collect()
3447}
3448
3449fn session_to_value(meta: &SessionMeta) -> Value {
3450 let mut value = json!({
3451 "id": meta.id,
3452 "slug": meta.slug,
3453 "projectID": meta.project_id,
3454 "directory": meta.directory,
3455 "title": meta.title,
3456 "version": meta.version,
3457 "time": {
3458 "created": meta.created_at,
3459 "updated": meta.updated_at,
3460 },
3461 "agent": meta.agent,
3463 "model": meta.model_id,
3464 "providerID": meta.provider_id,
3465 });
3466
3467 if let Some(parent_id) = &meta.parent_id {
3468 if let Some(obj) = value.as_object_mut() {
3469 obj.insert("parentID".to_string(), json!(parent_id));
3470 }
3471 }
3472
3473 if let Some(share_url) = &meta.share_url {
3474 if let Some(obj) = value.as_object_mut() {
3475 obj.insert("share".to_string(), json!({"url": share_url}));
3476 }
3477 }
3478
3479 if let Some(permission_mode) = &meta.permission_mode {
3480 if let Some(obj) = value.as_object_mut() {
3481 obj.insert("permissionMode".to_string(), json!(permission_mode));
3482 }
3483 }
3484
3485 value
3486}
3487
/// Maps a provider id to the agent name used for it.
///
/// The known providers (`amp`, `codex`, `claude`, `opencode`) map to an agent
/// of the same name; anything else falls back to `"mock"`.
fn provider_to_agent(provider_id: &str) -> String {
    let agent = match provider_id {
        "amp" | "codex" | "claude" | "opencode" => provider_id,
        _ => "mock",
    };
    agent.to_owned()
}
3497
/// A fully resolved provider/model/agent triple derived from a prompt request.
#[derive(Debug, Clone)]
struct RequestedSelection {
    /// Provider the request resolved to.
    provider_id: String,
    /// Model the request resolved to.
    model_id: String,
    /// Agent name associated with `provider_id`.
    agent: String,
}
3504
3505fn prompt_has_explicit_model_selection(body: &PromptBody) -> bool {
3506 body.model.is_some() || body.provider_id.is_some() || body.model_id.is_some()
3507}
3508
3509fn resolve_selection_from_prompt(body: &PromptBody) -> Option<RequestedSelection> {
3510 let mut provider_id = body.provider_id.clone().or_else(|| {
3511 body.model
3512 .as_ref()
3513 .and_then(|model| model.provider_id.clone())
3514 });
3515 let mut model_id = body
3516 .model_id
3517 .clone()
3518 .or_else(|| body.model.as_ref().and_then(|model| model.model_id.clone()));
3519
3520 if provider_id.is_none() {
3521 if let Some(agent) = body.agent.as_deref() {
3522 if let Some((default_provider, default_model)) = default_for_agent(agent) {
3523 provider_id = Some(default_provider.to_string());
3524 if model_id.is_none() {
3525 model_id = Some(default_model.to_string());
3526 }
3527 }
3528 }
3529 }
3530
3531 if provider_id.is_none() {
3532 if let Some(model) = model_id.as_deref() {
3533 provider_id = provider_for_model(model).map(ToOwned::to_owned);
3534 }
3535 }
3536
3537 if model_id.is_none() {
3538 if let Some(provider) = provider_id.as_deref() {
3539 model_id = default_model_for_provider(provider).map(ToOwned::to_owned);
3540 }
3541 }
3542
3543 let provider_id = provider_id?;
3544 let model_id = model_id?;
3545 Some(RequestedSelection {
3546 agent: provider_to_agent(&provider_id),
3547 provider_id,
3548 model_id,
3549 })
3550}
3551
/// Returns the default model id for a provider, or `None` when the provider
/// has no default registered here.
fn default_model_for_provider(provider_id: &str) -> Option<&'static str> {
    const DEFAULT_MODELS: [(&str, &str); 4] = [
        ("mock", "mock"),
        ("amp", "smart"),
        ("claude", "default"),
        ("codex", "gpt-5"),
    ];

    DEFAULT_MODELS
        .iter()
        .find(|(provider, _)| *provider == provider_id)
        .map(|(_, model)| *model)
}
3561
/// Infers the provider that serves `model_id`, or `None` when the id is not
/// recognised.
///
/// Arms are checked top to bottom, so exact names and the `amp-` / `claude-`
/// / `gpt-` prefixes win over the generic `vendor/model` rule.
fn provider_for_model(model_id: &str) -> Option<&'static str> {
    match model_id {
        "mock" => Some("mock"),
        "smart" | "rush" | "deep" | "free" => Some("amp"),
        _ if model_id.starts_with("amp-") => Some("amp"),
        "default" | "sonnet" | "haiku" | "opus" => Some("claude"),
        _ if model_id.starts_with("claude-") => Some("claude"),
        _ if model_id.starts_with("gpt-") => Some("codex"),
        // Any id containing a slash is routed to OpenCode. This also covers
        // ids starting with `opencode/`, so no separate arm is needed.
        _ if model_id.contains('/') => Some("opencode"),
        _ => None,
    }
}
3575
/// Returns the default `(provider, model)` pair for an agent name, or `None`
/// when the agent has no defaults registered here.
fn default_for_agent(agent: &str) -> Option<(&'static str, &'static str)> {
    let defaults = match agent {
        "mock" => ("mock", "mock"),
        "amp" => ("amp", "smart"),
        "claude" => ("claude", "default"),
        "codex" => ("codex", "gpt-5"),
        _ => return None,
    };
    Some(defaults)
}
3585
3586fn build_replay_text(events: &[Value], max_chars: usize) -> Option<String> {
3587 if events.is_empty() {
3588 return None;
3589 }
3590
3591 let prefix = "Previous session history is replayed below as JSON-RPC envelopes. Use it as context before responding to the latest user prompt.\n";
3592 let mut text = prefix.to_string();
3593
3594 for event in events {
3595 let line = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string());
3596 if text.len() + line.len() + 1 > max_chars {
3597 text.push_str("\n[history truncated]");
3598 break;
3599 }
3600 text.push_str(&line);
3601 text.push('\n');
3602 }
3603
3604 Some(text)
3605}
3606
3607fn parse_last_event_id(headers: &HeaderMap) -> Option<u64> {
3608 headers
3609 .get("last-event-id")
3610 .and_then(|value| value.to_str().ok())
3611 .and_then(|value| value.trim().parse::<u64>().ok())
3612}
3613
3614fn resolve_directory(headers: &HeaderMap, query_directory: Option<&String>) -> String {
3615 if let Some(value) = query_directory {
3616 return value.clone();
3617 }
3618
3619 if let Ok(value) = std::env::var("OPENCODE_COMPAT_DIRECTORY") {
3620 if !value.trim().is_empty() {
3621 return value;
3622 }
3623 }
3624
3625 if let Some(value) = headers
3626 .get("x-opencode-directory")
3627 .and_then(|v| v.to_str().ok())
3628 {
3629 if !value.trim().is_empty() {
3630 return value.to_string();
3631 }
3632 }
3633
3634 std::env::current_dir()
3635 .ok()
3636 .and_then(|path| path.to_str().map(ToOwned::to_owned))
3637 .unwrap_or_else(|| "/".to_string())
3638}
3639
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch. The
/// millisecond count is clamped to `i64::MAX` before narrowing, so the cast
/// can never wrap into a negative value.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis().min(i64::MAX as u128) as i64)
        .unwrap_or(0)
}
3646
/// Produces a seed value from the current time and the process id.
///
/// The nanoseconds since the Unix epoch (low 64 bits; `0` if the clock is
/// before the epoch) are XOR-ed with the process id shifted into the upper
/// 32 bits.
fn runtime_unique_seed() -> u64 {
    let pid_bits = u64::from(std::process::id()) << 32;
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_nanos` is a u128; only its low 64 bits are kept.
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    };
    nanos ^ pid_bits
}
3654
3655async fn acp_sse_translation_task(
3661 state: Arc<AdapterState>,
3662 mut stream: AcpPayloadStream,
3663 session_id: String,
3664 directory: String,
3665 agent: String,
3666 provider_id: String,
3667 model_id: String,
3668) {
3669 tracing::info!(session_id = %session_id, agent = %agent, "ACP SSE translation task started");
3670
3671 let mut assistant_message_id: Option<String> = None;
3673 let mut part_counter: u64 = 0;
3674 let mut text_accum = String::new();
3676 let mut text_part_id: Option<String> = None;
3677
3678 while let Some(payload) = stream.next().await {
3679 let method = payload.get("method").and_then(Value::as_str);
3681 let has_result = payload.get("result").is_some();
3682 let has_error = payload.get("error").is_some();
3683 let jsonrpc_id = payload.get("id").cloned();
3684
3685 tracing::debug!(
3686 session_id = %session_id,
3687 method = ?method,
3688 has_result,
3689 has_error,
3690 "ACP SSE event received"
3691 );
3692
3693 match method {
3694 Some("session/update") => {
3696 if assistant_message_id.is_none() {
3701 let user_id = state
3704 .last_user_message_id
3705 .lock()
3706 .await
3707 .get(&*session_id)
3708 .cloned()
3709 .unwrap_or_else(|| state.next_id("msg_"));
3710 assistant_message_id = Some(format!("{user_id}_assistant"));
3711 }
3712 let msg_id = assistant_message_id.as_deref().unwrap();
3713 let params = payload.get("params").cloned().unwrap_or(json!({}));
3714 translate_session_update(
3715 &state,
3716 &session_id,
3717 msg_id,
3718 &mut part_counter,
3719 &mut text_accum,
3720 &mut text_part_id,
3721 &directory,
3722 &agent,
3723 &provider_id,
3724 &model_id,
3725 ¶ms,
3726 )
3727 .await;
3728 }
3729
3730 Some("session/request_permission") => {
3732 let request_id = state.next_id("perm_");
3733 let params = payload.get("params").cloned().unwrap_or(json!({}));
3734 let permission_request = json!({
3735 "id": request_id,
3736 "sessionID": session_id,
3737 "permission": params.get("permission").and_then(Value::as_str).unwrap_or("execute"),
3738 "patterns": params.get("patterns").cloned().unwrap_or(json!(["*"])),
3739 "metadata": params.get("metadata").cloned().unwrap_or(json!({})),
3740 "always": [],
3741 });
3742
3743 if let Some(jrpc_id) = jsonrpc_id {
3745 state.acp_request_ids.lock().await.insert(
3746 request_id.clone(),
3747 AcpPendingRequest {
3748 opencode_session_id: session_id.clone(),
3749 jsonrpc_id: jrpc_id,
3750 kind: AcpPendingKind::Permission,
3751 },
3752 );
3753 }
3754
3755 let asked = json!({
3756 "jsonrpc":"2.0",
3757 "method":"_sandboxagent/opencode/permission_asked",
3758 "params":{"request": permission_request}
3759 });
3760 if let Err(err) = state.persist_event(&session_id, "agent", &asked).await {
3761 warn!(?err, "failed to persist permission_asked event");
3762 }
3763 state
3764 .emit_event(json!({"type":"permission.asked","properties":permission_request}));
3765 }
3766
3767 Some("_sandboxagent/session/request_question") => {
3769 let request_id = state.next_id("q_");
3770 let params = payload.get("params").cloned().unwrap_or(json!({}));
3771 let question_request = json!({
3772 "id": request_id,
3773 "sessionID": session_id,
3774 "questions": params.get("questions").cloned().unwrap_or(json!([])),
3775 });
3776
3777 if let Some(jrpc_id) = jsonrpc_id {
3778 state.acp_request_ids.lock().await.insert(
3779 request_id.clone(),
3780 AcpPendingRequest {
3781 opencode_session_id: session_id.clone(),
3782 jsonrpc_id: jrpc_id,
3783 kind: AcpPendingKind::Question,
3784 },
3785 );
3786 }
3787
3788 let asked = json!({
3789 "jsonrpc":"2.0",
3790 "method":"_sandboxagent/opencode/question_asked",
3791 "params":{"request": question_request}
3792 });
3793 if let Err(err) = state.persist_event(&session_id, "agent", &asked).await {
3794 warn!(?err, "failed to persist question_asked event");
3795 }
3796 state.emit_event(json!({"type":"question.asked","properties":question_request}));
3797 }
3798
3799 Some("_sandboxagent/session/ended") => {
3801 let params = payload.get("params").cloned().unwrap_or(json!({}));
3802 let reason = params
3803 .get("reason")
3804 .and_then(Value::as_str)
3805 .unwrap_or("unknown");
3806 let error_message = params
3807 .get("message")
3808 .and_then(Value::as_str)
3809 .unwrap_or(reason);
3810
3811 state.emit_event(json!({
3812 "type":"session.error",
3813 "properties":{
3814 "sessionID": session_id,
3815 "error": {"name":"AgentError","data":{"message": error_message}}
3816 }
3817 }));
3818 let _ = set_session_status(&state, &session_id, "idle").await;
3819 break;
3820 }
3821
3822 None if (has_result || has_error) && assistant_message_id.is_some() => {
3828 if has_error {
3830 let error_msg = payload
3831 .pointer("/error/message")
3832 .and_then(Value::as_str)
3833 .unwrap_or("agent error");
3834 state.emit_event(json!({
3835 "type":"session.error",
3836 "properties":{
3837 "sessionID": session_id,
3838 "error": {"name":"AgentError","data":{"message": error_msg}}
3839 }
3840 }));
3841 }
3842
3843 if let Some(tid) = text_part_id.take() {
3845 let msg_id = assistant_message_id.as_deref().unwrap_or("");
3846 let part = json!({
3847 "id": tid,
3848 "sessionID": session_id,
3849 "messageID": msg_id,
3850 "type": "text",
3851 "text": text_accum,
3852 });
3853 let env = json!({
3854 "jsonrpc":"2.0",
3855 "method":"_sandboxagent/opencode/message",
3856 "params":{"message":{"info":{"id": msg_id},"parts":[part]}}
3857 });
3858 if let Err(err) = state.persist_event(&session_id, "agent", &env).await {
3859 warn!(?err, "failed to persist ACP text part at turn end");
3860 }
3861 text_accum.clear();
3862 }
3863
3864 if let Some(msg_id) = assistant_message_id.as_ref() {
3866 let parent_id = state
3867 .last_user_message_id
3868 .lock()
3869 .await
3870 .get(&*session_id)
3871 .cloned()
3872 .unwrap_or_default();
3873 let now = now_ms();
3874 let info = build_completed_assistant_message(
3875 &session_id,
3876 msg_id,
3877 &parent_id,
3878 now,
3879 &directory,
3880 &agent,
3881 &provider_id,
3882 &model_id,
3883 );
3884 state.emit_event(message_event("message.updated", &info));
3885 }
3886
3887 let _ = set_session_status(&state, &session_id, "idle").await;
3888
3889 assistant_message_id = None;
3891 part_counter = 0;
3892 }
3893
3894 _ => {
3895 tracing::info!(
3896 session_id = %session_id,
3897 method = ?method,
3898 "ACP SSE: unhandled event"
3899 );
3900 }
3901 }
3902 }
3903}
3904
3905async fn translate_session_update(
3913 state: &Arc<AdapterState>,
3914 session_id: &str,
3915 message_id: &str,
3916 part_counter: &mut u64,
3917 text_accum: &mut String,
3918 text_part_id: &mut Option<String>,
3919 directory: &str,
3920 agent: &str,
3921 provider_id: &str,
3922 model_id: &str,
3923 params: &Value,
3924) {
3925 let update = params.get("update").unwrap_or(params);
3927 let kind = update
3928 .get("sessionUpdate")
3929 .and_then(Value::as_str)
3930 .unwrap_or("");
3931
3932 if *part_counter == 0
3934 && matches!(
3935 kind,
3936 "agent_message_chunk" | "agent_thought_chunk" | "tool_call"
3937 )
3938 {
3939 let parent_id = state
3940 .last_user_message_id
3941 .lock()
3942 .await
3943 .get(session_id)
3944 .cloned()
3945 .unwrap_or_default();
3946 let now = now_ms();
3947 let info = build_assistant_message(
3948 session_id,
3949 message_id,
3950 &parent_id,
3951 now,
3952 directory,
3953 agent,
3954 provider_id,
3955 model_id,
3956 );
3957 state.emit_event(message_event("message.updated", &info));
3958 let env = json!({
3961 "jsonrpc":"2.0",
3962 "method":"_sandboxagent/opencode/message",
3963 "params":{"message":{"info": info, "parts":[]}}
3964 });
3965 if let Err(err) = state.persist_event(session_id, "agent", &env).await {
3966 warn!(?err, "failed to persist assistant message info");
3967 }
3968 }
3969
3970 match kind {
3971 "agent_message_chunk" | "agent_thought_chunk" => {
3973 let chunk = update
3975 .pointer("/content/text")
3976 .and_then(Value::as_str)
3977 .unwrap_or("");
3978 if chunk.is_empty() {
3979 return;
3980 }
3981
3982 text_accum.push_str(chunk);
3985 let part_id = text_part_id.get_or_insert_with(|| {
3986 let id = format!("part_{message_id}_{part_counter}");
3987 *part_counter += 1;
3988 id
3989 });
3990 let part = json!({
3991 "id": *part_id,
3992 "sessionID": session_id,
3993 "messageID": message_id,
3994 "type": "text",
3995 "text": *text_accum,
3996 });
3997 state.emit_event(json!({
3998 "type":"message.part.updated",
3999 "properties":{
4000 "sessionID": session_id,
4001 "messageID": message_id,
4002 "part": part,
4003 "delta": chunk
4004 }
4005 }));
4006 }
4007
4008 "tool_call" => {
4010 if let Some(tid) = text_part_id.take() {
4012 let part = json!({
4013 "id": tid,
4014 "sessionID": session_id,
4015 "messageID": message_id,
4016 "type": "text",
4017 "text": *text_accum,
4018 });
4019 let env = json!({
4020 "jsonrpc":"2.0",
4021 "method":"_sandboxagent/opencode/message",
4022 "params":{"message":{"info":{"id": message_id},"parts":[part]}}
4023 });
4024 if let Err(err) = state.persist_event(session_id, "agent", &env).await {
4025 warn!(?err, "failed to persist ACP text part");
4026 }
4027 text_accum.clear();
4028 }
4029 let call_id = update
4030 .get("toolCallId")
4031 .and_then(Value::as_str)
4032 .unwrap_or("unknown");
4033 let tool_title = update
4034 .get("title")
4035 .and_then(Value::as_str)
4036 .unwrap_or("unknown");
4037 let part_id = format!("part_{message_id}_{part_counter}");
4038 *part_counter += 1;
4039 let now = now_ms();
4040 let part = json!({
4041 "id": part_id,
4042 "sessionID": session_id,
4043 "messageID": message_id,
4044 "type": "tool",
4045 "callID": call_id,
4046 "tool": tool_title,
4047 "state": {
4048 "status": "running",
4049 "input": update.get("rawInput").cloned().unwrap_or(json!({})),
4050 "title": tool_title,
4051 "metadata": {},
4052 "time": {"start": now}
4053 }
4054 });
4055 let env = json!({
4056 "jsonrpc":"2.0",
4057 "method":"_sandboxagent/opencode/message",
4058 "params":{"message":{"info":{"id": message_id},"parts":[part.clone()]}}
4059 });
4060 if let Err(err) = state.persist_event(session_id, "agent", &env).await {
4061 warn!(?err, "failed to persist ACP tool call event");
4062 }
4063 state.emit_event(json!({
4064 "type":"message.part.updated",
4065 "properties":{
4066 "sessionID": session_id,
4067 "messageID": message_id,
4068 "part": part
4069 }
4070 }));
4071 }
4072
4073 "tool_call_update" => {
4075 let call_id = update
4076 .get("toolCallId")
4077 .and_then(Value::as_str)
4078 .unwrap_or("unknown");
4079 let status = update
4080 .get("status")
4081 .and_then(Value::as_str)
4082 .unwrap_or("completed");
4083 let output = update
4084 .get("content")
4085 .and_then(|v| v.as_array())
4086 .and_then(|arr| {
4087 arr.iter()
4088 .filter_map(|c| c.get("text").and_then(Value::as_str))
4089 .next()
4090 })
4091 .unwrap_or("");
4092 let now = now_ms();
4093 let part = json!({
4094 "id": format!("part_tc_{call_id}"),
4095 "sessionID": session_id,
4096 "messageID": message_id,
4097 "type": "tool",
4098 "callID": call_id,
4099 "state": {
4100 "status": status,
4101 "output": output,
4102 "time": {"end": now}
4103 }
4104 });
4105 state.emit_event(json!({
4106 "type":"message.part.updated",
4107 "properties":{
4108 "sessionID": session_id,
4109 "messageID": message_id,
4110 "part": part
4111 }
4112 }));
4113 }
4114
4115 _ => {
4116 tracing::debug!(
4117 session_id = %session_id,
4118 kind = %kind,
4119 "translate_session_update: unhandled sessionUpdate kind"
4120 );
4121 }
4122 }
4123}
4124
/// Normalizes a configured proxy base URL.
///
/// Surrounding whitespace and trailing slashes are removed. The result is
/// returned only if it still begins with `http://` or `https://`; blank
/// input and any other scheme yield `None`.
fn normalize_proxy_base_url(value: String) -> Option<String> {
    const SCHEMES: [&str; 2] = ["http://", "https://"];

    // A blank value trims to "", which matches no scheme and so yields `None`.
    let candidate = value.trim().trim_end_matches('/');
    SCHEMES
        .iter()
        .any(|scheme| candidate.starts_with(*scheme))
        .then(|| candidate.to_owned())
}
4137
4138async fn resolve_proxy_base_url(state: &Arc<AdapterState>, path: &str) -> Option<String> {
4139 if let Some(base_url) = state.config.native_proxy_base_url.as_ref() {
4140 return Some(base_url.clone());
4141 }
4142
4143 let manager = state.config.native_proxy_manager.as_ref()?;
4144 match manager.ensure_server().await {
4145 Ok(base_url) => Some(base_url),
4146 Err(err) => {
4147 warn!(path, error = ?err, "failed to lazily start native OpenCode sidecar");
4148 None
4149 }
4150 }
4151}
4152
4153async fn proxy_native_opencode(
4154 state: &Arc<AdapterState>,
4155 method: reqwest::Method,
4156 path: &str,
4157 headers: &HeaderMap,
4158 body: Option<Value>,
4159) -> Option<Response> {
4160 let base_url = resolve_proxy_base_url(state, path).await?;
4161
4162 let mut request = state
4163 .proxy_http_client
4164 .request(method, format!("{base_url}{path}"));
4165
4166 for header_name in [
4167 header::AUTHORIZATION,
4168 header::ACCEPT,
4169 HeaderName::from_static("x-opencode-directory"),
4170 ] {
4171 if let Some(value) = headers.get(&header_name) {
4172 request = request.header(header_name.as_str(), value.as_bytes());
4173 }
4174 }
4175
4176 if let Some(body) = body {
4177 request = request.json(&body);
4178 }
4179
4180 let response = match request.send().await {
4181 Ok(response) => response,
4182 Err(err) => {
4183 warn!(path, error = ?err, "failed proxy request to native OpenCode; falling back to adapter response");
4184 return None;
4187 }
4188 };
4189
4190 let status =
4191 StatusCode::from_u16(response.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
4192 let content_type = response
4193 .headers()
4194 .get(reqwest::header::CONTENT_TYPE)
4195 .and_then(|value| value.to_str().ok())
4196 .map(|value| value.to_string());
4197 let body_bytes = match response.bytes().await {
4198 Ok(bytes) => bytes,
4199 Err(err) => {
4200 warn!(path, error = ?err, "failed to read proxied response body");
4201 return Some(
4202 (
4203 StatusCode::BAD_GATEWAY,
4204 Json(json!({
4205 "data": {},
4206 "errors": [{"message": format!("failed to read proxied response: {err}")}],
4207 "success": false,
4208 })),
4209 )
4210 .into_response(),
4211 );
4212 }
4213 };
4214
4215 let mut proxied = Response::new(Body::from(body_bytes));
4216 *proxied.status_mut() = status;
4217 if let Some(content_type) = content_type {
4218 if let Ok(header_value) = HeaderValue::from_str(&content_type) {
4219 proxied
4220 .headers_mut()
4221 .insert(header::CONTENT_TYPE, header_value);
4222 }
4223 }
4224
4225 Some(proxied)
4226}
4227
4228async fn proxy_native_opencode_json(
4229 state: &Arc<AdapterState>,
4230 method: reqwest::Method,
4231 path: &str,
4232 headers: &HeaderMap,
4233 body: Option<Value>,
4234) -> Option<Result<(StatusCode, Value), Response>> {
4235 let base_url = resolve_proxy_base_url(state, path).await?;
4236
4237 let mut request = state
4238 .proxy_http_client
4239 .request(method, format!("{base_url}{path}"));
4240
4241 for header_name in [
4242 header::AUTHORIZATION,
4243 header::ACCEPT,
4244 HeaderName::from_static("x-opencode-directory"),
4245 ] {
4246 if let Some(value) = headers.get(&header_name) {
4247 request = request.header(header_name.as_str(), value.as_bytes());
4248 }
4249 }
4250
4251 if let Some(body) = body {
4252 request = request.json(&body);
4253 }
4254
4255 let response = match request.send().await {
4256 Ok(response) => response,
4257 Err(err) => {
4258 warn!(path, error = ?err, "failed proxy request to native OpenCode");
4259 return Some(Err((
4260 StatusCode::BAD_GATEWAY,
4261 Json(json!({
4262 "data": {},
4263 "errors": [{"message": format!("failed to proxy to native OpenCode: {err}")}],
4264 "success": false,
4265 })),
4266 )
4267 .into_response()));
4268 }
4269 };
4270
4271 let status =
4272 StatusCode::from_u16(response.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
4273 let content_type = response
4274 .headers()
4275 .get(reqwest::header::CONTENT_TYPE)
4276 .and_then(|value| value.to_str().ok())
4277 .map(|value| value.to_string());
4278 let body_bytes = match response.bytes().await {
4279 Ok(bytes) => bytes,
4280 Err(err) => {
4281 warn!(path, error = ?err, "failed to read proxied response body");
4282 return Some(Err((
4283 StatusCode::BAD_GATEWAY,
4284 Json(json!({
4285 "data": {},
4286 "errors": [{"message": format!("failed to read proxied response: {err}")}],
4287 "success": false,
4288 })),
4289 )
4290 .into_response()));
4291 }
4292 };
4293
4294 if !status.is_success() {
4295 let mut proxied = Response::new(Body::from(body_bytes));
4296 *proxied.status_mut() = status;
4297 if let Some(content_type) = content_type {
4298 if let Ok(header_value) = HeaderValue::from_str(&content_type) {
4299 proxied
4300 .headers_mut()
4301 .insert(header::CONTENT_TYPE, header_value);
4302 }
4303 }
4304 return Some(Err(proxied));
4305 }
4306
4307 if body_bytes.is_empty() {
4308 warn!(
4309 path,
4310 "native OpenCode prompt proxy returned an empty success body; falling back to local compat"
4311 );
4312 return None;
4313 }
4314
4315 let payload = match serde_json::from_slice::<Value>(&body_bytes) {
4316 Ok(payload) => payload,
4317 Err(err) => {
4318 warn!(path, error = ?err, "failed to parse proxied JSON response");
4319 return Some(Err(
4320 (
4321 StatusCode::BAD_GATEWAY,
4322 Json(json!({
4323 "data": {},
4324 "errors": [{"message": format!("failed to parse proxied response as JSON: {err}")}],
4325 "success": false,
4326 })),
4327 )
4328 .into_response(),
4329 ));
4330 }
4331 };
4332
4333 Some(Ok((status, payload)))
4334}
4335
4336fn bool_ok(value: bool) -> (StatusCode, Json<Value>) {
4337 (StatusCode::OK, Json(json!(value)))
4338}
4339
4340fn bad_request(message: &str) -> Response {
4341 (
4342 StatusCode::BAD_REQUEST,
4343 Json(json!({"errors":[{"message": message}]})),
4344 )
4345 .into_response()
4346}
4347
4348fn not_found(message: &str) -> Response {
4349 (
4350 StatusCode::NOT_FOUND,
4351 Json(json!({"errors":[{"message": message}]})),
4352 )
4353 .into_response()
4354}
4355
4356fn internal_error(message: String) -> Response {
4357 warn!(?message, "opencode adapter internal error");
4358 (
4359 StatusCode::INTERNAL_SERVER_ERROR,
4360 Json(json!({"errors":[{"message": message}]})),
4361 )
4362 .into_response()
4363}