Skip to main content

codex_runtime/adapters/web/
adapter.rs

1use std::future::Future;
2use std::pin::Pin;
3
4use crate::plugin::PluginContractVersion;
5use crate::runtime::api::ThreadStartParams;
6use crate::runtime::approvals::ServerRequest;
7use crate::runtime::core::Runtime;
8use crate::runtime::errors::RuntimeError;
9use crate::runtime::events::Envelope;
10use crate::runtime::rpc_contract::methods as rpc_methods;
11use serde_json::Value;
12use tokio::sync::{broadcast, mpsc};
13
14use super::WebError;
15
16pub type WebAdapterFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
17
18#[derive(Debug)]
19pub struct WebRuntimeStreams {
20    pub request_rx: mpsc::Receiver<ServerRequest>,
21    pub live_rx: broadcast::Receiver<Envelope>,
22}
23
24pub trait WebPluginAdapter: Send + Sync {
25    fn plugin_contract_version(&self) -> PluginContractVersion {
26        PluginContractVersion::CURRENT
27    }
28
29    fn take_streams<'a>(&'a self) -> WebAdapterFuture<'a, Result<WebRuntimeStreams, WebError>>;
30    fn thread_start<'a>(
31        &'a self,
32        params: ThreadStartParams,
33    ) -> WebAdapterFuture<'a, Result<String, WebError>>;
34    fn thread_resume<'a>(
35        &'a self,
36        thread_id: &'a str,
37        params: ThreadStartParams,
38    ) -> WebAdapterFuture<'a, Result<String, WebError>>;
39    fn turn_start<'a>(
40        &'a self,
41        turn_params: Value,
42    ) -> WebAdapterFuture<'a, Result<Value, WebError>>;
43    fn thread_archive<'a>(
44        &'a self,
45        thread_id: &'a str,
46    ) -> WebAdapterFuture<'a, Result<(), WebError>>;
47    fn respond_approval_ok<'a>(
48        &'a self,
49        approval_id: &'a str,
50        result: Value,
51    ) -> WebAdapterFuture<'a, Result<(), WebError>>;
52    fn pending_approval_ids(&self) -> Vec<String>;
53}
54
55#[derive(Clone)]
56pub struct RuntimeWebAdapter {
57    runtime: Runtime,
58}
59
60impl RuntimeWebAdapter {
61    /// Create runtime-backed web adapter.
62    /// Allocation: none. Complexity: O(1).
63    pub fn new(runtime: Runtime) -> Self {
64        Self { runtime }
65    }
66}
67
68impl WebPluginAdapter for RuntimeWebAdapter {
69    fn take_streams<'a>(&'a self) -> WebAdapterFuture<'a, Result<WebRuntimeStreams, WebError>> {
70        Box::pin(async move {
71            let request_rx = self
72                .runtime
73                .take_server_request_rx()
74                .await
75                .map_err(map_take_stream_error)?;
76            let live_rx = self.runtime.subscribe_live();
77            Ok(WebRuntimeStreams {
78                request_rx,
79                live_rx,
80            })
81        })
82    }
83
84    fn thread_start<'a>(
85        &'a self,
86        params: ThreadStartParams,
87    ) -> WebAdapterFuture<'a, Result<String, WebError>> {
88        Box::pin(async move {
89            let thread = self
90                .runtime
91                .thread_start(params)
92                .await
93                .map_err(map_rpc_error)?;
94            Ok(thread.thread_id)
95        })
96    }
97
98    fn thread_resume<'a>(
99        &'a self,
100        thread_id: &'a str,
101        params: ThreadStartParams,
102    ) -> WebAdapterFuture<'a, Result<String, WebError>> {
103        Box::pin(async move {
104            let thread = self
105                .runtime
106                .thread_resume(thread_id, params)
107                .await
108                .map_err(map_rpc_error)?;
109            Ok(thread.thread_id)
110        })
111    }
112
113    fn turn_start<'a>(
114        &'a self,
115        turn_params: Value,
116    ) -> WebAdapterFuture<'a, Result<Value, WebError>> {
117        Box::pin(async move {
118            self.runtime
119                .call_raw(rpc_methods::TURN_START, turn_params)
120                .await
121                .map_err(map_rpc_error)
122        })
123    }
124
125    fn thread_archive<'a>(
126        &'a self,
127        thread_id: &'a str,
128    ) -> WebAdapterFuture<'a, Result<(), WebError>> {
129        Box::pin(async move {
130            self.runtime
131                .thread_archive(thread_id)
132                .await
133                .map_err(map_rpc_error)
134        })
135    }
136
137    fn respond_approval_ok<'a>(
138        &'a self,
139        approval_id: &'a str,
140        result: Value,
141    ) -> WebAdapterFuture<'a, Result<(), WebError>> {
142        Box::pin(async move {
143            self.runtime
144                .respond_approval_ok(approval_id, result)
145                .await
146                .map_err(map_runtime_error)
147        })
148    }
149
150    fn pending_approval_ids(&self) -> Vec<String> {
151        self.runtime
152            .state_snapshot()
153            .pending_server_requests
154            .values()
155            .map(|req| req.approval_id.clone())
156            .collect()
157    }
158}
159
160fn map_take_stream_error(err: RuntimeError) -> WebError {
161    match err {
162        RuntimeError::ServerRequestReceiverTaken => WebError::AlreadyBound,
163        other => map_runtime_error(other),
164    }
165}
166
167fn map_web_error(kind: &str, err: impl std::fmt::Display) -> WebError {
168    WebError::Internal(format!("{kind} error: {err}"))
169}
170
171fn map_runtime_error(err: impl std::fmt::Display) -> WebError {
172    map_web_error("runtime", err)
173}
174
175fn map_rpc_error(err: impl std::fmt::Display) -> WebError {
176    map_web_error("rpc", err)
177}