agentox_core/client/
http_sse.rs1use crate::client::transport::{Transport, TransportCapabilities};
4use crate::error::TransportError;
5use reqwest::header::CONTENT_TYPE;
6use std::time::Duration;
7
8pub struct HttpSseTransport {
9 endpoint: String,
10 client: reqwest::Client,
11 timeout: Duration,
12 max_retries: usize,
13}
14
15impl HttpSseTransport {
16 pub fn new(endpoint: impl Into<String>, timeout: Duration) -> Self {
17 let timeout = if timeout.is_zero() {
18 Duration::from_secs(30)
19 } else {
20 timeout
21 };
22 let client = reqwest::Client::builder()
23 .timeout(timeout)
24 .build()
25 .unwrap_or_else(|_| reqwest::Client::new());
26 Self {
27 endpoint: endpoint.into(),
28 client,
29 timeout,
30 max_retries: 2,
31 }
32 }
33
34 async fn post_raw_once(&self, message: &str) -> Result<reqwest::Response, TransportError> {
35 let resp = self
36 .client
37 .post(&self.endpoint)
38 .header(CONTENT_TYPE, "application/json")
39 .body(message.to_string())
40 .send()
41 .await
42 .map_err(|e| {
43 if e.is_timeout() {
44 TransportError::Timeout(self.timeout)
45 } else {
46 TransportError::Http(format!("transport request failed: {e}"))
47 }
48 })?;
49 Ok(resp)
50 }
51
52 fn parse_sse_first_data(body: &str) -> Option<String> {
53 let mut data_lines: Vec<String> = Vec::new();
54 for line in body.lines() {
55 let line = line.trim();
56 if line.is_empty() {
57 if !data_lines.is_empty() {
58 return Some(data_lines.join("\n"));
59 }
60 continue;
61 }
62 if line.starts_with(':') {
63 continue;
64 }
65 if let Some(data) = line.strip_prefix("data:") {
66 data_lines.push(data.trim().to_string());
67 }
68 }
69 if data_lines.is_empty() {
70 None
71 } else {
72 Some(data_lines.join("\n"))
73 }
74 }
75
76 fn is_retryable_status(status: reqwest::StatusCode) -> bool {
77 matches!(
78 status,
79 reqwest::StatusCode::BAD_GATEWAY
80 | reqwest::StatusCode::SERVICE_UNAVAILABLE
81 | reqwest::StatusCode::GATEWAY_TIMEOUT
82 )
83 }
84
85 fn maybe_retry_delay(attempt: usize) -> Duration {
86 Duration::from_millis(50 * (attempt as u64 + 1))
87 }
88
89 async fn execute_with_retry(
90 &self,
91 message: &str,
92 expect_response_body: bool,
93 ) -> Result<Option<String>, TransportError> {
94 let mut last_err: Option<TransportError> = None;
95
96 for attempt in 0..=self.max_retries {
97 let response = self.post_raw_once(message).await;
98 match response {
99 Ok(resp) => {
100 let status = resp.status();
101 if status.is_client_error() {
102 return Err(TransportError::Http(format!(
103 "HTTP client error {}",
104 status.as_u16()
105 )));
106 }
107 if !status.is_success() {
108 if Self::is_retryable_status(status) && attempt < self.max_retries {
109 tokio::time::sleep(Self::maybe_retry_delay(attempt)).await;
110 continue;
111 }
112 return Err(TransportError::Http(format!(
113 "HTTP server error {}",
114 status.as_u16()
115 )));
116 }
117
118 if !expect_response_body {
119 return Ok(None);
120 }
121
122 let content_type = resp
123 .headers()
124 .get(CONTENT_TYPE)
125 .and_then(|v| v.to_str().ok())
126 .unwrap_or("")
127 .to_ascii_lowercase();
128
129 let body = resp
130 .text()
131 .await
132 .map_err(|e| TransportError::Http(format!("response read failed: {e}")))?;
133 if body.trim().is_empty() {
134 return Ok(None);
135 }
136
137 let payload = if content_type.contains("text/event-stream") {
138 Self::parse_sse_first_data(&body).ok_or_else(|| {
139 TransportError::Http(
140 "SSE response contained no data event payload".to_string(),
141 )
142 })?
143 } else {
144 body.trim().to_string()
145 };
146
147 serde_json::from_str::<serde_json::Value>(&payload).map_err(|e| {
148 TransportError::Http(format!("response payload is not valid JSON: {e}"))
149 })?;
150 return Ok(Some(payload));
151 }
152 Err(e) => {
153 let retryable = matches!(e, TransportError::Timeout(_))
154 || matches!(&e, TransportError::Http(msg) if msg.contains("transport request failed"));
155 last_err = Some(e);
156 if retryable && attempt < self.max_retries {
157 tokio::time::sleep(Self::maybe_retry_delay(attempt)).await;
158 continue;
159 }
160 }
161 }
162 break;
163 }
164
165 Err(last_err
166 .unwrap_or_else(|| TransportError::Http("request failed after retries".to_string())))
167 }
168}
169
170#[async_trait::async_trait]
171impl Transport for HttpSseTransport {
172 fn capabilities(&self) -> TransportCapabilities {
173 TransportCapabilities {
174 request_response: true,
175 streaming_notifications: true,
176 }
177 }
178
179 async fn write_raw(&mut self, message: &str) -> Result<(), TransportError> {
180 self.execute_with_retry(message, false).await?;
181 Ok(())
182 }
183
184 async fn request_raw(&mut self, message: &str) -> Result<Option<String>, TransportError> {
185 self.execute_with_retry(message, true).await
186 }
187
188 async fn shutdown(&mut self) -> Result<(), TransportError> {
189 Ok(())
190 }
191}