codex_runtime/adapters/web/
mod.rs1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3
4use crate::runtime::approvals::ServerRequest;
5use crate::runtime::core::Runtime;
6use crate::runtime::events::Envelope;
7use tokio::sync::{broadcast, RwLock};
8
9mod adapter;
10mod handlers;
11mod service;
12mod state;
13mod wire;
14
15#[cfg(test)]
16pub(crate) use adapter::WebAdapterFuture;
17pub use adapter::{RuntimeWebAdapter, WebPluginAdapter, WebRuntimeStreams};
18
19mod types;
20
21pub use types::{
22 ApprovalResponsePayload, CloseSessionResponse, CreateSessionRequest, CreateSessionResponse,
23 CreateTurnRequest, CreateTurnResponse, WebAdapterConfig, WebError,
24};
25
26#[derive(Clone)]
27pub struct WebAdapter {
28 adapter: Arc<dyn WebPluginAdapter>,
29 config: WebAdapterConfig,
30 state: Arc<RwLock<state::WebState>>,
31 background_tasks: Arc<BackgroundTasks>,
32}
33
34#[derive(Debug)]
35struct BackgroundTasks {
36 aborted: AtomicBool,
37 handles: Vec<tokio::task::AbortHandle>,
38}
39
40impl BackgroundTasks {
41 fn new(handles: Vec<tokio::task::AbortHandle>) -> Self {
42 Self {
43 aborted: AtomicBool::new(false),
44 handles,
45 }
46 }
47
48 fn abort_all(&self) {
49 if self.aborted.swap(true, Ordering::AcqRel) {
50 return;
51 }
52 for handle in &self.handles {
53 handle.abort();
54 }
55 }
56}
57
58impl WebAdapter {
59 pub async fn spawn(runtime: Runtime, config: WebAdapterConfig) -> Result<Self, WebError> {
60 let adapter: Arc<dyn WebPluginAdapter> = Arc::new(RuntimeWebAdapter::new(runtime));
61 Self::spawn_with_adapter(adapter, config).await
62 }
63
64 pub async fn spawn_with_adapter(
65 adapter: Arc<dyn WebPluginAdapter>,
66 config: WebAdapterConfig,
67 ) -> Result<Self, WebError> {
68 let streams = service::prepare_spawn(&adapter, &config).await?;
69 let state = Arc::new(RwLock::new(state::WebState::default()));
70 let handles =
71 service::spawn_routing_tasks(Arc::clone(&adapter), Arc::clone(&state), streams);
72 let background_tasks = Arc::new(BackgroundTasks::new(handles));
73
74 Ok(Self {
75 adapter,
76 config,
77 state,
78 background_tasks,
79 })
80 }
81
82 pub async fn create_session(
83 &self,
84 tenant_id: &str,
85 request: CreateSessionRequest,
86 ) -> Result<CreateSessionResponse, WebError> {
87 handlers::create_session(&self.adapter, &self.state, self.config, tenant_id, request).await
88 }
89
90 pub async fn create_turn(
91 &self,
92 tenant_id: &str,
93 session_id: &str,
94 request: CreateTurnRequest,
95 ) -> Result<CreateTurnResponse, WebError> {
96 handlers::create_turn(&self.adapter, &self.state, tenant_id, session_id, request).await
97 }
98
99 pub async fn close_session(
100 &self,
101 tenant_id: &str,
102 session_id: &str,
103 ) -> Result<CloseSessionResponse, WebError> {
104 handlers::close_session(&self.adapter, &self.state, tenant_id, session_id).await
105 }
106
107 pub async fn subscribe_session_events(
108 &self,
109 tenant_id: &str,
110 session_id: &str,
111 ) -> Result<broadcast::Receiver<Envelope>, WebError> {
112 handlers::subscribe_session_events(&self.state, tenant_id, session_id).await
113 }
114
115 pub async fn subscribe_session_approvals(
116 &self,
117 tenant_id: &str,
118 session_id: &str,
119 ) -> Result<broadcast::Receiver<ServerRequest>, WebError> {
120 handlers::subscribe_session_approvals(&self.state, tenant_id, session_id).await
121 }
122
123 pub async fn post_approval(
124 &self,
125 tenant_id: &str,
126 session_id: &str,
127 approval_id: &str,
128 payload: ApprovalResponsePayload,
129 ) -> Result<(), WebError> {
130 handlers::post_approval(
131 &self.adapter,
132 &self.state,
133 tenant_id,
134 session_id,
135 approval_id,
136 payload,
137 )
138 .await
139 }
140
141 #[cfg(test)]
142 pub(crate) async fn debug_server_request_route_miss_counts(&self) -> (u64, u64, u64) {
143 let guard = self.state.read().await;
144 let metrics = guard.server_request_route_miss;
145 (
146 metrics.missing_thread_id,
147 metrics.missing_session_mapping,
148 metrics.missing_approval_topic,
149 )
150 }
151
152 #[cfg(test)]
153 pub(crate) async fn debug_remove_approval_topic(&self, session_id: &str) {
154 self.state.write().await.approval_topics.remove(session_id);
155 }
156}
157
158pub fn new_session_id() -> String {
159 state::new_session_id()
160}
161
162pub fn serialize_sse_envelope(envelope: &Envelope) -> Result<String, WebError> {
163 wire::serialize_sse_envelope(envelope)
164}
165
166impl Drop for WebAdapter {
167 fn drop(&mut self) {
168 if Arc::strong_count(&self.background_tasks) == 1 {
170 self.background_tasks.abort_all();
171 }
172 }
173}
174
175#[cfg(test)]
176mod tests;