Skip to main content

synth_ai_core/streaming/
endpoints.rs

//! Stream endpoint configuration for different job types.

/// Endpoint configuration for streaming from a job.
///
/// Every endpoint is stored as a path string (for example
/// `/learning/jobs/<job_id>/events`); no scheme or host is added here.
/// A `None` endpoint means that kind of data is not configured for the job.
///
/// The derived [`Default`] value has no endpoints and empty fallback lists.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamEndpoints {
    /// Status endpoint.
    pub status: Option<String>,
    /// Events endpoint.
    pub events: Option<String>,
    /// Metrics endpoint.
    pub metrics: Option<String>,
    /// Timeline endpoint.
    pub timeline: Option<String>,
    /// Fallback status endpoints (tried in order if primary fails).
    pub status_fallbacks: Vec<String>,
    /// Fallback event endpoints.
    pub event_fallbacks: Vec<String>,
}

impl StreamEndpoints {
    /// Build the layout shared by all job types from a job's base path.
    ///
    /// `status` is the base path itself, `events` is `<base>/events` and
    /// `metrics` is `<base>/metrics`. The timeline is left unset and both
    /// fallback lists are empty; callers override those where needed.
    fn from_base(base: String) -> Self {
        Self {
            events: Some(format!("{}/events", base)),
            metrics: Some(format!("{}/metrics", base)),
            // Moved last so the borrows above are finished before `base` is consumed.
            status: Some(base),
            ..Self::default()
        }
    }

    /// Create endpoints for a generic learning job.
    ///
    /// Paths are rooted at `/learning/jobs/<job_id>`. This is the only
    /// constructor that also sets a timeline endpoint (`<base>/timeline`).
    pub fn learning(job_id: &str) -> Self {
        let base = format!("/learning/jobs/{}", job_id);
        Self {
            timeline: Some(format!("{}/timeline", base)),
            ..Self::from_base(base)
        }
    }

    /// Create endpoints for a prompt learning (GEPA) job.
    ///
    /// Primary paths are rooted at `/policy-optimization/online/jobs/<job_id>`.
    /// Status falls back to `/learning/jobs/<job_id>` and then
    /// `/orchestration/jobs/<job_id>`; events fall back to
    /// `/learning/jobs/<job_id>/events`. No timeline endpoint is set.
    pub fn prompt_learning(job_id: &str) -> Self {
        Self {
            status_fallbacks: vec![
                format!("/learning/jobs/{}", job_id),
                format!("/orchestration/jobs/{}", job_id),
            ],
            event_fallbacks: vec![format!("/learning/jobs/{}/events", job_id)],
            ..Self::from_base(format!("/policy-optimization/online/jobs/{}", job_id))
        }
    }

    /// Create endpoints for an eval job.
    ///
    /// Paths are rooted at `/eval/jobs/<job_id>`; no timeline or fallbacks.
    pub fn eval(job_id: &str) -> Self {
        Self::from_base(format!("/eval/jobs/{}", job_id))
    }

    /// Create endpoints for an SFT job.
    ///
    /// Paths are rooted at `/sft/jobs/<job_id>`; no timeline or fallbacks.
    pub fn sft(job_id: &str) -> Self {
        Self::from_base(format!("/sft/jobs/{}", job_id))
    }

    /// Create endpoints for graph optimization.
    ///
    /// Paths are rooted at `/graphs/optimization/jobs/<job_id>`; no timeline
    /// or fallbacks.
    pub fn graph_optimization(job_id: &str) -> Self {
        Self::from_base(format!("/graphs/optimization/jobs/{}", job_id))
    }

    /// Create custom endpoints.
    ///
    /// The four primary endpoints are stored exactly as given. Both fallback
    /// lists start empty; add entries with [`Self::with_status_fallback`] and
    /// [`Self::with_event_fallback`].
    pub fn custom(
        status: Option<String>,
        events: Option<String>,
        metrics: Option<String>,
        timeline: Option<String>,
    ) -> Self {
        Self {
            status,
            events,
            metrics,
            timeline,
            ..Self::default()
        }
    }

    /// Add a status fallback endpoint.
    ///
    /// Consumes `self` and returns it with `endpoint` appended after any
    /// existing status fallbacks.
    #[must_use]
    pub fn with_status_fallback(mut self, endpoint: impl Into<String>) -> Self {
        self.status_fallbacks.push(endpoint.into());
        self
    }

    /// Add an event fallback endpoint.
    ///
    /// Consumes `self` and returns it with `endpoint` appended after any
    /// existing event fallbacks.
    #[must_use]
    pub fn with_event_fallback(mut self, endpoint: impl Into<String>) -> Self {
        self.event_fallbacks.push(endpoint.into());
        self
    }

    /// Get the SSE stream URL for events.
    ///
    /// This is the primary events endpoint with `/stream` appended, or `None`
    /// when no events endpoint is set. Event fallbacks are not consulted.
    pub fn events_stream_url(&self) -> Option<String> {
        self.events.as_ref().map(|e| format!("{}/stream", e))
    }

    /// Get all status endpoints to try (primary + fallbacks).
    ///
    /// The primary endpoint, when present, comes first, followed by the
    /// fallbacks in insertion order. The result is empty if neither exists.
    pub fn all_status_endpoints(&self) -> Vec<&str> {
        self.status
            .iter()
            .chain(&self.status_fallbacks)
            .map(String::as_str)
            .collect()
    }

    /// Get all event endpoints to try (primary + fallbacks).
    ///
    /// The primary endpoint, when present, comes first, followed by the
    /// fallbacks in insertion order. The result is empty if neither exists.
    pub fn all_event_endpoints(&self) -> Vec<&str> {
        self.events
            .iter()
            .chain(&self.event_fallbacks)
            .map(String::as_str)
            .collect()
    }
}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// A generic learning job exposes status, events, metrics and timeline.
    #[test]
    fn test_learning_endpoints() {
        let ep = StreamEndpoints::learning("job-123");

        assert_eq!(ep.status.as_deref(), Some("/learning/jobs/job-123"));
        assert_eq!(ep.events.as_deref(), Some("/learning/jobs/job-123/events"));
        assert_eq!(
            ep.metrics.as_deref(),
            Some("/learning/jobs/job-123/metrics")
        );
        assert_eq!(
            ep.timeline.as_deref(),
            Some("/learning/jobs/job-123/timeline")
        );
    }

    /// Prompt learning uses the policy-optimization base path, has no
    /// timeline, and carries two status fallbacks.
    #[test]
    fn test_prompt_learning_endpoints() {
        let ep = StreamEndpoints::prompt_learning("job-456");

        assert_eq!(
            ep.status.as_deref(),
            Some("/policy-optimization/online/jobs/job-456")
        );
        assert_eq!(ep.timeline, None);
        assert_eq!(ep.status_fallbacks.len(), 2);
    }

    /// Eval jobs are rooted at `/eval/jobs/<id>`.
    #[test]
    fn test_eval_endpoints() {
        let ep = StreamEndpoints::eval("eval-789");

        assert_eq!(ep.status.as_deref(), Some("/eval/jobs/eval-789"));
    }

    /// The SSE URL is the events endpoint with `/stream` appended.
    #[test]
    fn test_events_stream_url() {
        let stream_url = StreamEndpoints::learning("job-123").events_stream_url();

        assert_eq!(
            stream_url.as_deref(),
            Some("/learning/jobs/job-123/events/stream")
        );
    }

    /// The combined endpoint lists contain the primary plus every fallback.
    #[test]
    fn test_all_endpoints() {
        let ep = StreamEndpoints::prompt_learning("job-123");

        // primary + 2 fallbacks
        assert_eq!(ep.all_status_endpoints().len(), 3);

        // primary + 1 fallback
        assert_eq!(ep.all_event_endpoints().len(), 2);
    }

    /// Custom endpoints keep the supplied values and accept added fallbacks.
    #[test]
    fn test_custom_endpoints() {
        let status = Some("/custom/status".to_string());
        let events = Some("/custom/events".to_string());
        let ep = StreamEndpoints::custom(status, events, None, None)
            .with_status_fallback("/fallback/status");

        assert_eq!(ep.status.as_deref(), Some("/custom/status"));
        assert_eq!(ep.status_fallbacks.len(), 1);
    }
}