Skip to main content

synth_ai_core/
events.rs

1use serde_json::Value;
2use std::time::Duration;
3
4use crate::config::{BackendAuth, CoreConfig};
5use crate::shared_client::{DEFAULT_CONNECT_TIMEOUT_SECS, DEFAULT_POOL_SIZE};
6use crate::CoreError;
7use synth_ai_core_types::{CoreEvent, EventPollResponse};
8
/// Job families whose event streams can be polled.
///
/// The type is a plain field-less enum, so it is cheap to copy and can be
/// compared or used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventKind {
    /// Prompt-learning jobs (served under `/api/prompt-learning/online/jobs/...`).
    PromptLearning,
    /// Evaluation jobs (served under `/api/eval/jobs/...`).
    Eval,
}

impl EventKind {
    /// Parses a kind from its textual name.
    ///
    /// Accepted spellings are `prompt_learning`, `prompt-learning` and
    /// `promptlearning` for [`EventKind::PromptLearning`], and `eval` or
    /// `evaluation` for [`EventKind::Eval`]. Matching is exact: it is
    /// case-sensitive and performs no trimming. Any other input yields `None`.
    pub fn from_str(kind: &str) -> Option<Self> {
        match kind {
            "prompt_learning" | "prompt-learning" | "promptlearning" => Some(Self::PromptLearning),
            "eval" | "evaluation" => Some(Self::Eval),
            _ => None,
        }
    }

    /// Returns the backend path (starting with `/`) of the events endpoint
    /// for `job_id`.
    ///
    /// `job_id` is inserted verbatim; no percent-encoding is applied here.
    fn path(&self, job_id: &str) -> String {
        match self {
            Self::PromptLearning => format!("/api/prompt-learning/online/jobs/{job_id}/events"),
            Self::Eval => format!("/api/eval/jobs/{job_id}/events"),
        }
    }
}
35
36fn event_seq(value: &Value) -> Option<i64> {
37    value
38        .get("seq")
39        .and_then(|v| v.as_i64())
40        .or_else(|| value.get("sequence").and_then(|v| v.as_i64()))
41}
42
43fn event_type(value: &Value) -> String {
44    value
45        .get("type")
46        .and_then(|v| v.as_str())
47        .or_else(|| value.get("event_type").and_then(|v| v.as_str()))
48        .unwrap_or("unknown")
49        .to_string()
50}
51
52fn event_message(value: &Value) -> Option<String> {
53    value
54        .get("message")
55        .and_then(|v| v.as_str())
56        .map(|s| s.to_string())
57}
58
59fn event_ts(value: &Value) -> Option<String> {
60    value
61        .get("ts")
62        .and_then(|v| v.as_str())
63        .or_else(|| value.get("timestamp").and_then(|v| v.as_str()))
64        .or_else(|| value.get("created_at").and_then(|v| v.as_str()))
65        .map(|s| s.to_string())
66}
67
68fn event_payload(value: &Value) -> Value {
69    value.get("data").cloned().unwrap_or_else(|| value.clone())
70}
71
72fn to_core_event(value: &Value) -> CoreEvent {
73    CoreEvent {
74        seq: event_seq(value).unwrap_or(-1),
75        event_type: event_type(value),
76        message: event_message(value),
77        data_json: event_payload(value),
78        ts: event_ts(value),
79    }
80}
81
82fn extract_events(payload: &Value) -> Vec<Value> {
83    if let Some(list) = payload.as_array() {
84        return list.clone();
85    }
86    if let Some(events) = payload.get("events").and_then(|v| v.as_array()) {
87        return events.clone();
88    }
89    if let Some(events) = payload.get("data").and_then(|v| v.as_array()) {
90        return events.clone();
91    }
92    Vec::new()
93}
94
95fn extract_next_seq(payload: &Value, events: &[CoreEvent]) -> Option<i64> {
96    if let Some(next) = payload.get("next_seq").and_then(|v| v.as_i64()) {
97        return Some(next);
98    }
99    if let Some(max_seq) = events
100        .iter()
101        .filter_map(|e| (e.seq >= 0).then_some(e.seq))
102        .max()
103    {
104        return Some(max_seq + 1);
105    }
106    None
107}
108
109/// Poll events once for a job.
110pub async fn poll_events(
111    kind: EventKind,
112    job_id: &str,
113    config: &CoreConfig,
114    since_seq: Option<i64>,
115    limit: Option<usize>,
116) -> Result<EventPollResponse, CoreError> {
117    let client = reqwest::Client::builder()
118        .timeout(std::time::Duration::from_millis(config.timeout_ms))
119        .user_agent(config.user_agent.clone())
120        .pool_max_idle_per_host(DEFAULT_POOL_SIZE)
121        .pool_idle_timeout(Duration::from_secs(90))
122        .connect_timeout(Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS))
123        .tcp_keepalive(Duration::from_secs(60))
124        .tcp_nodelay(true)
125        .build()?;
126
127    let base = config.backend_base_url.trim_end_matches('/');
128    let mut url = format!("{base}{}", kind.path(job_id));
129
130    let mut params: Vec<(String, String)> = Vec::new();
131    if let Some(since) = since_seq {
132        params.push(("since_seq".to_string(), since.to_string()));
133    }
134    if let Some(limit) = limit {
135        params.push(("limit".to_string(), limit.to_string()));
136    }
137    if !params.is_empty() {
138        let query = serde_urlencoded::to_string(params)
139            .map_err(|e| CoreError::Protocol(format!("query encode error: {e}")))?;
140        url = format!("{url}?{query}");
141    }
142
143    let mut req = client.get(url);
144    if let Some(api_key) = &config.api_key {
145        req = match config.auth {
146            BackendAuth::XApiKey => req.header("X-API-Key", api_key),
147            BackendAuth::Bearer => req.header("Authorization", format!("Bearer {api_key}")),
148        };
149    }
150
151    let resp = req.send().await?;
152    let payload: Value = resp.json().await?;
153
154    let raw_events = extract_events(&payload);
155    let mut events: Vec<CoreEvent> = raw_events.iter().map(to_core_event).collect();
156
157    if let Some(since) = since_seq {
158        events.retain(|e| e.seq < 0 || e.seq > since);
159    }
160
161    let next_seq = extract_next_seq(&payload, &events);
162    let has_more = payload.get("has_more").and_then(|v| v.as_bool());
163
164    Ok(EventPollResponse {
165        events,
166        next_seq,
167        has_more,
168    })
169}