Skip to main content

codex_runtime/adapters/web/
mod.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3
4use crate::runtime::approvals::ServerRequest;
5use crate::runtime::core::Runtime;
6use crate::runtime::events::Envelope;
7use tokio::sync::{broadcast, RwLock};
8
9mod adapter;
10mod handlers;
11mod service;
12mod state;
13mod wire;
14
15#[cfg(test)]
16pub(crate) use adapter::WebAdapterFuture;
17pub use adapter::{RuntimeWebAdapter, WebPluginAdapter, WebRuntimeStreams};
18
19mod types;
20
21pub use types::{
22    ApprovalResponsePayload, CloseSessionResponse, CreateSessionRequest, CreateSessionResponse,
23    CreateTurnRequest, CreateTurnResponse, WebAdapterConfig, WebError,
24};
25
26#[derive(Clone)]
27pub struct WebAdapter {
28    adapter: Arc<dyn WebPluginAdapter>,
29    config: WebAdapterConfig,
30    state: Arc<RwLock<state::WebState>>,
31    background_tasks: Arc<BackgroundTasks>,
32}
33
34#[derive(Debug)]
35struct BackgroundTasks {
36    aborted: AtomicBool,
37    handles: Vec<tokio::task::AbortHandle>,
38}
39
40impl BackgroundTasks {
41    fn new(handles: Vec<tokio::task::AbortHandle>) -> Self {
42        Self {
43            aborted: AtomicBool::new(false),
44            handles,
45        }
46    }
47
48    fn abort_all(&self) {
49        if self.aborted.swap(true, Ordering::AcqRel) {
50            return;
51        }
52        for handle in &self.handles {
53            handle.abort();
54        }
55    }
56}
57
58impl WebAdapter {
59    pub async fn spawn(runtime: Runtime, config: WebAdapterConfig) -> Result<Self, WebError> {
60        let adapter: Arc<dyn WebPluginAdapter> = Arc::new(RuntimeWebAdapter::new(runtime));
61        Self::spawn_with_adapter(adapter, config).await
62    }
63
64    pub async fn spawn_with_adapter(
65        adapter: Arc<dyn WebPluginAdapter>,
66        config: WebAdapterConfig,
67    ) -> Result<Self, WebError> {
68        let streams = service::prepare_spawn(&adapter, &config).await?;
69        let state = Arc::new(RwLock::new(state::WebState::default()));
70        let handles =
71            service::spawn_routing_tasks(Arc::clone(&adapter), Arc::clone(&state), streams);
72        let background_tasks = Arc::new(BackgroundTasks::new(handles));
73
74        Ok(Self {
75            adapter,
76            config,
77            state,
78            background_tasks,
79        })
80    }
81
82    pub async fn create_session(
83        &self,
84        tenant_id: &str,
85        request: CreateSessionRequest,
86    ) -> Result<CreateSessionResponse, WebError> {
87        handlers::create_session(&self.adapter, &self.state, self.config, tenant_id, request).await
88    }
89
90    pub async fn create_turn(
91        &self,
92        tenant_id: &str,
93        session_id: &str,
94        request: CreateTurnRequest,
95    ) -> Result<CreateTurnResponse, WebError> {
96        handlers::create_turn(&self.adapter, &self.state, tenant_id, session_id, request).await
97    }
98
99    pub async fn close_session(
100        &self,
101        tenant_id: &str,
102        session_id: &str,
103    ) -> Result<CloseSessionResponse, WebError> {
104        handlers::close_session(&self.adapter, &self.state, tenant_id, session_id).await
105    }
106
107    pub async fn subscribe_session_events(
108        &self,
109        tenant_id: &str,
110        session_id: &str,
111    ) -> Result<broadcast::Receiver<Envelope>, WebError> {
112        handlers::subscribe_session_events(&self.state, tenant_id, session_id).await
113    }
114
115    pub async fn subscribe_session_approvals(
116        &self,
117        tenant_id: &str,
118        session_id: &str,
119    ) -> Result<broadcast::Receiver<ServerRequest>, WebError> {
120        handlers::subscribe_session_approvals(&self.state, tenant_id, session_id).await
121    }
122
123    pub async fn post_approval(
124        &self,
125        tenant_id: &str,
126        session_id: &str,
127        approval_id: &str,
128        payload: ApprovalResponsePayload,
129    ) -> Result<(), WebError> {
130        handlers::post_approval(
131            &self.adapter,
132            &self.state,
133            tenant_id,
134            session_id,
135            approval_id,
136            payload,
137        )
138        .await
139    }
140
141    #[cfg(test)]
142    pub(crate) async fn debug_server_request_route_miss_counts(&self) -> (u64, u64, u64) {
143        let guard = self.state.read().await;
144        let metrics = guard.server_request_route_miss;
145        (
146            metrics.missing_thread_id,
147            metrics.missing_session_mapping,
148            metrics.missing_approval_topic,
149        )
150    }
151
152    #[cfg(test)]
153    pub(crate) async fn debug_remove_approval_topic(&self, session_id: &str) {
154        self.state.write().await.approval_topics.remove(session_id);
155    }
156}
157
158pub fn new_session_id() -> String {
159    state::new_session_id()
160}
161
162pub fn serialize_sse_envelope(envelope: &Envelope) -> Result<String, WebError> {
163    wire::serialize_sse_envelope(envelope)
164}
165
166impl Drop for WebAdapter {
167    fn drop(&mut self) {
168        // Shared background tasks must stay alive while any clone is still in use.
169        if Arc::strong_count(&self.background_tasks) == 1 {
170            self.background_tasks.abort_all();
171        }
172    }
173}
174
175#[cfg(test)]
176mod tests;