codex_runtime/runtime/core/
approval.rs1use serde_json::{json, Value};
2use tokio::sync::mpsc;
3
4use crate::runtime::approvals::ServerRequest;
5use crate::runtime::errors::{RpcErrorObject, RuntimeError};
6
7use super::dispatch::{send_rpc_error, send_rpc_result, validate_server_request_result_payload};
8use super::state_projection::state_remove_pending_server_request;
9use super::{PendingServerRequestEntry, Runtime};
10
11impl Runtime {
12 pub async fn take_server_request_rx(
13 &self,
14 ) -> Result<mpsc::Receiver<ServerRequest>, RuntimeError> {
15 self.inner
16 .io
17 .server_request_rx
18 .lock()
19 .await
20 .take()
21 .ok_or(RuntimeError::ServerRequestReceiverTaken)
22 }
23
24 pub async fn respond_approval_ok(
25 &self,
26 approval_id: &str,
27 result: Value,
28 ) -> Result<(), RuntimeError> {
29 let entry = self
30 .take_pending_server_request_entry(approval_id, |entry| {
31 validate_server_request_result_payload(&entry.method, &result)
32 })
33 .await?;
34 send_rpc_result(&self.inner, &entry.rpc_id, result).await
35 }
36
37 pub async fn respond_approval_err(
38 &self,
39 approval_id: &str,
40 err: RpcErrorObject,
41 ) -> Result<(), RuntimeError> {
42 let entry = self
43 .take_pending_server_request_entry(approval_id, |_| Ok(()))
44 .await?;
45 send_rpc_error(
46 &self.inner,
47 &entry.rpc_id,
48 json!({
49 "code": err.code,
50 "message": err.message,
51 "data": err.data
52 }),
53 )
54 .await
55 }
56
57 async fn take_pending_server_request_entry<F>(
58 &self,
59 approval_id: &str,
60 validate: F,
61 ) -> Result<PendingServerRequestEntry, RuntimeError>
62 where
63 F: FnOnce(&PendingServerRequestEntry) -> Result<(), RuntimeError>,
64 {
65 let entry = {
66 let mut guard = self.inner.io.pending_server_requests.lock().await;
67 let entry = guard
68 .get(approval_id)
69 .cloned()
70 .ok_or_else(|| approval_not_found_error(approval_id))?;
71 validate(&entry)?;
72 guard
73 .remove(approval_id)
74 .ok_or_else(|| approval_not_found_error(approval_id))?
75 };
76 self.inner.metrics.dec_pending_server_request();
77 state_remove_pending_server_request(&self.inner, &entry.rpc_key);
78 Ok(entry)
79 }
80}
81
82fn approval_not_found_error(approval_id: &str) -> RuntimeError {
83 RuntimeError::Internal(format!("approval id not found: {approval_id}"))
84}