Skip to main content

agentox_core/client/
http_sse.rs

//! HTTP/SSE transport beta implementation.
2
3use crate::client::transport::{Transport, TransportCapabilities};
4use crate::error::TransportError;
5use reqwest::header::CONTENT_TYPE;
6use std::time::Duration;
7
8pub struct HttpSseTransport {
9    endpoint: String,
10    client: reqwest::Client,
11    timeout: Duration,
12}
13
14impl HttpSseTransport {
15    pub fn new(endpoint: impl Into<String>, timeout: Duration) -> Self {
16        let timeout = if timeout.is_zero() {
17            Duration::from_secs(30)
18        } else {
19            timeout
20        };
21        let client = reqwest::Client::builder()
22            .timeout(timeout)
23            .build()
24            .unwrap_or_else(|_| reqwest::Client::new());
25        Self {
26            endpoint: endpoint.into(),
27            client,
28            timeout,
29        }
30    }
31
32    async fn post_raw(&self, message: &str) -> Result<reqwest::Response, TransportError> {
33        self.client
34            .post(&self.endpoint)
35            .header(CONTENT_TYPE, "application/json")
36            .body(message.to_string())
37            .send()
38            .await
39            .map_err(|e| {
40                if e.is_timeout() {
41                    TransportError::Timeout(self.timeout)
42                } else {
43                    TransportError::Http(e.to_string())
44                }
45            })
46    }
47
48    fn parse_sse_first_data(body: &str) -> Option<String> {
49        for line in body.lines() {
50            let line = line.trim();
51            if let Some(data) = line.strip_prefix("data:") {
52                let payload = data.trim();
53                if !payload.is_empty() {
54                    return Some(payload.to_string());
55                }
56            }
57        }
58        None
59    }
60}
61
62#[async_trait::async_trait]
63impl Transport for HttpSseTransport {
64    fn capabilities(&self) -> TransportCapabilities {
65        TransportCapabilities {
66            request_response: true,
67            streaming_notifications: true,
68        }
69    }
70
71    async fn write_raw(&mut self, _message: &str) -> Result<(), TransportError> {
72        let resp = self.post_raw(_message).await?;
73        if !resp.status().is_success() {
74            return Err(TransportError::Http(format!(
75                "HTTP status {} for notification",
76                resp.status()
77            )));
78        }
79        Ok(())
80    }
81
82    async fn request_raw(&mut self, message: &str) -> Result<Option<String>, TransportError> {
83        let resp = self.post_raw(message).await?;
84        if !resp.status().is_success() {
85            return Err(TransportError::Http(format!(
86                "HTTP status {} for request",
87                resp.status()
88            )));
89        }
90
91        let content_type = resp
92            .headers()
93            .get(CONTENT_TYPE)
94            .and_then(|v| v.to_str().ok())
95            .unwrap_or("")
96            .to_ascii_lowercase();
97
98        let body = resp
99            .text()
100            .await
101            .map_err(|e| TransportError::Http(e.to_string()))?;
102        if body.trim().is_empty() {
103            return Ok(None);
104        }
105
106        if content_type.contains("text/event-stream") {
107            let payload = Self::parse_sse_first_data(&body).ok_or(TransportError::NoResponse)?;
108            return Ok(Some(payload));
109        }
110
111        Ok(Some(body.trim().to_string()))
112    }
113
114    async fn shutdown(&mut self) -> Result<(), TransportError> {
115        Ok(())
116    }
117}