Skip to main content

walrus_model/remote/
http.rs

1//! Shared HTTP transport for OpenAI-compatible and Anthropic LLM providers.
2//!
3//! `HttpProvider` wraps a `reqwest::Client` with pre-configured headers and
4//! endpoint URL. Provides `send()` / `send_raw()` for non-streaming, `stream_sse()`
5//! for OpenAI-format SSE, and `stream_anthropic()` for Anthropic block-buffer SSE.
6
7use crate::remote::claude::stream::parse_sse_block;
8use anyhow::Result;
9use async_stream::try_stream;
10use futures_core::Stream;
11use futures_util::StreamExt;
12use reqwest::{
13    Client, Method,
14    header::{self, HeaderMap, HeaderName, HeaderValue},
15};
16use serde::Serialize;
17use wcore::model::{Response, StreamChunk};
18
/// Anthropic API version, sent as the value of the `anthropic-version`
/// header by [`HttpProvider::anthropic`].
const API_VERSION: &str = "2023-06-01";
21
22/// Shared HTTP transport for OpenAI-compatible providers.
23///
24/// Holds a `reqwest::Client`, pre-built headers (auth + content-type),
25/// and the target endpoint URL.
26#[derive(Clone)]
27pub struct HttpProvider {
28    client: Client,
29    headers: HeaderMap,
30    endpoint: String,
31}
32
33impl HttpProvider {
34    /// Create a provider with Bearer token authentication.
35    pub fn bearer(client: Client, key: &str, endpoint: &str) -> Result<Self> {
36        let mut headers = HeaderMap::new();
37        headers.insert(
38            header::CONTENT_TYPE,
39            HeaderValue::from_static("application/json"),
40        );
41        headers.insert(header::ACCEPT, HeaderValue::from_static("application/json"));
42        headers.insert(header::AUTHORIZATION, format!("Bearer {key}").parse()?);
43        Ok(Self {
44            client,
45            headers,
46            endpoint: endpoint.to_owned(),
47        })
48    }
49
50    /// Create a provider without authentication (e.g. Ollama).
51    pub fn no_auth(client: Client, endpoint: &str) -> Self {
52        let mut headers = HeaderMap::new();
53        headers.insert(
54            header::CONTENT_TYPE,
55            HeaderValue::from_static("application/json"),
56        );
57        headers.insert(header::ACCEPT, HeaderValue::from_static("application/json"));
58        Self {
59            client,
60            headers,
61            endpoint: endpoint.to_owned(),
62        }
63    }
64
65    /// Create a provider with a custom header for authentication.
66    ///
67    /// Used by providers that don't use Bearer tokens (e.g. Anthropic
68    /// uses `x-api-key`).
69    pub fn custom_header(
70        client: Client,
71        header_name: &str,
72        header_value: &str,
73        endpoint: &str,
74    ) -> Result<Self> {
75        let mut headers = HeaderMap::new();
76        headers.insert(
77            header::CONTENT_TYPE,
78            HeaderValue::from_static("application/json"),
79        );
80        headers.insert(header::ACCEPT, HeaderValue::from_static("application/json"));
81        headers.insert(
82            header_name.parse::<HeaderName>()?,
83            header_value.parse::<HeaderValue>()?,
84        );
85        Ok(Self {
86            client,
87            headers,
88            endpoint: endpoint.to_owned(),
89        })
90    }
91
92    /// Create a provider with Anthropic authentication headers.
93    ///
94    /// Inserts `x-api-key` and `anthropic-version` in addition to the
95    /// standard content-type and accept headers.
96    pub fn anthropic(client: Client, key: &str, endpoint: &str) -> Result<Self> {
97        let mut headers = HeaderMap::new();
98        headers.insert(
99            header::CONTENT_TYPE,
100            HeaderValue::from_static("application/json"),
101        );
102        headers.insert(header::ACCEPT, HeaderValue::from_static("application/json"));
103        headers.insert(
104            "x-api-key".parse::<HeaderName>()?,
105            key.parse::<HeaderValue>()?,
106        );
107        headers.insert(
108            "anthropic-version".parse::<HeaderName>()?,
109            API_VERSION.parse::<HeaderValue>()?,
110        );
111        Ok(Self {
112            client,
113            headers,
114            endpoint: endpoint.to_owned(),
115        })
116    }
117
118    /// Send a non-streaming request and deserialize the response as JSON.
119    pub async fn send(&self, body: &impl Serialize) -> Result<Response> {
120        tracing::trace!("request: {}", serde_json::to_string(body)?);
121        let response = self
122            .client
123            .request(Method::POST, &self.endpoint)
124            .headers(self.headers.clone())
125            .json(body)
126            .send()
127            .await?;
128
129        let status = response.status();
130        let text = response.text().await?;
131        if !status.is_success() {
132            anyhow::bail!("API error ({status}): {text}");
133        }
134
135        serde_json::from_str(&text).map_err(Into::into)
136    }
137
138    /// Stream an SSE response (OpenAI-compatible format).
139    ///
140    /// Parses `data: ` prefixed lines, skips `[DONE]` sentinel, and
141    /// deserializes each chunk as [`StreamChunk`].
142    pub fn stream_sse(
143        &self,
144        body: &impl Serialize,
145    ) -> impl Stream<Item = Result<StreamChunk>> + Send {
146        if let Ok(body) = serde_json::to_string(body) {
147            tracing::trace!("request: {}", body);
148        }
149        let request = self
150            .client
151            .request(Method::POST, &self.endpoint)
152            .headers(self.headers.clone())
153            .json(body);
154
155        try_stream! {
156            let response = request.send().await?;
157            let mut stream = response.bytes_stream();
158            while let Some(next) = stream.next().await {
159                let bytes = next?;
160                let text = String::from_utf8_lossy(&bytes);
161                tracing::trace!("chunk: {}", text);
162                for data in text.split("data: ").skip(1).filter(|s| !s.starts_with("[DONE]")) {
163                    let trimmed = data.trim();
164                    if trimmed.is_empty() {
165                        continue;
166                    }
167                    match serde_json::from_str::<StreamChunk>(trimmed) {
168                        Ok(chunk) => yield chunk,
169                        Err(e) => tracing::warn!("failed to parse chunk: {e}, data: {trimmed}"),
170                    }
171                }
172            }
173        }
174    }
175
176    /// Send a non-streaming request and return the raw response body text.
177    ///
178    /// Unlike `send()`, the caller is responsible for deserialization.
179    /// Used by providers whose response schema differs from the OpenAI format (e.g. Anthropic).
180    pub async fn send_raw(&self, body: &impl Serialize) -> Result<String> {
181        tracing::trace!("request: {}", serde_json::to_string(body)?);
182        let response = self
183            .client
184            .request(Method::POST, &self.endpoint)
185            .headers(self.headers.clone())
186            .json(body)
187            .send()
188            .await?;
189        let status = response.status();
190        let text = response.text().await?;
191        if !status.is_success() {
192            anyhow::bail!("API error ({status}): {text}");
193        }
194        Ok(text)
195    }
196
197    /// Stream an SSE response in Anthropic block-buffer format.
198    ///
199    /// Anthropic uses `\n\n`-delimited blocks each containing `event:` and
200    /// `data:` lines, unlike OpenAI's line-by-line `data: ` prefix format.
201    /// Takes the body as an owned `serde_json::Value` so the stream can be
202    /// `'static` without capturing a borrow.
203    pub fn stream_anthropic(
204        &self,
205        body: serde_json::Value,
206    ) -> impl Stream<Item = Result<StreamChunk>> + Send {
207        tracing::trace!("request: {}", body);
208        let request = self
209            .client
210            .request(Method::POST, &self.endpoint)
211            .headers(self.headers.clone())
212            .json(&body);
213
214        try_stream! {
215            let response = request.send().await?;
216            let mut stream = response.bytes_stream();
217            let mut buf = String::new();
218            while let Some(Ok(bytes)) = stream.next().await {
219                buf.push_str(&String::from_utf8_lossy(&bytes));
220                while let Some(pos) = buf.find("\n\n") {
221                    let block = buf[..pos].to_owned();
222                    buf = buf[pos + 2..].to_owned();
223                    if let Some(chunk) = parse_sse_block(&block) {
224                        yield chunk;
225                    }
226                }
227            }
228            if !buf.trim().is_empty()
229                && let Some(chunk) = parse_sse_block(&buf)
230            {
231                yield chunk;
232            }
233        }
234    }
235
236    /// Get the endpoint URL.
237    pub fn endpoint(&self) -> &str {
238        &self.endpoint
239    }
240
241    /// Get a reference to the headers.
242    pub fn headers(&self) -> &HeaderMap {
243        &self.headers
244    }
245}