use std::time::{Duration, Instant};
use reqwest::Response;
use serde_json::Value;
use crate::error::AgentError;
use crate::providers::http::adapter::HttpAgentAdapter;
/// One item parsed from a single SSE `data` line by an adapter's
/// `parse_sse_line`; `collect_sse_stream` gathers these in arrival order.
///
/// NOTE(review): apart from `Done`, the per-variant descriptions below are
/// inferred from names only — nothing in this file constructs these variants.
/// Confirm against the adapter implementations.
#[derive(Debug, Clone)]
pub enum SseDelta {
/// A string payload; presumably a fragment of generated text.
Text(String),
/// Presumably one incremental piece of a tool call.
ToolCallDelta {
/// Presumably identifies which tool call this fragment belongs to — TODO confirm.
index: usize,
/// Optional identifier; may be absent on a given delta.
id: Option<String>,
/// Optional name; may be absent on a given delta.
name: Option<String>,
/// A piece of the arguments text; presumably concatenated by the consumer.
args_fragment: String,
},
/// A pair of token counters; presumably usage reported by the provider.
Usage {
input_tokens: u64,
output_tokens: u64,
},
/// An arbitrary JSON value.
StructuredValue(Value),
/// Terminal marker: `collect_sse_stream` stops reading and returns as soon
/// as the adapter yields this variant.
Done,
}
/// Reads a server-sent-events response body and returns every delta the
/// adapter extracted from its `data` lines, in arrival order.
///
/// The body is consumed chunk by chunk. Each complete line (terminated by
/// `\n`, with trailing `\r` removed) that carries a `data` field is handed to
/// `adapter.parse_sse_line`; lines for which the adapter returns `None` are
/// skipped. Reading stops as soon as the adapter yields [`SseDelta::Done`] or
/// the body ends. A trailing line that was never terminated by `\n` is
/// discarded when the body ends.
///
/// `timeout` bounds the whole read, not each individual chunk: the deadline is
/// fixed once on entry.
///
/// # Errors
///
/// * [`AgentError::Timeout`] if the deadline passes before the stream finishes.
/// * [`AgentError::HttpProvider`] (with `status_code: 0`) if reading a chunk
///   from the response fails.
pub async fn collect_sse_stream<A: HttpAgentAdapter>(
    adapter: &A,
    response: Response,
    timeout: Duration,
) -> Result<Vec<SseDelta>, AgentError> {
    let mut deltas: Vec<SseDelta> = Vec::new();
    // Raw bytes rather than a `String`: a multi-byte UTF-8 character can
    // straddle two network chunks, so decoding has to wait until a whole line
    // is available or the character would be replaced with U+FFFD.
    let mut pending: Vec<u8> = Vec::new();
    let deadline = Instant::now() + timeout;
    let mut stream = response;

    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return Err(AgentError::Timeout { limit: timeout });
        }

        let chunk = match tokio::time::timeout(remaining, stream.chunk()).await {
            Ok(Ok(Some(chunk))) => chunk,
            // Body finished without an explicit `Done` marker.
            Ok(Ok(None)) => break,
            Ok(Err(e)) => {
                return Err(AgentError::HttpProvider {
                    provider: adapter.provider_name().to_string(),
                    // No HTTP status is available for a mid-stream read failure.
                    status_code: 0,
                    message: format!("SSE stream read error: {e}"),
                });
            }
            Err(_) => return Err(AgentError::Timeout { limit: timeout }),
        };
        pending.extend_from_slice(&chunk);

        // Walk the complete lines with a cursor and drop them from the buffer
        // once afterwards, instead of shifting the buffer after every line.
        // Splitting on the byte 0x0A is safe: it never occurs inside a
        // multi-byte UTF-8 sequence.
        let mut consumed = 0;
        while let Some(offset) = pending[consumed..].iter().position(|&b| b == b'\n') {
            let line_end = consumed + offset;
            let decoded = String::from_utf8_lossy(&pending[consumed..line_end]);
            consumed = line_end + 1;

            let line = decoded.trim_end_matches('\r');
            let parsed = sse_data_payload(line).and_then(|data| adapter.parse_sse_line(data));
            if let Some(delta) = parsed {
                let is_done = matches!(delta, SseDelta::Done);
                deltas.push(delta);
                if is_done {
                    return Ok(deltas);
                }
            }
        }
        pending.drain(..consumed);
    }

    Ok(deltas)
}

/// Returns the value of an SSE `data` field line, or `None` for any other line.
///
/// The space after the colon is optional in the SSE format, and at most one
/// leading space is removed from the value, so `data:x` and `data: x` both
/// yield `x`.
fn sse_data_payload(line: &str) -> Option<&str> {
    let value = line.strip_prefix("data:")?;
    Some(value.strip_prefix(' ').unwrap_or(value))
}