Skip to main content

systemprompt_api/services/gateway/protocol/outbound/openai_chat/
mod.rs

1//! Outbound adapter targeting the `OpenAI` Chat Completions API.
2//!
3//! [`OpenAiChatOutbound`] orchestrates transport — auth headers, HTTP status
4//! handling, stream-vs-buffered dispatch — and delegates every wire concern
5//! (request build, response parse, SSE-to-event mapping) to the shared
6//! [`systemprompt_models::wire::openai_chat`] codec. Also serves
7//! OpenAI-compatible providers exposing the same surface.
8
9use anyhow::{Result, anyhow};
10use async_trait::async_trait;
11use serde_json::Value;
12use systemprompt_models::wire::openai_chat as codec;
13
14use super::{
15    OutboundAdapter, OutboundCtx, OutboundOutcome, UpstreamError, extract_upstream_message,
16};
17
/// Re-exports of the shared `openai_chat` codec functions, compiled only when
/// the `test-api` feature is enabled.
#[cfg(feature = "test-api")]
pub mod test_api {
    pub use systemprompt_models::wire::openai_chat::build_request_body;
    pub use systemprompt_models::wire::openai_chat::parse_response;
    pub use systemprompt_models::wire::openai_chat::sse_to_canonical_events;
}
24
/// Outbound adapter for the `OpenAI` Chat Completions API.
///
/// A field-less unit struct: it carries no state of its own, so everything a
/// call needs is read from the [`OutboundCtx`] passed to `send`.
#[derive(Clone, Copy, Debug, Default)]
pub struct OpenAiChatOutbound;
27
28#[async_trait]
29impl OutboundAdapter for OpenAiChatOutbound {
30    fn provider_tag(&self) -> &'static str {
31        "openai"
32    }
33
34    async fn send(&self, ctx: OutboundCtx<'_>) -> Result<OutboundOutcome> {
35        let body = codec::build_request_body(
36            ctx.request,
37            ctx.upstream_model,
38            ctx.model_limits.map(|l| l.max_output_tokens),
39        );
40        let url = format!("{}/chat/completions", ctx.endpoint.trim_end_matches('/'));
41
42        let client = reqwest::Client::new();
43        let mut req = client
44            .post(&url)
45            .header("authorization", format!("Bearer {}", ctx.api_key))
46            .header("content-type", "application/json")
47            .json(&body);
48        for (name, value) in &ctx.route.extra_headers {
49            req = req.header(name.as_str(), value.as_str());
50        }
51
52        let upstream_response = req.send().await.map_err(|e| {
53            anyhow::Error::new(UpstreamError::Transport {
54                provider: self.provider_tag(),
55                source: e,
56            })
57        })?;
58
59        let status = upstream_response.status();
60        if !status.is_success() {
61            let err = upstream_response
62                .text()
63                .await
64                .unwrap_or_else(|e| format!("<failed to read upstream body: {e}>"));
65            return Err(anyhow::Error::new(UpstreamError::Status {
66                provider: self.provider_tag(),
67                status: status.as_u16(),
68                message: extract_upstream_message(&err),
69            }));
70        }
71
72        if ctx.request.stream {
73            let stream = upstream_response.bytes_stream();
74            let event_stream = codec::sse_to_canonical_events(stream, ctx.request.model.clone());
75            return Ok(OutboundOutcome::Streaming(event_stream));
76        }
77
78        let bytes = upstream_response
79            .bytes()
80            .await
81            .map_err(|e| anyhow!("Failed to read OpenAI response: {e}"))?;
82        let value: Value = serde_json::from_slice(&bytes)
83            .map_err(|e| anyhow!("OpenAI response not valid JSON: {e}"))?;
84        let canon = codec::parse_response(&value, &ctx.request.model);
85        Ok(OutboundOutcome::Buffered(Box::new(canon)))
86    }
87}