gproxy_protocol/transform/openai/websocket/to_http/
response.rs1use crate::openai::create_response::response::OpenAiCreateResponseResponse;
2use crate::openai::create_response::stream::ResponseStreamEvent;
3use crate::openai::create_response::websocket::response::OpenAiCreateResponseWebSocketMessageResponse;
4use crate::openai::create_response::websocket::types::{
5 OpenAiCreateResponseWebSocketServerMessage, OpenAiCreateResponseWebSocketWrappedErrorEvent,
6};
7use crate::transform::openai::websocket::context::OpenAiWebsocketTransformContext;
8
9const FALLBACK_WS_ERROR_CODE: &str = "websocket_error";
10const FALLBACK_WS_ERROR_MESSAGE: &str = "websocket error";
11
12fn wrapped_error_to_stream_event(
13 event: OpenAiCreateResponseWebSocketWrappedErrorEvent,
14 sequence_number: u64,
15 ctx: &mut OpenAiWebsocketTransformContext,
16) -> ResponseStreamEvent {
17 if let Some(status) = event.status {
18 ctx.push_warning(format!(
19 "openai websocket to_http response: dropped wrapped error status={status}"
20 ));
21 }
22 if let Some(headers) = event.headers.as_ref() {
23 ctx.push_warning(format!(
24 "openai websocket to_http response: dropped wrapped error headers ({} entries)",
25 headers.len()
26 ));
27 }
28 let payload = event.error.unwrap_or_default();
29 ResponseStreamEvent::Error {
30 error: crate::openai::create_response::stream::ResponseStreamErrorPayload {
31 type_: payload
32 .type_
33 .or_else(|| payload.code.clone())
34 .unwrap_or_else(|| FALLBACK_WS_ERROR_CODE.to_string()),
35 code: payload.code,
36 message: payload
37 .message
38 .unwrap_or_else(|| FALLBACK_WS_ERROR_MESSAGE.to_string()),
39 param: payload.param,
40 },
41 sequence_number,
42 }
43}
44
45fn api_error_to_stream_event(
46 event: crate::openai::types::OpenAiApiErrorResponse,
47 sequence_number: u64,
48) -> ResponseStreamEvent {
49 ResponseStreamEvent::Error {
50 error: crate::openai::create_response::stream::ResponseStreamErrorPayload {
51 type_: event.error.type_,
52 code: event.error.code,
53 message: event.error.message,
54 param: event.error.param,
55 },
56 sequence_number,
57 }
58}
59
60impl TryFrom<&[OpenAiCreateResponseWebSocketMessageResponse]> for OpenAiCreateResponseResponse {
61 type Error = crate::transform::utils::TransformError;
62
63 fn try_from(
64 value: &[OpenAiCreateResponseWebSocketMessageResponse],
65 ) -> Result<Self, crate::transform::utils::TransformError> {
66 Ok(websocket_messages_to_openai_nonstream_with_context(value)?.0)
67 }
68}
69
70impl TryFrom<Vec<OpenAiCreateResponseWebSocketMessageResponse>> for OpenAiCreateResponseResponse {
71 type Error = crate::transform::utils::TransformError;
72
73 fn try_from(
74 value: Vec<OpenAiCreateResponseWebSocketMessageResponse>,
75 ) -> Result<Self, crate::transform::utils::TransformError> {
76 OpenAiCreateResponseResponse::try_from(value.as_slice())
77 }
78}
79
80pub fn websocket_messages_to_openai_nonstream_with_context(
81 value: &[OpenAiCreateResponseWebSocketMessageResponse],
82) -> Result<
83 (
84 OpenAiCreateResponseResponse,
85 OpenAiWebsocketTransformContext,
86 ),
87 crate::transform::utils::TransformError,
88> {
89 let (events, ctx) = websocket_messages_to_openai_stream_events_with_context(value)?;
90 let response = OpenAiCreateResponseResponse::try_from(events)?;
91 Ok((response, ctx))
92}
93
94pub fn websocket_messages_to_openai_stream_events_with_context(
95 value: &[OpenAiCreateResponseWebSocketMessageResponse],
96) -> Result<
97 (Vec<ResponseStreamEvent>, OpenAiWebsocketTransformContext),
98 crate::transform::utils::TransformError,
99> {
100 let mut ctx = OpenAiWebsocketTransformContext::default();
101 let mut events = Vec::new();
102 let mut next_sequence_number = 0_u64;
103
104 for message in value.iter().cloned() {
105 match message {
106 OpenAiCreateResponseWebSocketServerMessage::StreamEvent(event) => {
107 events.push(event);
108 }
109 OpenAiCreateResponseWebSocketServerMessage::Done(_) => {
110 }
112 OpenAiCreateResponseWebSocketServerMessage::WrappedError(event) => {
113 events.push(wrapped_error_to_stream_event(
114 event,
115 next_sequence_number,
116 &mut ctx,
117 ));
118 next_sequence_number = next_sequence_number.saturating_add(1);
119 }
120 OpenAiCreateResponseWebSocketServerMessage::ApiError(event) => {
121 events.push(api_error_to_stream_event(event, next_sequence_number));
122 next_sequence_number = next_sequence_number.saturating_add(1);
123 }
124 OpenAiCreateResponseWebSocketServerMessage::RateLimit(_) => {
126 ctx.push_warning(
127 "openai websocket to_http response: dropped codex.rate_limits event"
128 .to_string(),
129 );
130 }
131 }
132 }
133
134 Ok((events, ctx))
135}