synth_ai_core/orchestration/
streaming.rs1use std::collections::HashSet;
7use std::time::{Duration, Instant};
8
9use serde_json::Value;
10
11use crate::errors::CoreError;
12use crate::http::HttpClient;
13
14use super::events::{EventParser, ParsedEvent};
15
16pub struct EventStream {
18 client: HttpClient,
20 job_id: String,
22 base_url: String,
24 last_seq: i64,
26 deduplicate: bool,
28 seen_seqs: HashSet<i64>,
30 max_events_per_poll: i32,
32}
33
34impl EventStream {
35 pub fn new(client: HttpClient, base_url: &str, job_id: &str) -> Self {
37 Self {
38 client,
39 job_id: job_id.to_string(),
40 base_url: base_url.trim_end_matches('/').to_string(),
41 last_seq: 0,
42 deduplicate: true,
43 seen_seqs: HashSet::new(),
44 max_events_per_poll: 500,
45 }
46 }
47
48 pub fn with_start_seq(mut self, seq: i64) -> Self {
50 self.last_seq = seq;
51 self
52 }
53
54 pub fn with_deduplicate(mut self, dedupe: bool) -> Self {
56 self.deduplicate = dedupe;
57 self
58 }
59
60 pub fn with_max_events(mut self, max: i32) -> Self {
62 self.max_events_per_poll = max;
63 self
64 }
65
66 pub fn last_seq(&self) -> i64 {
68 self.last_seq
69 }
70
71 pub async fn poll_events(&mut self) -> Result<Vec<ParsedEvent>, CoreError> {
75 let url = format!(
76 "{}/api/policy-optimization/online/jobs/{}/events",
77 self.base_url, self.job_id
78 );
79
80 let params = [
81 ("since_seq", self.last_seq.to_string()),
82 ("limit", self.max_events_per_poll.to_string()),
83 ];
84
85 let params_slice: &[(&str, &str)] = &[
86 ("since_seq", ¶ms[0].1),
87 ("limit", ¶ms[1].1),
88 ];
89
90 let response: Value = self
91 .client
92 .get(&url, Some(params_slice))
93 .await
94 .map_err(|e| CoreError::Internal(format!("failed to fetch events: {}", e)))?;
95
96 let events_array = response
98 .get("events")
99 .and_then(|v| v.as_array())
100 .cloned()
101 .unwrap_or_default();
102
103 let mut parsed_events = Vec::new();
104
105 for event_value in events_array {
106 let parsed = EventParser::parse(&event_value);
107
108 if let Some(seq) = parsed.seq {
110 if seq > self.last_seq {
111 self.last_seq = seq;
112 }
113
114 if self.deduplicate {
116 if self.seen_seqs.contains(&seq) {
117 continue;
118 }
119 self.seen_seqs.insert(seq);
120
121 if self.seen_seqs.len() > 10000 {
123 let threshold = self.last_seq - 5000;
125 self.seen_seqs.retain(|&s| s > threshold);
126 }
127 }
128 }
129
130 parsed_events.push(parsed);
131 }
132
133 Ok(parsed_events)
134 }
135
136 pub async fn stream_until<F, T>(
145 &mut self,
146 mut on_event: F,
147 timeout: Duration,
148 poll_interval: Duration,
149 mut is_terminal: T,
150 ) -> Result<(), CoreError>
151 where
152 F: FnMut(&ParsedEvent),
153 T: FnMut() -> bool,
154 {
155 let start = Instant::now();
156 let mut last_event_time = Instant::now();
157
158 loop {
159 if start.elapsed() > timeout {
161 return Err(CoreError::Timeout(format!(
162 "event stream timed out after {:.0} seconds",
163 timeout.as_secs_f64()
164 )));
165 }
166
167 if is_terminal() {
169 return Ok(());
170 }
171
172 match self.poll_events().await {
174 Ok(events) => {
175 if !events.is_empty() {
176 last_event_time = Instant::now();
177 }
178
179 for event in &events {
180 on_event(event);
181
182 if event.category.is_terminal() {
184 return Ok(());
185 }
186 }
187 }
188 Err(e) => {
189 if last_event_time.elapsed() > Duration::from_secs(120) {
192 return Err(e);
193 }
194 }
195 }
196
197 tokio::time::sleep(poll_interval).await;
199 }
200 }
201}
202
203#[derive(Debug, Clone)]
205pub struct StreamConfig {
206 pub poll_interval_secs: f64,
208 pub max_events_per_poll: i32,
210 pub deduplicate: bool,
212 pub timeout_secs: f64,
214}
215
216impl Default for StreamConfig {
217 fn default() -> Self {
218 Self {
219 poll_interval_secs: 5.0,
220 max_events_per_poll: 500,
221 deduplicate: true,
222 timeout_secs: 3600.0,
223 }
224 }
225}
226
227#[cfg(test)]
228mod tests {
229 use super::*;
230
231 #[test]
232 fn test_stream_config_default() {
233 let config = StreamConfig::default();
234 assert_eq!(config.poll_interval_secs, 5.0);
235 assert_eq!(config.max_events_per_poll, 500);
236 assert!(config.deduplicate);
237 }
238}