#![allow(clippy::too_many_lines, dead_code)]
#![allow(
clippy::must_use_candidate,
clippy::map_entry,
clippy::if_not_else,
clippy::collapsible_if,
clippy::needless_update,
clippy::match_same_arms,
clippy::uninlined_format_args,
clippy::unnecessary_wraps
)]
mod anthropic_to_openai;
mod anthropic_to_responses;
mod anthropic_types;
mod frame_dispatch;
mod openai_stream;
mod openai_types;
mod responses_to_anthropic_stream;
mod sse_output;
mod sse_parser;
mod stream_helpers;
#[cfg(test)]
mod tests;
pub(crate) use anthropic_to_openai::transform_anthropic_stream_to_openai;
pub(crate) use anthropic_to_responses::transform_anthropic_stream_to_openai_responses;
pub(crate) use openai_stream::transform_openai_stream;
pub(crate) use responses_to_anthropic_stream::transform_responses_stream_to_anthropic;
pub use sse_output::events_to_sse;
pub(crate) use sse_output::{anthropic_sse_frames_to_events, passthrough_anthropic_stream};
pub use sse_parser::{SseFrame, parse_sse_frames};
#[allow(unused_imports)]
pub(crate) use stream_helpers::SYNTHETIC_THINKING_SIGNATURE;
use crate::model::{ApiFormat, StreamEvent, StreamState, TransformError};
/// Parses a buffer of upstream SSE bytes and converts it into `StreamEvent`s,
/// choosing the converter from `source`.
///
/// An empty vector is returned when `parse_sse_frames` yields no frames.
/// Otherwise the `data` lengths of all parsed frames are added up and compared
/// against `crate::model::MAX_SSE_STREAM_BYTES` before any conversion runs.
///
/// # Errors
///
/// * `TransformError::BufferLimitExceeded` when the summed frame data length is
///   greater than `MAX_SSE_STREAM_BYTES`.
/// * Any error returned by the selected per-format converter.
pub fn transform_stream_events(
    upstream_sse: &[u8],
    source: ApiFormat,
    state: &mut StreamState,
) -> Result<Vec<StreamEvent>, TransformError> {
    let frames = parse_sse_frames(upstream_sse);
    if frames.is_empty() {
        return Ok(Vec::new());
    }

    // Guard on the combined payload size of the parsed frames before dispatching.
    let limit = crate::model::MAX_SSE_STREAM_BYTES;
    let total_bytes = frames
        .iter()
        .fold(0usize, |acc, frame| acc + frame.data.len());
    if total_bytes > limit {
        let message = format!(
            "SSE stream size {total_bytes} bytes exceeds {} byte limit",
            limit
        );
        return Err(TransformError::BufferLimitExceeded(message));
    }

    match source {
        ApiFormat::AnthropicMessages => passthrough_anthropic_stream(&frames, state),
        ApiFormat::OpenaiChat => transform_openai_stream(&frames, state),
        ApiFormat::OpenaiResponses => {
            // Two-step path: rewrite the Responses frames as Anthropic SSE bytes,
            // then parse those bytes again and turn them into events.
            let anthropic_bytes = transform_responses_stream_to_anthropic(&frames, state)?;
            let reparsed = parse_sse_frames(&anthropic_bytes);
            anthropic_sse_frames_to_events(&reparsed, state)
        }
    }
}
/// Converts a buffer of upstream SSE bytes into SSE bytes for an OpenAI client.
///
/// Only `ApiFormat::AnthropicMessages` is converted here, by delegating to
/// `transform_anthropic_stream_to_openai`. An empty vector is returned when
/// `parse_sse_frames` yields no frames; note that this check runs before
/// `source` is inspected.
///
/// # Errors
///
/// * `TransformError::InvalidFormat` when `source` is `ApiFormat::OpenaiChat`
///   (same-format passthrough is not performed by this function) or
///   `ApiFormat::OpenaiResponses` (no conversion for that source exists here).
/// * Any error returned by `transform_anthropic_stream_to_openai`.
pub fn transform_stream_to_openai_sse(
    upstream_sse: &[u8],
    source: ApiFormat,
    state: &mut StreamState,
) -> Result<Vec<u8>, TransformError> {
    let frames = parse_sse_frames(upstream_sse);
    if frames.is_empty() {
        return Ok(Vec::new());
    }
    match source {
        ApiFormat::AnthropicMessages => transform_anthropic_stream_to_openai(&frames, state),
        ApiFormat::OpenaiChat => Err(TransformError::InvalidFormat(
            "OpenAI -> OpenAI passthrough is handled outside core transform".to_string(),
        )),
        // The target of this function is OpenAI, so a Responses source is a
        // cross-format request, not a Responses -> Responses passthrough.
        ApiFormat::OpenaiResponses => Err(TransformError::InvalidFormat(
            "OpenAI Responses -> OpenAI stream conversion is not supported by core transform"
                .to_string(),
        )),
    }
}
/// Converts a buffer of upstream SSE bytes into SSE bytes for an OpenAI
/// Responses client.
///
/// Only `ApiFormat::AnthropicMessages` is converted here, by delegating to
/// `transform_anthropic_stream_to_openai_responses`. An empty vector is
/// returned when `parse_sse_frames` yields no frames; note that this check
/// runs before `source` is inspected.
///
/// # Errors
///
/// * `TransformError::InvalidFormat` when `source` is `ApiFormat::OpenaiChat`
///   (no conversion for that source exists here) or
///   `ApiFormat::OpenaiResponses` (same-format passthrough is not performed by
///   this function).
/// * Any error returned by `transform_anthropic_stream_to_openai_responses`.
pub fn transform_stream_to_openai_responses_sse(
    upstream_sse: &[u8],
    source: ApiFormat,
    state: &mut StreamState,
) -> Result<Vec<u8>, TransformError> {
    let frames = parse_sse_frames(upstream_sse);
    if frames.is_empty() {
        return Ok(Vec::new());
    }
    match source {
        ApiFormat::AnthropicMessages => {
            transform_anthropic_stream_to_openai_responses(&frames, state)
        }
        // The target of this function is OpenAI Responses, so an OpenAI chat
        // source is a cross-format request, not an OpenAI -> OpenAI passthrough.
        ApiFormat::OpenaiChat => Err(TransformError::InvalidFormat(
            "OpenAI -> OpenAI Responses stream conversion is not supported by core transform"
                .to_string(),
        )),
        ApiFormat::OpenaiResponses => Err(TransformError::InvalidFormat(
            "OpenAI Responses -> OpenAI Responses passthrough is handled outside core transform"
                .to_string(),
        )),
    }
}
/// Converts a buffer of upstream SSE bytes into SSE bytes for an Anthropic client.
///
/// `ApiFormat::OpenaiChat` input is first turned into events by
/// `transform_openai_stream` and then serialized with `events_to_sse`;
/// `ApiFormat::OpenaiResponses` input is handed to
/// `transform_responses_stream_to_anthropic`. An empty vector is returned when
/// `parse_sse_frames` yields no frames, before `source` is inspected.
///
/// # Errors
///
/// * `TransformError::InvalidFormat` when `source` is
///   `ApiFormat::AnthropicMessages`.
/// * Any error returned by the selected converter.
pub fn transform_stream_to_anthropic_sse(
    upstream_sse: &[u8],
    source: ApiFormat,
    state: &mut StreamState,
) -> Result<Vec<u8>, TransformError> {
    let frames = parse_sse_frames(upstream_sse);
    if frames.is_empty() {
        return Ok(Vec::new());
    }

    match source {
        ApiFormat::AnthropicMessages => {
            let reason = "Anthropic -> Anthropic passthrough is handled outside core transform";
            Err(TransformError::InvalidFormat(reason.to_string()))
        }
        ApiFormat::OpenaiResponses => transform_responses_stream_to_anthropic(&frames, state),
        ApiFormat::OpenaiChat => {
            // Convert to events first, then serialize those events as SSE bytes.
            let events = transform_openai_stream(&frames, state)?;
            Ok(events_to_sse(&events))
        }
    }
}