// synth_ai_core/orchestration/streaming.rs

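//! Event streaming for optimization jobs.
//!
//! This module provides an SSE-like event stream for optimization jobs, built by
//! repeatedly polling the job's events endpoint. Each poll resumes from the last
//! seen sequence number, and events can optionally be deduplicated by sequence number.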
5
6use std::collections::HashSet;
7use std::time::{Duration, Instant};
8
9use serde_json::Value;
10
11use crate::errors::CoreError;
12use crate::http::HttpClient;
13
14use super::events::{EventParser, ParsedEvent};
15
16/// Event stream for polling job events.
17pub struct EventStream {
18    /// HTTP client reference
19    client: HttpClient,
20    /// Job ID to stream events for
21    job_id: String,
22    /// Base URL for API
23    base_url: String,
24    /// Last seen sequence number
25    last_seq: i64,
26    /// Whether to deduplicate events
27    deduplicate: bool,
28    /// Set of seen sequence numbers
29    seen_seqs: HashSet<i64>,
30    /// Maximum events per poll
31    max_events_per_poll: i32,
32}
33
34impl EventStream {
35    /// Create a new event stream for a job.
36    pub fn new(client: HttpClient, base_url: &str, job_id: &str) -> Self {
37        Self {
38            client,
39            job_id: job_id.to_string(),
40            base_url: base_url.trim_end_matches('/').to_string(),
41            last_seq: 0,
42            deduplicate: true,
43            seen_seqs: HashSet::new(),
44            max_events_per_poll: 500,
45        }
46    }
47
48    /// Set the starting sequence number.
49    pub fn with_start_seq(mut self, seq: i64) -> Self {
50        self.last_seq = seq;
51        self
52    }
53
54    /// Enable or disable deduplication.
55    pub fn with_deduplicate(mut self, dedupe: bool) -> Self {
56        self.deduplicate = dedupe;
57        self
58    }
59
60    /// Set max events per poll.
61    pub fn with_max_events(mut self, max: i32) -> Self {
62        self.max_events_per_poll = max;
63        self
64    }
65
66    /// Get the last seen sequence number.
67    pub fn last_seq(&self) -> i64 {
68        self.last_seq
69    }
70
71    /// Poll for new events.
72    ///
73    /// Returns events since the last sequence number.
74    pub async fn poll_events(&mut self) -> Result<Vec<ParsedEvent>, CoreError> {
75        let url = format!(
76            "{}/api/policy-optimization/online/jobs/{}/events",
77            self.base_url, self.job_id
78        );
79
80        let params = [
81            ("since_seq", self.last_seq.to_string()),
82            ("limit", self.max_events_per_poll.to_string()),
83        ];
84
85        let params_slice: &[(&str, &str)] = &[
86            ("since_seq", &params[0].1),
87            ("limit", &params[1].1),
88        ];
89
90        let response: Value = self
91            .client
92            .get(&url, Some(params_slice))
93            .await
94            .map_err(|e| CoreError::Internal(format!("failed to fetch events: {}", e)))?;
95
96        // Parse events array
97        let events_array = response
98            .get("events")
99            .and_then(|v| v.as_array())
100            .cloned()
101            .unwrap_or_default();
102
103        let mut parsed_events = Vec::new();
104
105        for event_value in events_array {
106            let parsed = EventParser::parse(&event_value);
107
108            // Update last_seq
109            if let Some(seq) = parsed.seq {
110                if seq > self.last_seq {
111                    self.last_seq = seq;
112                }
113
114                // Deduplication
115                if self.deduplicate {
116                    if self.seen_seqs.contains(&seq) {
117                        continue;
118                    }
119                    self.seen_seqs.insert(seq);
120
121                    // Limit seen_seqs size to prevent memory growth
122                    if self.seen_seqs.len() > 10000 {
123                        // Keep only recent sequences
124                        let threshold = self.last_seq - 5000;
125                        self.seen_seqs.retain(|&s| s > threshold);
126                    }
127                }
128            }
129
130            parsed_events.push(parsed);
131        }
132
133        Ok(parsed_events)
134    }
135
136    /// Stream events until a terminal condition with callback.
137    ///
138    /// # Arguments
139    ///
140    /// * `on_event` - Callback for each event
141    /// * `timeout` - Maximum time to stream
142    /// * `poll_interval` - Time between polls
143    /// * `is_terminal` - Optional check for terminal status
144    pub async fn stream_until<F, T>(
145        &mut self,
146        mut on_event: F,
147        timeout: Duration,
148        poll_interval: Duration,
149        mut is_terminal: T,
150    ) -> Result<(), CoreError>
151    where
152        F: FnMut(&ParsedEvent),
153        T: FnMut() -> bool,
154    {
155        let start = Instant::now();
156        let mut last_event_time = Instant::now();
157
158        loop {
159            // Check timeout
160            if start.elapsed() > timeout {
161                return Err(CoreError::Timeout(format!(
162                    "event stream timed out after {:.0} seconds",
163                    timeout.as_secs_f64()
164                )));
165            }
166
167            // Check terminal condition
168            if is_terminal() {
169                return Ok(());
170            }
171
172            // Poll events
173            match self.poll_events().await {
174                Ok(events) => {
175                    if !events.is_empty() {
176                        last_event_time = Instant::now();
177                    }
178
179                    for event in &events {
180                        on_event(event);
181
182                        // Check for terminal events
183                        if event.category.is_terminal() {
184                            return Ok(());
185                        }
186                    }
187                }
188                Err(e) => {
189                    // Log error but continue streaming
190                    // Allow some grace period for transient errors
191                    if last_event_time.elapsed() > Duration::from_secs(120) {
192                        return Err(e);
193                    }
194                }
195            }
196
197            // Wait before next poll
198            tokio::time::sleep(poll_interval).await;
199        }
200    }
201}
202
/// Tunable settings for streaming a job's events.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Delay between consecutive polls, in seconds.
    pub poll_interval_secs: f64,
    /// Upper bound on the number of events requested per poll.
    pub max_events_per_poll: i32,
    /// Whether repeated events should be deduplicated.
    pub deduplicate: bool,
    /// Timeout, in seconds.
    pub timeout_secs: f64,
}

impl Default for StreamConfig {
    /// Poll every 5 seconds, request up to 500 events per poll, deduplicate, and
    /// time out after one hour.
    fn default() -> Self {
        const ONE_HOUR_SECS: f64 = 60.0 * 60.0;

        StreamConfig {
            deduplicate: true,
            max_events_per_poll: 500,
            poll_interval_secs: 5.0,
            timeout_secs: ONE_HOUR_SECS,
        }
    }
}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Every default field is pinned, including the timeout the original test omitted.
    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert_eq!(config.poll_interval_secs, 5.0);
        assert_eq!(config.max_events_per_poll, 500);
        assert!(config.deduplicate);
        assert_eq!(config.timeout_secs, 3600.0);
    }
}