Skip to main content

codex_runtime/runtime/core/
approval.rs

1use serde_json::{json, Value};
2use tokio::sync::mpsc;
3
4use crate::runtime::approvals::ServerRequest;
5use crate::runtime::errors::{RpcErrorObject, RuntimeError};
6
7use super::dispatch::{send_rpc_error, send_rpc_result, validate_server_request_result_payload};
8use super::state_projection::state_remove_pending_server_request;
9use super::{PendingServerRequestEntry, Runtime};
10
11impl Runtime {
12    pub async fn take_server_request_rx(
13        &self,
14    ) -> Result<mpsc::Receiver<ServerRequest>, RuntimeError> {
15        self.inner
16            .io
17            .server_request_rx
18            .lock()
19            .await
20            .take()
21            .ok_or(RuntimeError::ServerRequestReceiverTaken)
22    }
23
24    pub async fn respond_approval_ok(
25        &self,
26        approval_id: &str,
27        result: Value,
28    ) -> Result<(), RuntimeError> {
29        let entry = self
30            .take_pending_server_request_entry(approval_id, |entry| {
31                validate_server_request_result_payload(&entry.method, &result)
32            })
33            .await?;
34        send_rpc_result(&self.inner, &entry.rpc_id, result).await
35    }
36
37    pub async fn respond_approval_err(
38        &self,
39        approval_id: &str,
40        err: RpcErrorObject,
41    ) -> Result<(), RuntimeError> {
42        let entry = self
43            .take_pending_server_request_entry(approval_id, |_| Ok(()))
44            .await?;
45        send_rpc_error(
46            &self.inner,
47            &entry.rpc_id,
48            json!({
49                "code": err.code,
50                "message": err.message,
51                "data": err.data
52            }),
53        )
54        .await
55    }
56
57    async fn take_pending_server_request_entry<F>(
58        &self,
59        approval_id: &str,
60        validate: F,
61    ) -> Result<PendingServerRequestEntry, RuntimeError>
62    where
63        F: FnOnce(&PendingServerRequestEntry) -> Result<(), RuntimeError>,
64    {
65        let entry = {
66            let mut guard = self.inner.io.pending_server_requests.lock().await;
67            let entry = guard
68                .get(approval_id)
69                .cloned()
70                .ok_or_else(|| approval_not_found_error(approval_id))?;
71            validate(&entry)?;
72            guard
73                .remove(approval_id)
74                .ok_or_else(|| approval_not_found_error(approval_id))?
75        };
76        self.inner.metrics.dec_pending_server_request();
77        state_remove_pending_server_request(&self.inner, &entry.rpc_key);
78        Ok(entry)
79    }
80}
81
82fn approval_not_found_error(approval_id: &str) -> RuntimeError {
83    RuntimeError::Internal(format!("approval id not found: {approval_id}"))
84}