Skip to main content

synth_ai_core/streaming/
endpoints.rs

1//! Stream endpoint configuration for different job types.
2
3/// Endpoint configuration for streaming from a job.
4#[derive(Debug, Clone)]
5pub struct StreamEndpoints {
6    /// Status endpoint.
7    pub status: Option<String>,
8    /// Events endpoint.
9    pub events: Option<String>,
10    /// Metrics endpoint.
11    pub metrics: Option<String>,
12    /// Timeline endpoint.
13    pub timeline: Option<String>,
14    /// Fallback status endpoints (tried in order if primary fails).
15    pub status_fallbacks: Vec<String>,
16    /// Fallback event endpoints.
17    pub event_fallbacks: Vec<String>,
18    /// Fallback metrics endpoints.
19    pub metric_fallbacks: Vec<String>,
20    /// Fallback timeline endpoints.
21    pub timeline_fallbacks: Vec<String>,
22}
23
24impl StreamEndpoints {
25    /// Create endpoints for a generic learning job.
26    pub fn learning(job_id: &str) -> Self {
27        let base = format!("/learning/jobs/{}", job_id);
28        Self {
29            status: Some(base.clone()),
30            events: Some(format!("{}/events", base)),
31            metrics: Some(format!("{}/metrics", base)),
32            timeline: Some(format!("{}/timeline", base)),
33            status_fallbacks: vec![],
34            event_fallbacks: vec![],
35            metric_fallbacks: vec![],
36            timeline_fallbacks: vec![],
37        }
38    }
39
40    /// Create endpoints for a prompt learning (GEPA) job.
41    pub fn prompt_learning(job_id: &str) -> Self {
42        let base = format!("/prompt-learning/online/jobs/{}", job_id);
43        Self {
44            status: Some(base.clone()),
45            events: Some(format!("{}/events", base)),
46            metrics: Some(format!("{}/metrics", base)),
47            timeline: None,
48            status_fallbacks: vec![
49                format!("/learning/jobs/{}", job_id),
50                format!("/orchestration/jobs/{}", job_id),
51            ],
52            event_fallbacks: vec![format!("/learning/jobs/{}/events", job_id)],
53            metric_fallbacks: vec![],
54            timeline_fallbacks: vec![],
55        }
56    }
57
58    /// Create endpoints for an eval job.
59    pub fn eval(job_id: &str) -> Self {
60        let base = format!("/eval/jobs/{}", job_id);
61        Self {
62            status: Some(base.clone()),
63            events: Some(format!("{}/events", base)),
64            metrics: None,
65            timeline: None,
66            status_fallbacks: vec![],
67            event_fallbacks: vec![],
68            metric_fallbacks: vec![],
69            timeline_fallbacks: vec![],
70        }
71    }
72
73    /// Create endpoints for an RL job.
74    pub fn rl(job_id: &str) -> Self {
75        let base = format!("/rl/jobs/{}", job_id);
76        Self {
77            status: Some(base.clone()),
78            events: Some(format!("{}/events", base)),
79            metrics: Some(format!("{}/metrics", base)),
80            timeline: Some(format!("{}/timeline", base)),
81            status_fallbacks: vec![
82                format!("/learning/jobs/{}", job_id),
83                format!("/orchestration/jobs/{}", job_id),
84            ],
85            event_fallbacks: vec![
86                format!("/learning/jobs/{}/events", job_id),
87                format!("/orchestration/jobs/{}/events", job_id),
88            ],
89            metric_fallbacks: vec![format!("/learning/jobs/{}/metrics", job_id)],
90            timeline_fallbacks: vec![format!("/learning/jobs/{}/timeline", job_id)],
91        }
92    }
93
94    /// Create endpoints for an SFT job.
95    pub fn sft(job_id: &str) -> Self {
96        let base = format!("/sft/jobs/{}", job_id);
97        Self {
98            status: Some(base.clone()),
99            events: Some(format!("{}/events", base)),
100            metrics: Some(format!("{}/metrics", base)),
101            timeline: None,
102            status_fallbacks: vec![],
103            event_fallbacks: vec![],
104            metric_fallbacks: vec![],
105            timeline_fallbacks: vec![],
106        }
107    }
108
109    /// Create endpoints for graph optimization.
110    pub fn graph_optimization(job_id: &str) -> Self {
111        let base = format!("/graphs/optimization/jobs/{}", job_id);
112        Self {
113            status: Some(base.clone()),
114            events: Some(format!("{}/events", base)),
115            metrics: Some(format!("{}/metrics", base)),
116            timeline: None,
117            status_fallbacks: vec![],
118            event_fallbacks: vec![],
119            metric_fallbacks: vec![],
120            timeline_fallbacks: vec![],
121        }
122    }
123
124    /// Create endpoints for graph evolve jobs.
125    pub fn graph_evolve(job_id: &str) -> Self {
126        let base = format!("/graph-evolve/jobs/{}", job_id);
127        Self {
128            status: Some(base.clone()),
129            events: Some(format!("{}/events", base)),
130            metrics: Some(format!("{}/metrics", base)),
131            timeline: None,
132            status_fallbacks: vec![format!("/graphgen/jobs/{}", job_id)],
133            event_fallbacks: vec![format!("/graphgen/jobs/{}/events", job_id)],
134            metric_fallbacks: vec![format!("/graphgen/jobs/{}/metrics", job_id)],
135            timeline_fallbacks: vec![],
136        }
137    }
138
139    /// Legacy alias for graph evolve endpoints.
140    pub fn graphgen(job_id: &str) -> Self {
141        Self::graph_evolve(job_id)
142    }
143
144    /// Create custom endpoints.
145    pub fn custom(
146        status: Option<String>,
147        events: Option<String>,
148        metrics: Option<String>,
149        timeline: Option<String>,
150    ) -> Self {
151        Self {
152            status,
153            events,
154            metrics,
155            timeline,
156            status_fallbacks: vec![],
157            event_fallbacks: vec![],
158            metric_fallbacks: vec![],
159            timeline_fallbacks: vec![],
160        }
161    }
162
163    /// Add a status fallback endpoint.
164    pub fn with_status_fallback(mut self, endpoint: impl Into<String>) -> Self {
165        self.status_fallbacks.push(endpoint.into());
166        self
167    }
168
169    /// Add an event fallback endpoint.
170    pub fn with_event_fallback(mut self, endpoint: impl Into<String>) -> Self {
171        self.event_fallbacks.push(endpoint.into());
172        self
173    }
174
175    /// Add a metrics fallback endpoint.
176    pub fn with_metric_fallback(mut self, endpoint: impl Into<String>) -> Self {
177        self.metric_fallbacks.push(endpoint.into());
178        self
179    }
180
181    /// Add a timeline fallback endpoint.
182    pub fn with_timeline_fallback(mut self, endpoint: impl Into<String>) -> Self {
183        self.timeline_fallbacks.push(endpoint.into());
184        self
185    }
186
187    /// Get the SSE stream URL for events.
188    pub fn events_stream_url(&self) -> Option<String> {
189        self.events.as_ref().map(|e| format!("{}/stream", e))
190    }
191
192    /// Get all status endpoints to try (primary + fallbacks).
193    pub fn all_status_endpoints(&self) -> Vec<&str> {
194        let mut endpoints = Vec::new();
195        if let Some(ref s) = self.status {
196            endpoints.push(s.as_str());
197        }
198        for fallback in &self.status_fallbacks {
199            endpoints.push(fallback.as_str());
200        }
201        endpoints
202    }
203
204    /// Get all event endpoints to try (primary + fallbacks).
205    pub fn all_event_endpoints(&self) -> Vec<&str> {
206        let mut endpoints = Vec::new();
207        if let Some(ref e) = self.events {
208            endpoints.push(e.as_str());
209        }
210        for fallback in &self.event_fallbacks {
211            endpoints.push(fallback.as_str());
212        }
213        endpoints
214    }
215
216    /// Get all metrics endpoints to try (primary + fallbacks).
217    pub fn all_metric_endpoints(&self) -> Vec<&str> {
218        let mut endpoints = Vec::new();
219        if let Some(ref m) = self.metrics {
220            endpoints.push(m.as_str());
221        }
222        for fallback in &self.metric_fallbacks {
223            endpoints.push(fallback.as_str());
224        }
225        endpoints
226    }
227
228    /// Get all timeline endpoints to try (primary + fallbacks).
229    pub fn all_timeline_endpoints(&self) -> Vec<&str> {
230        let mut endpoints = Vec::new();
231        if let Some(ref t) = self.timeline {
232            endpoints.push(t.as_str());
233        }
234        for fallback in &self.timeline_fallbacks {
235            endpoints.push(fallback.as_str());
236        }
237        endpoints
238    }
239}
240
241impl Default for StreamEndpoints {
242    fn default() -> Self {
243        Self {
244            status: None,
245            events: None,
246            metrics: None,
247            timeline: None,
248            status_fallbacks: vec![],
249            event_fallbacks: vec![],
250            metric_fallbacks: vec![],
251            timeline_fallbacks: vec![],
252        }
253    }
254}
255
256#[cfg(test)]
257mod tests {
258    use super::*;
259
260    #[test]
261    fn test_learning_endpoints() {
262        let endpoints = StreamEndpoints::learning("job-123");
263
264        assert_eq!(endpoints.status, Some("/learning/jobs/job-123".to_string()));
265        assert_eq!(
266            endpoints.events,
267            Some("/learning/jobs/job-123/events".to_string())
268        );
269        assert_eq!(
270            endpoints.metrics,
271            Some("/learning/jobs/job-123/metrics".to_string())
272        );
273        assert_eq!(
274            endpoints.timeline,
275            Some("/learning/jobs/job-123/timeline".to_string())
276        );
277    }
278
279    #[test]
280    fn test_prompt_learning_endpoints() {
281        let endpoints = StreamEndpoints::prompt_learning("job-456");
282
283        assert_eq!(
284            endpoints.status,
285            Some("/prompt-learning/online/jobs/job-456".to_string())
286        );
287        assert!(endpoints.timeline.is_none());
288        assert_eq!(endpoints.status_fallbacks.len(), 2);
289    }
290
291    #[test]
292    fn test_eval_endpoints() {
293        let endpoints = StreamEndpoints::eval("eval-789");
294
295        assert_eq!(endpoints.status, Some("/eval/jobs/eval-789".to_string()));
296    }
297
298    #[test]
299    fn test_events_stream_url() {
300        let endpoints = StreamEndpoints::learning("job-123");
301
302        assert_eq!(
303            endpoints.events_stream_url(),
304            Some("/learning/jobs/job-123/events/stream".to_string())
305        );
306    }
307
308    #[test]
309    fn test_all_endpoints() {
310        let endpoints = StreamEndpoints::prompt_learning("job-123");
311
312        let status_endpoints = endpoints.all_status_endpoints();
313        assert_eq!(status_endpoints.len(), 3); // primary + 2 fallbacks
314
315        let event_endpoints = endpoints.all_event_endpoints();
316        assert_eq!(event_endpoints.len(), 2); // primary + 1 fallback
317    }
318
319    #[test]
320    fn test_custom_endpoints() {
321        let endpoints = StreamEndpoints::custom(
322            Some("/custom/status".to_string()),
323            Some("/custom/events".to_string()),
324            None,
325            None,
326        )
327        .with_status_fallback("/fallback/status");
328
329        assert_eq!(endpoints.status, Some("/custom/status".to_string()));
330        assert_eq!(endpoints.status_fallbacks.len(), 1);
331    }
332}