agentox_core/client/
http_sse.rs1use crate::client::transport::{Transport, TransportCapabilities};
4use crate::error::TransportError;
5use reqwest::header::CONTENT_TYPE;
6use std::time::Duration;
7
8pub struct HttpSseTransport {
9 endpoint: String,
10 client: reqwest::Client,
11 timeout: Duration,
12}
13
14impl HttpSseTransport {
15 pub fn new(endpoint: impl Into<String>, timeout: Duration) -> Self {
16 let timeout = if timeout.is_zero() {
17 Duration::from_secs(30)
18 } else {
19 timeout
20 };
21 let client = reqwest::Client::builder()
22 .timeout(timeout)
23 .build()
24 .unwrap_or_else(|_| reqwest::Client::new());
25 Self {
26 endpoint: endpoint.into(),
27 client,
28 timeout,
29 }
30 }
31
32 async fn post_raw(&self, message: &str) -> Result<reqwest::Response, TransportError> {
33 self.client
34 .post(&self.endpoint)
35 .header(CONTENT_TYPE, "application/json")
36 .body(message.to_string())
37 .send()
38 .await
39 .map_err(|e| {
40 if e.is_timeout() {
41 TransportError::Timeout(self.timeout)
42 } else {
43 TransportError::Http(e.to_string())
44 }
45 })
46 }
47
/// Extracts the data payload of the first non-empty event in an SSE body.
///
/// Events are separated by blank lines. Within one event every `data:`
/// line contributes one line of payload (a single leading space after the
/// colon is dropped) and the lines are joined with `\n`, so a payload that
/// the server split across several `data:` lines is returned whole rather
/// than truncated to its first line. Other fields (`event:`, `id:`,
/// `retry:`) and comment lines are ignored.
///
/// Parsing is deliberately lenient: surrounding whitespace on each line is
/// ignored, the returned payload is trimmed, and a final event that is not
/// terminated by a blank line is still accepted.
///
/// Returns `None` when no event carries a non-empty payload.
fn parse_sse_first_data(body: &str) -> Option<String> {
    // `data:` values of the event currently being assembled.
    let mut data_lines: Vec<&str> = Vec::new();

    // The extra trailing empty line flushes an event that reaches the end
    // of the body without its terminating blank line.
    for line in body.lines().chain(std::iter::once("")) {
        let line = line.trim();

        if line.is_empty() {
            // End of an event: emit it if it carried any payload,
            // otherwise discard it and keep scanning.
            let joined = data_lines.join("\n");
            let payload = joined.trim();
            if !payload.is_empty() {
                return Some(payload.to_string());
            }
            data_lines.clear();
            continue;
        }

        if let Some(data) = line.strip_prefix("data:") {
            data_lines.push(data.strip_prefix(' ').unwrap_or(data));
        }
    }

    None
}
60}
61
62#[async_trait::async_trait]
63impl Transport for HttpSseTransport {
64 fn capabilities(&self) -> TransportCapabilities {
65 TransportCapabilities {
66 request_response: true,
67 streaming_notifications: true,
68 }
69 }
70
71 async fn write_raw(&mut self, _message: &str) -> Result<(), TransportError> {
72 let resp = self.post_raw(_message).await?;
73 if !resp.status().is_success() {
74 return Err(TransportError::Http(format!(
75 "HTTP status {} for notification",
76 resp.status()
77 )));
78 }
79 Ok(())
80 }
81
82 async fn request_raw(&mut self, message: &str) -> Result<Option<String>, TransportError> {
83 let resp = self.post_raw(message).await?;
84 if !resp.status().is_success() {
85 return Err(TransportError::Http(format!(
86 "HTTP status {} for request",
87 resp.status()
88 )));
89 }
90
91 let content_type = resp
92 .headers()
93 .get(CONTENT_TYPE)
94 .and_then(|v| v.to_str().ok())
95 .unwrap_or("")
96 .to_ascii_lowercase();
97
98 let body = resp
99 .text()
100 .await
101 .map_err(|e| TransportError::Http(e.to_string()))?;
102 if body.trim().is_empty() {
103 return Ok(None);
104 }
105
106 if content_type.contains("text/event-stream") {
107 let payload = Self::parse_sse_first_data(&body).ok_or(TransportError::NoResponse)?;
108 return Ok(Some(payload));
109 }
110
111 Ok(Some(body.trim().to_string()))
112 }
113
114 async fn shutdown(&mut self) -> Result<(), TransportError> {
115 Ok(())
116 }
117}