1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// File: rswarm/src/stream.rs
use async_stream::try_stream;
use futures_util::{stream::Stream, StreamExt};
use reqwest::Client;
use serde_json::{json, Value};
use crate::error::{SwarmError, SwarmResult};
use crate::types::{
Agent, ApiKey, ContextVariables, FunctionCall, Instructions, Message, MessageRole,
};
use crate::util::{debug_print, function_to_json};
/// Streamer provides a streaming-based API to receive agent responses incrementally.
///
/// It bundles the pieces needed to issue a streaming chat request: an HTTP
/// client, the credential sent with the request, and the endpoint URL.
pub struct Streamer {
// HTTP client used to send the POST request in `stream_chat`.
client: Client,
// Credential attached to each request as a bearer token.
api_key: ApiKey,
// URL the streaming request is POSTed to.
api_url: String,
}
impl Streamer {
    /// Creates a new `Streamer` from the provided HTTP client, API key, and API URL.
    pub fn new(client: Client, api_key: ApiKey, api_url: String) -> Self {
        Self {
            client,
            api_key,
            api_url,
        }
    }

    /// Begins a streaming chat completion request.
    ///
    /// A system message built from the agent's instructions is prepended to
    /// `history`, and the result is POSTed to the configured API URL with
    /// `"stream": true`. The response body is read as a server-sent event
    /// stream; every `data:` line is parsed as JSON and each entry of its
    /// `choices` array that carries non-empty `content` or a non-null
    /// `function_call` is yielded as an assistant [`Message`]. The stream ends
    /// when a `data: [DONE]` line is seen or the response body is exhausted.
    ///
    /// # Parameters
    ///
    /// * `agent` - supplies the instructions, the function definitions, the
    ///   function-call setting and the default model.
    /// * `history` - prior messages, sent after the system message.
    /// * `context_variables` - passed to the instructions function when the
    ///   agent's instructions are `Instructions::Function`.
    /// * `model_override` - when `Some`, used instead of `agent.model`.
    /// * `debug` - forwarded to `debug_print`.
    ///
    /// # Errors
    ///
    /// The stream yields an `Err` item and then terminates when:
    ///
    /// * the system message or the function definitions cannot be built
    ///   (the underlying error is propagated as-is),
    /// * the request cannot be sent (`SwarmError::NetworkError`),
    /// * the server answers with an error status (`SwarmError::ApiError`),
    /// * reading the body fails (`SwarmError::StreamError`),
    /// * a `data:` payload is not valid JSON (`SwarmError::DeserializationError`).
    pub fn stream_chat(
        &self,
        agent: &Agent,
        history: &[Message],
        context_variables: &ContextVariables,
        model_override: Option<String>,
        debug: bool,
    ) -> impl Stream<Item = SwarmResult<Message>> {
        // Snapshot everything the stream needs: the returned stream owns its
        // data and does not borrow from `self` or from the arguments.
        let client = self.client.clone();
        let api_key = self.api_key.clone();
        let api_url = self.api_url.clone();
        // The default model is the agent's own, whatever kind of instructions it has.
        let model = model_override.unwrap_or_else(|| agent.model.clone());
        debug_print(debug, &format!("stream called with debug={:?}", debug));
        let history_vec = history.to_vec();
        let system_instructions = match &agent.instructions {
            Instructions::Text(text) => text.clone(),
            Instructions::Function(func) => func(context_variables.clone()),
        };
        // Pre-compute fallible values here; the errors are surfaced through
        // `?` inside `try_stream!` so they arrive as the first stream item.
        let functions_result: SwarmResult<Vec<Value>> =
            agent.functions.iter().map(function_to_json).collect();
        let function_call_json: Option<Value> =
            agent.function_call().to_wire_value().map(|s| json!(s));

        try_stream! {
            // Build messages, propagating any construction errors.
            let system_msg = Message::system(system_instructions)?;
            let mut messages = vec![system_msg];
            messages.extend_from_slice(&history_vec);

            // Build functions list, propagating serialization errors.
            let functions = functions_result?;

            // Build the request payload; optional keys are only set when used.
            let mut request_body = json!({
                "model": model,
                "messages": messages,
                "stream": true,
            });
            if !functions.is_empty() {
                request_body["functions"] = Value::Array(functions);
            }
            if let Some(fc) = function_call_json {
                request_body["function_call"] = fc;
            }

            // Send POST request.
            let response = client
                .post(api_url)
                .bearer_auth(api_key.as_str())
                .json(&request_body)
                .send()
                .await
                .map_err(|e| SwarmError::NetworkError(e.to_string()))?;

            // Check the HTTP status by reference so the body stays available.
            response
                .error_for_status_ref()
                .map_err(|e| SwarmError::ApiError(e.to_string()))?;

            let mut byte_stream = response.bytes_stream();

            // Raw byte buffer. Network chunks can end in the middle of a line
            // and even in the middle of a multi-byte UTF-8 character, so text
            // is decoded only once a complete line has been received.
            let mut pending: Vec<u8> = Vec::new();

            'sse: while let Some(chunk_result) = byte_stream.next().await {
                let chunk = chunk_result
                    .map_err(|e| SwarmError::StreamError(e.to_string()))?;
                pending.extend_from_slice(&chunk);

                // Process every complete line (terminated by \n).
                while let Some(newline_pos) = pending.iter().position(|&b| b == b'\n') {
                    let raw_line: Vec<u8> = pending.drain(..=newline_pos).collect();
                    // Decode without the trailing '\n'; a '\r' is trimmed below.
                    let line = String::from_utf8_lossy(&raw_line[..newline_pos]);

                    // The space after the colon is optional in the SSE format,
                    // so match on "data:" and trim the payload afterwards.
                    if let Some(payload) = line.trim_end_matches('\r').strip_prefix("data:") {
                        let payload = payload.trim();
                        if payload == "[DONE]" {
                            break 'sse;
                        }

                        // Parse as raw Value to avoid validation failures
                        // on empty-content delta messages.
                        let chunk_val: Value = serde_json::from_str(payload)
                            .map_err(|e| SwarmError::DeserializationError(e.to_string()))?;

                        if let Some(choices) = chunk_val["choices"].as_array() {
                            for choice in choices {
                                // Streaming responses use "delta"; non-streaming
                                // ones use "message". Support both.
                                let source = choice
                                    .get("delta")
                                    .or_else(|| choice.get("message"));

                                let content = source
                                    .and_then(|s| s.get("content"))
                                    .and_then(Value::as_str)
                                    .filter(|s| !s.is_empty())
                                    .map(str::to_owned);

                                // An explicit `"function_call": null` means
                                // "no call" and must not produce a message.
                                let fc = source
                                    .and_then(|s| s.get("function_call"))
                                    .filter(|v| !v.is_null())
                                    .map(|v| {
                                        FunctionCall::from_parts_unchecked(
                                            v["name"].as_str().unwrap_or("").to_string(),
                                            v["arguments"].as_str().unwrap_or("").to_string(),
                                        )
                                    });

                                // Only yield when there is actual payload.
                                if content.is_some() || fc.is_some() {
                                    yield Message::from_parts_unchecked(
                                        MessageRole::Assistant,
                                        content,
                                        None,
                                        fc,
                                    );
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}