Skip to main content

agentox_core/client/
http_sse.rs

1//! HTTP/SSE transport beta implementation.
2
3use crate::client::transport::{Transport, TransportCapabilities};
4use crate::error::TransportError;
5use reqwest::header::CONTENT_TYPE;
6use std::time::Duration;
7
8pub struct HttpSseTransport {
9    endpoint: String,
10    client: reqwest::Client,
11    timeout: Duration,
12    max_retries: usize,
13}
14
15impl HttpSseTransport {
16    pub fn new(endpoint: impl Into<String>, timeout: Duration) -> Self {
17        let timeout = if timeout.is_zero() {
18            Duration::from_secs(30)
19        } else {
20            timeout
21        };
22        let client = reqwest::Client::builder()
23            .timeout(timeout)
24            .build()
25            .unwrap_or_else(|_| reqwest::Client::new());
26        Self {
27            endpoint: endpoint.into(),
28            client,
29            timeout,
30            max_retries: 2,
31        }
32    }
33
34    async fn post_raw_once(&self, message: &str) -> Result<reqwest::Response, TransportError> {
35        let resp = self
36            .client
37            .post(&self.endpoint)
38            .header(CONTENT_TYPE, "application/json")
39            .body(message.to_string())
40            .send()
41            .await
42            .map_err(|e| {
43                if e.is_timeout() {
44                    TransportError::Timeout(self.timeout)
45                } else {
46                    TransportError::Http(format!("transport request failed: {e}"))
47                }
48            })?;
49        Ok(resp)
50    }
51
52    fn parse_sse_first_data(body: &str) -> Option<String> {
53        let mut data_lines: Vec<String> = Vec::new();
54        for line in body.lines() {
55            let line = line.trim();
56            if line.is_empty() {
57                if !data_lines.is_empty() {
58                    return Some(data_lines.join("\n"));
59                }
60                continue;
61            }
62            if line.starts_with(':') {
63                continue;
64            }
65            if let Some(data) = line.strip_prefix("data:") {
66                data_lines.push(data.trim().to_string());
67            }
68        }
69        if data_lines.is_empty() {
70            None
71        } else {
72            Some(data_lines.join("\n"))
73        }
74    }
75
76    fn is_retryable_status(status: reqwest::StatusCode) -> bool {
77        matches!(
78            status,
79            reqwest::StatusCode::BAD_GATEWAY
80                | reqwest::StatusCode::SERVICE_UNAVAILABLE
81                | reqwest::StatusCode::GATEWAY_TIMEOUT
82        )
83    }
84
85    fn maybe_retry_delay(attempt: usize) -> Duration {
86        Duration::from_millis(50 * (attempt as u64 + 1))
87    }
88
89    async fn execute_with_retry(
90        &self,
91        message: &str,
92        expect_response_body: bool,
93    ) -> Result<Option<String>, TransportError> {
94        let mut last_err: Option<TransportError> = None;
95
96        for attempt in 0..=self.max_retries {
97            let response = self.post_raw_once(message).await;
98            match response {
99                Ok(resp) => {
100                    let status = resp.status();
101                    if status.is_client_error() {
102                        return Err(TransportError::Http(format!(
103                            "HTTP client error {}",
104                            status.as_u16()
105                        )));
106                    }
107                    if !status.is_success() {
108                        if Self::is_retryable_status(status) && attempt < self.max_retries {
109                            tokio::time::sleep(Self::maybe_retry_delay(attempt)).await;
110                            continue;
111                        }
112                        return Err(TransportError::Http(format!(
113                            "HTTP server error {}",
114                            status.as_u16()
115                        )));
116                    }
117
118                    if !expect_response_body {
119                        return Ok(None);
120                    }
121
122                    let content_type = resp
123                        .headers()
124                        .get(CONTENT_TYPE)
125                        .and_then(|v| v.to_str().ok())
126                        .unwrap_or("")
127                        .to_ascii_lowercase();
128
129                    let body = resp
130                        .text()
131                        .await
132                        .map_err(|e| TransportError::Http(format!("response read failed: {e}")))?;
133                    if body.trim().is_empty() {
134                        return Ok(None);
135                    }
136
137                    let payload = if content_type.contains("text/event-stream") {
138                        Self::parse_sse_first_data(&body).ok_or_else(|| {
139                            TransportError::Http(
140                                "SSE response contained no data event payload".to_string(),
141                            )
142                        })?
143                    } else {
144                        body.trim().to_string()
145                    };
146
147                    serde_json::from_str::<serde_json::Value>(&payload).map_err(|e| {
148                        TransportError::Http(format!("response payload is not valid JSON: {e}"))
149                    })?;
150                    return Ok(Some(payload));
151                }
152                Err(e) => {
153                    let retryable = matches!(e, TransportError::Timeout(_))
154                        || matches!(&e, TransportError::Http(msg) if msg.contains("transport request failed"));
155                    last_err = Some(e);
156                    if retryable && attempt < self.max_retries {
157                        tokio::time::sleep(Self::maybe_retry_delay(attempt)).await;
158                        continue;
159                    }
160                }
161            }
162            break;
163        }
164
165        Err(last_err
166            .unwrap_or_else(|| TransportError::Http("request failed after retries".to_string())))
167    }
168}
169
170#[async_trait::async_trait]
171impl Transport for HttpSseTransport {
172    fn capabilities(&self) -> TransportCapabilities {
173        TransportCapabilities {
174            request_response: true,
175            streaming_notifications: true,
176        }
177    }
178
179    async fn write_raw(&mut self, message: &str) -> Result<(), TransportError> {
180        self.execute_with_retry(message, false).await?;
181        Ok(())
182    }
183
184    async fn request_raw(&mut self, message: &str) -> Result<Option<String>, TransportError> {
185        self.execute_with_retry(message, true).await
186    }
187
188    async fn shutdown(&mut self) -> Result<(), TransportError> {
189        Ok(())
190    }
191}