Skip to main content

systemprompt_api/services/gateway/protocol/outbound/openai_responses/
mod.rs

1//! Outbound adapter targeting the `OpenAI` Responses API.
2//!
3//! [`OpenAiResponsesOutbound`] orchestrates transport — auth headers, HTTP
4//! status handling, stream-vs-buffered dispatch — and delegates every wire
5//! concern (request build, response parse, SSE-to-event mapping) to the shared
6//! [`systemprompt_models::wire::openai_responses`] codec.
7
8use anyhow::{Result, anyhow};
9use async_trait::async_trait;
10use serde_json::Value;
11use systemprompt_models::wire::openai_responses as codec;
12
13use super::{
14    OutboundAdapter, OutboundCtx, OutboundOutcome, UpstreamError, extract_upstream_message,
15};
16
17#[cfg(feature = "test-api")]
18pub mod test_api {
19    pub use systemprompt_models::wire::openai_responses::{
20        build_request_body, parse_response_object, sse_to_canonical_events,
21    };
22}
23
24#[derive(Debug, Clone, Copy, Default)]
25pub struct OpenAiResponsesOutbound;
26
27#[async_trait]
28impl OutboundAdapter for OpenAiResponsesOutbound {
29    fn provider_tag(&self) -> &'static str {
30        "openai-responses"
31    }
32
33    async fn send(&self, ctx: OutboundCtx<'_>) -> Result<OutboundOutcome> {
34        let body = codec::build_request_body(
35            ctx.request,
36            ctx.upstream_model,
37            ctx.model_limits.map(|l| l.max_output_tokens),
38        );
39        let url = format!("{}/responses", ctx.endpoint.trim_end_matches('/'));
40
41        let client = reqwest::Client::new();
42        let mut req = client
43            .post(&url)
44            .header("authorization", format!("Bearer {}", ctx.api_key))
45            .header("content-type", "application/json")
46            .json(&body);
47        for (name, value) in &ctx.route.extra_headers {
48            req = req.header(name.as_str(), value.as_str());
49        }
50
51        let upstream_response = req.send().await.map_err(|e| {
52            anyhow::Error::new(UpstreamError::Transport {
53                provider: self.provider_tag(),
54                source: e,
55            })
56        })?;
57        let status = upstream_response.status();
58        if !status.is_success() {
59            let err = upstream_response
60                .text()
61                .await
62                .unwrap_or_else(|e| format!("<failed to read upstream body: {e}>"));
63            return Err(anyhow::Error::new(UpstreamError::Status {
64                provider: self.provider_tag(),
65                status: status.as_u16(),
66                message: extract_upstream_message(&err),
67            }));
68        }
69
70        if ctx.request.stream {
71            let stream = upstream_response.bytes_stream();
72            let event_stream = codec::sse_to_canonical_events(stream, ctx.request.model.clone());
73            return Ok(OutboundOutcome::Streaming(event_stream));
74        }
75
76        let bytes = upstream_response
77            .bytes()
78            .await
79            .map_err(|e| anyhow!("Failed to read Responses body: {e}"))?;
80        let value: Value = serde_json::from_slice(&bytes)
81            .map_err(|e| anyhow!("Responses body not valid JSON: {e}"))?;
82        let canon = codec::parse_response_object(&value, &ctx.request.model);
83        Ok(OutboundOutcome::Buffered(Box::new(canon)))
84    }
85}