codex_runtime/adapters/web/
adapter.rs1use std::future::Future;
2use std::pin::Pin;
3
4use crate::plugin::PluginContractVersion;
5use crate::runtime::api::ThreadStartParams;
6use crate::runtime::approvals::ServerRequest;
7use crate::runtime::core::Runtime;
8use crate::runtime::errors::RuntimeError;
9use crate::runtime::events::Envelope;
10use crate::runtime::rpc_contract::methods as rpc_methods;
11use serde_json::Value;
12use tokio::sync::{broadcast, mpsc};
13
14use super::WebError;
15
/// Heap-allocated, pinned future that is `Send` and may borrow data for `'a`.
///
/// This is the return type of the future-returning methods on [`WebPluginAdapter`];
/// `T` is the value the future resolves to.
pub type WebAdapterFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
17
18#[derive(Debug)]
19pub struct WebRuntimeStreams {
20 pub request_rx: mpsc::Receiver<ServerRequest>,
21 pub live_rx: broadcast::Receiver<Envelope>,
22}
23
24pub trait WebPluginAdapter: Send + Sync {
25 fn plugin_contract_version(&self) -> PluginContractVersion {
26 PluginContractVersion::CURRENT
27 }
28
29 fn take_streams<'a>(&'a self) -> WebAdapterFuture<'a, Result<WebRuntimeStreams, WebError>>;
30 fn thread_start<'a>(
31 &'a self,
32 params: ThreadStartParams,
33 ) -> WebAdapterFuture<'a, Result<String, WebError>>;
34 fn thread_resume<'a>(
35 &'a self,
36 thread_id: &'a str,
37 params: ThreadStartParams,
38 ) -> WebAdapterFuture<'a, Result<String, WebError>>;
39 fn turn_start<'a>(
40 &'a self,
41 turn_params: Value,
42 ) -> WebAdapterFuture<'a, Result<Value, WebError>>;
43 fn thread_archive<'a>(
44 &'a self,
45 thread_id: &'a str,
46 ) -> WebAdapterFuture<'a, Result<(), WebError>>;
47 fn respond_approval_ok<'a>(
48 &'a self,
49 approval_id: &'a str,
50 result: Value,
51 ) -> WebAdapterFuture<'a, Result<(), WebError>>;
52 fn pending_approval_ids(&self) -> Vec<String>;
53}
54
55#[derive(Clone)]
56pub struct RuntimeWebAdapter {
57 runtime: Runtime,
58}
59
60impl RuntimeWebAdapter {
61 pub fn new(runtime: Runtime) -> Self {
64 Self { runtime }
65 }
66}
67
68impl WebPluginAdapter for RuntimeWebAdapter {
69 fn take_streams<'a>(&'a self) -> WebAdapterFuture<'a, Result<WebRuntimeStreams, WebError>> {
70 Box::pin(async move {
71 let request_rx = self
72 .runtime
73 .take_server_request_rx()
74 .await
75 .map_err(map_take_stream_error)?;
76 let live_rx = self.runtime.subscribe_live();
77 Ok(WebRuntimeStreams {
78 request_rx,
79 live_rx,
80 })
81 })
82 }
83
84 fn thread_start<'a>(
85 &'a self,
86 params: ThreadStartParams,
87 ) -> WebAdapterFuture<'a, Result<String, WebError>> {
88 Box::pin(async move {
89 let thread = self
90 .runtime
91 .thread_start(params)
92 .await
93 .map_err(map_rpc_error)?;
94 Ok(thread.thread_id)
95 })
96 }
97
98 fn thread_resume<'a>(
99 &'a self,
100 thread_id: &'a str,
101 params: ThreadStartParams,
102 ) -> WebAdapterFuture<'a, Result<String, WebError>> {
103 Box::pin(async move {
104 let thread = self
105 .runtime
106 .thread_resume(thread_id, params)
107 .await
108 .map_err(map_rpc_error)?;
109 Ok(thread.thread_id)
110 })
111 }
112
113 fn turn_start<'a>(
114 &'a self,
115 turn_params: Value,
116 ) -> WebAdapterFuture<'a, Result<Value, WebError>> {
117 Box::pin(async move {
118 self.runtime
119 .call_raw(rpc_methods::TURN_START, turn_params)
120 .await
121 .map_err(map_rpc_error)
122 })
123 }
124
125 fn thread_archive<'a>(
126 &'a self,
127 thread_id: &'a str,
128 ) -> WebAdapterFuture<'a, Result<(), WebError>> {
129 Box::pin(async move {
130 self.runtime
131 .thread_archive(thread_id)
132 .await
133 .map_err(map_rpc_error)
134 })
135 }
136
137 fn respond_approval_ok<'a>(
138 &'a self,
139 approval_id: &'a str,
140 result: Value,
141 ) -> WebAdapterFuture<'a, Result<(), WebError>> {
142 Box::pin(async move {
143 self.runtime
144 .respond_approval_ok(approval_id, result)
145 .await
146 .map_err(map_runtime_error)
147 })
148 }
149
150 fn pending_approval_ids(&self) -> Vec<String> {
151 self.runtime
152 .state_snapshot()
153 .pending_server_requests
154 .values()
155 .map(|req| req.approval_id.clone())
156 .collect()
157 }
158}
159
160fn map_take_stream_error(err: RuntimeError) -> WebError {
161 match err {
162 RuntimeError::ServerRequestReceiverTaken => WebError::AlreadyBound,
163 other => map_runtime_error(other),
164 }
165}
166
167fn map_web_error(kind: &str, err: impl std::fmt::Display) -> WebError {
168 WebError::Internal(format!("{kind} error: {err}"))
169}
170
171fn map_runtime_error(err: impl std::fmt::Display) -> WebError {
172 map_web_error("runtime", err)
173}
174
175fn map_rpc_error(err: impl std::fmt::Display) -> WebError {
176 map_web_error("rpc", err)
177}