Skip to main content

systemprompt_api/services/gateway/protocol/outbound/
mod.rs

//! Outbound protocol adapters: canonical model to upstream provider.
//!
//! The [`OutboundAdapter`] trait sends a [`CanonicalRequest`] to an upstream
//! provider and yields an [`OutboundOutcome`] — a buffered response or a stream
//! of canonical events. Adapters register themselves via
//! [`OutboundAdapterRegistration`] (collected by `inventory`) so the upstream
//! registry can resolve one by provider tag. Implementations cover Anthropic,
//! `OpenAI` Chat Completions, `OpenAI` Responses, and Gemini.
9
10pub mod anthropic;
11pub mod gemini;
12pub mod openai_chat;
13pub mod openai_responses;
14
15use std::sync::Arc;
16
17use anyhow::Result;
18use async_trait::async_trait;
19use futures_util::stream::BoxStream;
20use systemprompt_models::profile::GatewayRoute;
21use systemprompt_models::services::ai::ModelLimits;
22use thiserror::Error;
23
24use super::canonical::CanonicalRequest;
25use super::canonical_response::{CanonicalEvent, CanonicalResponse};
26
/// Upstream provider failure, carried inside the `anyhow::Error` an adapter
/// returns so the route layer can recover the real HTTP status by downcast
/// instead of flattening every failure to 502.
#[derive(Debug, Error)]
pub enum UpstreamError {
    /// The provider answered, but with a failing HTTP status.
    ///
    /// `message` is the provider's error text (presumably produced by
    /// [`extract_upstream_message`] — confirm at the construction sites).
    #[error("{provider} returned {status}: {message}")]
    Status {
        /// Name of the upstream provider that failed (presumably the
        /// adapter's provider tag — confirm).
        provider: &'static str,
        /// HTTP status code the upstream returned.
        status: u16,
        /// Error text reported by the upstream.
        message: String,
    },
    /// The request to the provider failed inside `reqwest`; this variant
    /// carries no HTTP status of its own.
    ///
    /// NOTE(review): `{source}` is interpolated into the Display text *and*
    /// exposed through `Error::source` (via `#[source]`), so chain-aware
    /// reporters such as anyhow's `{:#}` will print the reqwest error twice.
    #[error("{provider} request failed: {source}")]
    Transport {
        /// Name of the upstream provider that failed (presumably the
        /// adapter's provider tag — confirm).
        provider: &'static str,
        /// Underlying transport error from `reqwest`.
        #[source]
        source: reqwest::Error,
    },
}
45
46/// Pulls the provider's `error.message` from a JSON error body (the shape
47/// `OpenAI`, Anthropic, and Gemini all use), falling back to the raw body
48/// truncated so logs and client responses stay bounded.
49pub fn extract_upstream_message(body: &str) -> String {
50    serde_json::from_str::<serde_json::Value>(body)
51        .ok()
52        .and_then(|v| v["error"]["message"].as_str().map(ToOwned::to_owned))
53        .unwrap_or_else(|| body.chars().take(500).collect())
54}
55
56#[derive(Debug)]
57pub struct OutboundCtx<'a> {
58    pub route: &'a GatewayRoute,
59    pub endpoint: &'a str,
60    pub api_key: &'a str,
61    pub request: &'a CanonicalRequest,
62    pub upstream_model: &'a str,
63    /// Limits from the resolved upstream model card, when the requested model
64    /// maps to a known catalog entry. Codecs use this to clamp
65    /// provider-specific values (e.g. Gemini's `thinkingBudget`) to what
66    /// the upstream accepts.
67    pub model_limits: Option<ModelLimits>,
68}
69
/// Successful result of [`OutboundAdapter::send`]: either the complete
/// upstream reply or a stream of canonical events.
#[expect(
    missing_debug_implementations,
    reason = "variants hold streaming bodies that intentionally do not implement Debug"
)]
pub enum OutboundOutcome {
    /// Complete (non-streaming) response in canonical form, boxed.
    Buffered(Box<CanonicalResponse>),
    /// Owned (`'static`) stream of canonical events. Each item is either an
    /// event or a `String` (presumably a mid-stream error description —
    /// confirm with the adapters that produce it).
    Streaming(BoxStream<'static, Result<CanonicalEvent, String>>),
}
78
// Why: #[async_trait] is required — the upstream registry stores adapters as
// `Arc<dyn OutboundAdapter>`, so the trait must stay dyn-compatible.
/// An adapter that sends a [`CanonicalRequest`] to one upstream provider.
#[async_trait]
pub trait OutboundAdapter: Send + Sync {
    /// Short identifier for the provider this adapter talks to (presumably
    /// the same tag its [`OutboundAdapterRegistration`] is filed under —
    /// confirm).
    fn provider_tag(&self) -> &'static str;

    /// Sends the request described by `ctx` upstream and returns either a
    /// buffered response or an event stream.
    ///
    /// # Errors
    ///
    /// Returns an error when the upstream call fails. Adapters are expected to
    /// carry an [`UpstreamError`] inside the returned `anyhow::Error` so the
    /// route layer can recover the upstream HTTP status by downcast.
    async fn send(&self, ctx: OutboundCtx<'_>) -> Result<OutboundOutcome>;
}
86
/// Static registration record for an outbound adapter, gathered by
/// `inventory` so the upstream registry can resolve adapters by provider tag.
#[derive(Debug, Clone, Copy)]
pub struct OutboundAdapterRegistration {
    /// Provider tag this registration is resolved under (presumably equal to
    /// the built adapter's [`OutboundAdapter::provider_tag`] — confirm).
    pub tag: &'static str,
    /// Constructor that builds the adapter as a shared trait object.
    pub factory: fn() -> Arc<dyn OutboundAdapter>,
}
92
// Declares `OutboundAdapterRegistration` as an `inventory`-collected type:
// adapter modules contribute entries (with `inventory::submit!`) and the
// upstream registry can enumerate them at runtime (with `inventory::iter`).
inventory::collect!(OutboundAdapterRegistration);