Skip to main content

gproxy_protocol/transform/openai/websocket/to_http/
response.rs

1use crate::openai::create_response::response::OpenAiCreateResponseResponse;
2use crate::openai::create_response::stream::ResponseStreamEvent;
3use crate::openai::create_response::websocket::response::OpenAiCreateResponseWebSocketMessageResponse;
4use crate::openai::create_response::websocket::types::{
5    OpenAiCreateResponseWebSocketServerMessage, OpenAiCreateResponseWebSocketWrappedErrorEvent,
6};
7use crate::transform::openai::websocket::context::OpenAiWebsocketTransformContext;
8
9const FALLBACK_WS_ERROR_CODE: &str = "websocket_error";
10const FALLBACK_WS_ERROR_MESSAGE: &str = "websocket error";
11
12fn wrapped_error_to_stream_event(
13    event: OpenAiCreateResponseWebSocketWrappedErrorEvent,
14    sequence_number: u64,
15    ctx: &mut OpenAiWebsocketTransformContext,
16) -> ResponseStreamEvent {
17    if let Some(status) = event.status {
18        ctx.push_warning(format!(
19            "openai websocket to_http response: dropped wrapped error status={status}"
20        ));
21    }
22    if let Some(headers) = event.headers.as_ref() {
23        ctx.push_warning(format!(
24            "openai websocket to_http response: dropped wrapped error headers ({} entries)",
25            headers.len()
26        ));
27    }
28    let payload = event.error.unwrap_or_default();
29    ResponseStreamEvent::Error {
30        error: crate::openai::create_response::stream::ResponseStreamErrorPayload {
31            type_: payload
32                .type_
33                .or_else(|| payload.code.clone())
34                .unwrap_or_else(|| FALLBACK_WS_ERROR_CODE.to_string()),
35            code: payload.code,
36            message: payload
37                .message
38                .unwrap_or_else(|| FALLBACK_WS_ERROR_MESSAGE.to_string()),
39            param: payload.param,
40        },
41        sequence_number,
42    }
43}
44
45fn api_error_to_stream_event(
46    event: crate::openai::types::OpenAiApiErrorResponse,
47    sequence_number: u64,
48) -> ResponseStreamEvent {
49    ResponseStreamEvent::Error {
50        error: crate::openai::create_response::stream::ResponseStreamErrorPayload {
51            type_: event.error.type_,
52            code: event.error.code,
53            message: event.error.message,
54            param: event.error.param,
55        },
56        sequence_number,
57    }
58}
59
60impl TryFrom<&[OpenAiCreateResponseWebSocketMessageResponse]> for OpenAiCreateResponseResponse {
61    type Error = crate::transform::utils::TransformError;
62
63    fn try_from(
64        value: &[OpenAiCreateResponseWebSocketMessageResponse],
65    ) -> Result<Self, crate::transform::utils::TransformError> {
66        Ok(websocket_messages_to_openai_nonstream_with_context(value)?.0)
67    }
68}
69
70impl TryFrom<Vec<OpenAiCreateResponseWebSocketMessageResponse>> for OpenAiCreateResponseResponse {
71    type Error = crate::transform::utils::TransformError;
72
73    fn try_from(
74        value: Vec<OpenAiCreateResponseWebSocketMessageResponse>,
75    ) -> Result<Self, crate::transform::utils::TransformError> {
76        OpenAiCreateResponseResponse::try_from(value.as_slice())
77    }
78}
79
80pub fn websocket_messages_to_openai_nonstream_with_context(
81    value: &[OpenAiCreateResponseWebSocketMessageResponse],
82) -> Result<
83    (
84        OpenAiCreateResponseResponse,
85        OpenAiWebsocketTransformContext,
86    ),
87    crate::transform::utils::TransformError,
88> {
89    let (events, ctx) = websocket_messages_to_openai_stream_events_with_context(value)?;
90    let response = OpenAiCreateResponseResponse::try_from(events)?;
91    Ok((response, ctx))
92}
93
94pub fn websocket_messages_to_openai_stream_events_with_context(
95    value: &[OpenAiCreateResponseWebSocketMessageResponse],
96) -> Result<
97    (Vec<ResponseStreamEvent>, OpenAiWebsocketTransformContext),
98    crate::transform::utils::TransformError,
99> {
100    let mut ctx = OpenAiWebsocketTransformContext::default();
101    let mut events = Vec::new();
102    let mut next_sequence_number = 0_u64;
103
104    for message in value.iter().cloned() {
105        match message {
106            OpenAiCreateResponseWebSocketServerMessage::StreamEvent(event) => {
107                events.push(event);
108            }
109            OpenAiCreateResponseWebSocketServerMessage::Done(_) => {
110                // Done marker is not a stream event; skip.
111            }
112            OpenAiCreateResponseWebSocketServerMessage::WrappedError(event) => {
113                events.push(wrapped_error_to_stream_event(
114                    event,
115                    next_sequence_number,
116                    &mut ctx,
117                ));
118                next_sequence_number = next_sequence_number.saturating_add(1);
119            }
120            OpenAiCreateResponseWebSocketServerMessage::ApiError(event) => {
121                events.push(api_error_to_stream_event(event, next_sequence_number));
122                next_sequence_number = next_sequence_number.saturating_add(1);
123            }
124            // No equivalent SSE event in OpenAI response stream schema.
125            OpenAiCreateResponseWebSocketServerMessage::RateLimit(_) => {
126                ctx.push_warning(
127                    "openai websocket to_http response: dropped codex.rate_limits event"
128                        .to_string(),
129                );
130            }
131        }
132    }
133
134    Ok((events, ctx))
135}