walrus_model/remote/
http.rs1use crate::remote::claude::stream::parse_sse_block;
8use anyhow::Result;
9use async_stream::try_stream;
10use futures_core::Stream;
11use futures_util::StreamExt;
12use reqwest::{
13 Client, Method,
14 header::{self, HeaderMap, HeaderName, HeaderValue},
15};
16use serde::Serialize;
17use wcore::model::{Response, StreamChunk};
18
/// Value sent in the `anthropic-version` request header by `HttpProvider::anthropic`.
const API_VERSION: &str = "2023-06-01";
21
22#[derive(Clone)]
27pub struct HttpProvider {
28 client: Client,
29 headers: HeaderMap,
30 endpoint: String,
31}
32
33impl HttpProvider {
34 pub fn bearer(client: Client, key: &str, endpoint: &str) -> Result<Self> {
36 let mut headers = HeaderMap::new();
37 headers.insert(
38 header::CONTENT_TYPE,
39 HeaderValue::from_static("application/json"),
40 );
41 headers.insert(header::ACCEPT, HeaderValue::from_static("application/json"));
42 headers.insert(header::AUTHORIZATION, format!("Bearer {key}").parse()?);
43 Ok(Self {
44 client,
45 headers,
46 endpoint: endpoint.to_owned(),
47 })
48 }
49
50 pub fn no_auth(client: Client, endpoint: &str) -> Self {
52 let mut headers = HeaderMap::new();
53 headers.insert(
54 header::CONTENT_TYPE,
55 HeaderValue::from_static("application/json"),
56 );
57 headers.insert(header::ACCEPT, HeaderValue::from_static("application/json"));
58 Self {
59 client,
60 headers,
61 endpoint: endpoint.to_owned(),
62 }
63 }
64
65 pub fn custom_header(
70 client: Client,
71 header_name: &str,
72 header_value: &str,
73 endpoint: &str,
74 ) -> Result<Self> {
75 let mut headers = HeaderMap::new();
76 headers.insert(
77 header::CONTENT_TYPE,
78 HeaderValue::from_static("application/json"),
79 );
80 headers.insert(header::ACCEPT, HeaderValue::from_static("application/json"));
81 headers.insert(
82 header_name.parse::<HeaderName>()?,
83 header_value.parse::<HeaderValue>()?,
84 );
85 Ok(Self {
86 client,
87 headers,
88 endpoint: endpoint.to_owned(),
89 })
90 }
91
92 pub fn anthropic(client: Client, key: &str, endpoint: &str) -> Result<Self> {
97 let mut headers = HeaderMap::new();
98 headers.insert(
99 header::CONTENT_TYPE,
100 HeaderValue::from_static("application/json"),
101 );
102 headers.insert(header::ACCEPT, HeaderValue::from_static("application/json"));
103 headers.insert(
104 "x-api-key".parse::<HeaderName>()?,
105 key.parse::<HeaderValue>()?,
106 );
107 headers.insert(
108 "anthropic-version".parse::<HeaderName>()?,
109 API_VERSION.parse::<HeaderValue>()?,
110 );
111 Ok(Self {
112 client,
113 headers,
114 endpoint: endpoint.to_owned(),
115 })
116 }
117
118 pub async fn send(&self, body: &impl Serialize) -> Result<Response> {
120 tracing::trace!("request: {}", serde_json::to_string(body)?);
121 let response = self
122 .client
123 .request(Method::POST, &self.endpoint)
124 .headers(self.headers.clone())
125 .json(body)
126 .send()
127 .await?;
128
129 let status = response.status();
130 let text = response.text().await?;
131 if !status.is_success() {
132 anyhow::bail!("API error ({status}): {text}");
133 }
134
135 serde_json::from_str(&text).map_err(Into::into)
136 }
137
138 pub fn stream_sse(
143 &self,
144 body: &impl Serialize,
145 ) -> impl Stream<Item = Result<StreamChunk>> + Send {
146 if let Ok(body) = serde_json::to_string(body) {
147 tracing::trace!("request: {}", body);
148 }
149 let request = self
150 .client
151 .request(Method::POST, &self.endpoint)
152 .headers(self.headers.clone())
153 .json(body);
154
155 try_stream! {
156 let response = request.send().await?;
157 let mut stream = response.bytes_stream();
158 while let Some(next) = stream.next().await {
159 let bytes = next?;
160 let text = String::from_utf8_lossy(&bytes);
161 tracing::trace!("chunk: {}", text);
162 for data in text.split("data: ").skip(1).filter(|s| !s.starts_with("[DONE]")) {
163 let trimmed = data.trim();
164 if trimmed.is_empty() {
165 continue;
166 }
167 match serde_json::from_str::<StreamChunk>(trimmed) {
168 Ok(chunk) => yield chunk,
169 Err(e) => tracing::warn!("failed to parse chunk: {e}, data: {trimmed}"),
170 }
171 }
172 }
173 }
174 }
175
176 pub async fn send_raw(&self, body: &impl Serialize) -> Result<String> {
181 tracing::trace!("request: {}", serde_json::to_string(body)?);
182 let response = self
183 .client
184 .request(Method::POST, &self.endpoint)
185 .headers(self.headers.clone())
186 .json(body)
187 .send()
188 .await?;
189 let status = response.status();
190 let text = response.text().await?;
191 if !status.is_success() {
192 anyhow::bail!("API error ({status}): {text}");
193 }
194 Ok(text)
195 }
196
197 pub fn stream_anthropic(
204 &self,
205 body: serde_json::Value,
206 ) -> impl Stream<Item = Result<StreamChunk>> + Send {
207 tracing::trace!("request: {}", body);
208 let request = self
209 .client
210 .request(Method::POST, &self.endpoint)
211 .headers(self.headers.clone())
212 .json(&body);
213
214 try_stream! {
215 let response = request.send().await?;
216 let mut stream = response.bytes_stream();
217 let mut buf = String::new();
218 while let Some(Ok(bytes)) = stream.next().await {
219 buf.push_str(&String::from_utf8_lossy(&bytes));
220 while let Some(pos) = buf.find("\n\n") {
221 let block = buf[..pos].to_owned();
222 buf = buf[pos + 2..].to_owned();
223 if let Some(chunk) = parse_sse_block(&block) {
224 yield chunk;
225 }
226 }
227 }
228 if !buf.trim().is_empty()
229 && let Some(chunk) = parse_sse_block(&buf)
230 {
231 yield chunk;
232 }
233 }
234 }
235
236 pub fn endpoint(&self) -> &str {
238 &self.endpoint
239 }
240
241 pub fn headers(&self) -> &HeaderMap {
243 &self.headers
244 }
245}