Skip to main content

synth_ai_core/orchestration/
events.rs

1//! Event types and parsing for optimization job events.
2//!
3//! This module provides event parsing and categorization for SSE events
4//! from optimization jobs.
5
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::collections::HashMap;
9
10/// Event category for classification.
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
12#[serde(rename_all = "snake_case")]
13pub enum EventCategory {
14    /// Baseline evaluation event
15    Baseline,
16    /// Candidate evaluation event
17    Candidate,
18    /// Pareto frontier update
19    Frontier,
20    /// Progress update (rollouts, trials)
21    Progress,
22    /// Generation complete
23    Generation,
24    /// Throughput metrics
25    Throughput,
26    /// Early termination triggered
27    Termination,
28    /// Job complete
29    Complete,
30    /// Validation phase event
31    Validation,
32    /// Usage/billing event
33    Usage,
34    /// Unknown event type
35    Unknown,
36}
37
38/// Terminal status derived from terminal events.
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
40pub enum TerminalStatus {
41    Succeeded,
42    Failed,
43    Cancelled,
44    Paused,
45}
46
47/// Parsed event path segments for logging/debugging.
48#[derive(Debug, Clone, Default)]
49pub struct EventPath {
50    pub entity: Option<String>,
51    pub action: Option<String>,
52    pub algorithm: Option<String>,
53    pub detail: Option<String>,
54}
55
56impl EventCategory {
57    /// Check if this is a terminal event category.
58    pub fn is_terminal(&self) -> bool {
59        matches!(self, EventCategory::Complete | EventCategory::Termination)
60    }
61
62    /// Get the category as a string.
63    pub fn as_str(&self) -> &'static str {
64        match self {
65            EventCategory::Baseline => "baseline",
66            EventCategory::Candidate => "candidate",
67            EventCategory::Frontier => "frontier",
68            EventCategory::Progress => "progress",
69            EventCategory::Generation => "generation",
70            EventCategory::Throughput => "throughput",
71            EventCategory::Termination => "termination",
72            EventCategory::Complete => "complete",
73            EventCategory::Validation => "validation",
74            EventCategory::Usage => "usage",
75            EventCategory::Unknown => "unknown",
76        }
77    }
78}
79
80/// Parsed event with category and typed data.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct ParsedEvent {
83    /// Original event type string
84    pub event_type: String,
85    /// Classified category
86    pub category: EventCategory,
87    /// Event data payload
88    pub data: Value,
89    /// Sequence number for ordering
90    #[serde(default)]
91    pub seq: Option<i64>,
92    /// Timestamp in milliseconds
93    #[serde(default)]
94    pub timestamp_ms: Option<i64>,
95}
96
97/// Baseline evaluation event data.
98#[derive(Debug, Clone, Default, Serialize, Deserialize)]
99pub struct BaselineEvent {
100    /// Baseline reward
101    #[serde(default, alias = "accuracy")]
102    pub reward: Option<f64>,
103    /// Objective scores
104    #[serde(default)]
105    pub objectives: Option<HashMap<String, f64>>,
106    /// Per-instance rewards
107    #[serde(default, alias = "instance_scores")]
108    pub instance_rewards: Option<Vec<f64>>,
109    /// Per-instance objectives
110    #[serde(default)]
111    pub instance_objectives: Option<Vec<HashMap<String, f64>>>,
112    /// Prompt configuration
113    #[serde(default)]
114    pub prompt: Option<Value>,
115}
116
117/// Candidate evaluation event data.
118#[derive(Debug, Clone, Default, Serialize, Deserialize)]
119pub struct CandidateEvent {
120    /// Candidate ID
121    #[serde(default)]
122    pub candidate_id: String,
123    /// Candidate reward
124    #[serde(default, alias = "accuracy")]
125    pub reward: Option<f64>,
126    /// Objective scores
127    #[serde(default)]
128    pub objectives: Option<HashMap<String, f64>>,
129    /// Whether candidate was accepted
130    #[serde(default)]
131    pub accepted: bool,
132    /// Generation number
133    #[serde(default)]
134    pub generation: Option<i32>,
135    /// Parent candidate ID
136    #[serde(default)]
137    pub parent_id: Option<String>,
138    /// Whether on Pareto frontier
139    #[serde(default)]
140    pub is_pareto: bool,
141    /// Mutation type used
142    #[serde(default)]
143    pub mutation_type: Option<String>,
144    /// Per-instance rewards
145    #[serde(default, alias = "instance_scores")]
146    pub instance_rewards: Option<Vec<f64>>,
147    /// Per-instance objectives
148    #[serde(default)]
149    pub instance_objectives: Option<Vec<HashMap<String, f64>>>,
150}
151
152/// Frontier update event data.
153#[derive(Debug, Clone, Default, Serialize, Deserialize)]
154pub struct FrontierEvent {
155    /// Current frontier candidate IDs
156    #[serde(default)]
157    pub frontier: Vec<String>,
158    /// Candidates added to frontier
159    #[serde(default)]
160    pub added: Vec<String>,
161    /// Candidates removed from frontier
162    #[serde(default)]
163    pub removed: Vec<String>,
164    /// Frontier size
165    #[serde(default)]
166    pub frontier_size: i32,
167    /// Best reward on frontier
168    #[serde(default, alias = "best_score")]
169    pub best_reward: Option<f64>,
170    /// Rewards by candidate ID
171    #[serde(default, alias = "frontier_scores")]
172    pub frontier_rewards: Option<HashMap<String, f64>>,
173    /// Objective scores by candidate (if provided)
174    #[serde(default)]
175    pub frontier_objectives: Option<Vec<HashMap<String, f64>>>,
176}
177
178/// Progress event data.
179#[derive(Debug, Clone, Default, Serialize, Deserialize)]
180pub struct ProgressEvent {
181    /// Rollouts completed
182    #[serde(default)]
183    pub rollouts_completed: i32,
184    /// Total rollouts planned
185    #[serde(default)]
186    pub rollouts_total: Option<i32>,
187    /// Trials completed (MIPRO)
188    #[serde(default)]
189    pub trials_completed: i32,
190    /// Current best reward
191    #[serde(default, alias = "best_score")]
192    pub best_reward: Option<f64>,
193    /// Baseline reward for comparison
194    #[serde(default, alias = "baseline_score")]
195    pub baseline_reward: Option<f64>,
196}
197
198/// Generation complete event data.
199#[derive(Debug, Clone, Default, Serialize, Deserialize)]
200pub struct GenerationEvent {
201    /// Generation number
202    #[serde(default)]
203    pub generation: i32,
204    /// Best reward in this generation
205    #[serde(default, alias = "best_accuracy")]
206    pub best_reward: f64,
207    /// Candidates proposed
208    #[serde(default)]
209    pub candidates_proposed: i32,
210    /// Candidates accepted
211    #[serde(default)]
212    pub candidates_accepted: i32,
213}
214
215/// Job complete event data.
216#[derive(Debug, Clone, Default, Serialize, Deserialize)]
217pub struct CompleteEvent {
218    /// Final best reward
219    #[serde(default, alias = "best_score")]
220    pub best_reward: Option<f64>,
221    /// Baseline reward
222    #[serde(default, alias = "baseline_score")]
223    pub baseline_reward: Option<f64>,
224    /// Finish reason
225    #[serde(default)]
226    pub finish_reason: Option<String>,
227    /// Total candidates evaluated
228    #[serde(default)]
229    pub total_candidates: i32,
230}
231
232/// Termination event data.
233#[derive(Debug, Clone, Default, Serialize, Deserialize)]
234pub struct TerminationEvent {
235    /// Termination reason
236    #[serde(default)]
237    pub reason: String,
238}
239
240/// Usage event data.
241#[derive(Debug, Clone, Default, Serialize, Deserialize)]
242pub struct UsageEvent {
243    /// Total cost in USD
244    #[serde(default)]
245    pub total_usd: f64,
246    /// Token cost in USD
247    #[serde(default)]
248    pub tokens_usd: f64,
249    /// Sandbox cost in USD
250    #[serde(default)]
251    pub sandbox_usd: f64,
252}
253
254/// Event parser for categorizing and parsing job events.
255pub struct EventParser;
256
257impl EventParser {
258    /// Patterns for baseline events
259    const BASELINE_PATTERNS: &'static [&'static str] = &[".baseline"];
260
261    /// Patterns for candidate events
262    const CANDIDATE_PATTERNS: &'static [&'static str] = &[
263        ".candidate.evaluated",
264        ".candidate.new_best",
265        ".proposal.scored",
266        ".optimized.scored",
267        ".candidate_scored",
268    ];
269
270    /// Patterns for frontier events
271    const FRONTIER_PATTERNS: &'static [&'static str] = &[".frontier_updated", ".frontier.updated"];
272
273    /// Patterns for progress events
274    const PROGRESS_PATTERNS: &'static [&'static str] = &[
275        ".progress",
276        ".rollouts_limit_progress",
277        ".rollouts.progress",
278        ".rollout.failures",
279        ".proposer.invoked",
280        ".job.started",
281        ".trial.started",
282        ".trial.completed",
283        ".iteration.started",
284        ".iteration.completed",
285    ];
286
287    /// Patterns for generation events
288    const GENERATION_PATTERNS: &'static [&'static str] = &[
289        ".generation.complete",
290        ".generation.completed",
291        ".generation.started",
292    ];
293
294    /// Patterns for throughput events
295    const THROUGHPUT_PATTERNS: &'static [&'static str] = &[
296        ".throughput",
297        ".rollout.concurrency",
298        ".rollout_concurrency",
299    ];
300
301    /// Patterns for termination events
302    const TERMINATION_PATTERNS: &'static [&'static str] =
303        &[".termination.triggered", ".termination", ".job.paused"];
304
305    /// Patterns for complete events
306    const COMPLETE_PATTERNS: &'static [&'static str] = &[
307        ".complete",
308        ".completed",
309        ".job.completed",
310        ".job.cancelled",
311        ".job.canceled",
312    ];
313
314    /// Patterns for validation events
315    const VALIDATION_PATTERNS: &'static [&'static str] =
316        &[".validation.scored", ".validation.completed"];
317
318    /// Patterns for usage events
319    const USAGE_PATTERNS: &'static [&'static str] =
320        &[".usage.recorded", ".billing.sandboxes", ".billing.updated"];
321
322    /// Normalize event type by replacing [MASKED] with "gepa".
323    pub fn normalize_type(event_type: &str) -> String {
324        event_type.replace("[MASKED]", "gepa")
325    }
326
327    /// Parse an event type into path segments for logging.
328    pub fn parse_path(event_type: &str) -> EventPath {
329        let normalized = Self::normalize_type(event_type);
330        let parts: Vec<&str> = normalized.split('.').collect();
331        let entity = parts.get(0).map(|s| s.to_string());
332        let action = parts.get(1).map(|s| s.to_string());
333        let algorithm = parts.get(2).map(|s| s.to_string());
334        let detail = if parts.len() > 3 {
335            Some(parts[3..].join("."))
336        } else {
337            None
338        };
339        EventPath {
340            entity,
341            action,
342            algorithm,
343            detail,
344        }
345    }
346
347    /// Map a terminal event type to a terminal status.
348    pub fn terminal_status(event_type: &str) -> Option<TerminalStatus> {
349        let normalized = Self::normalize_type(event_type).to_lowercase();
350        if normalized.contains("cancel") {
351            return Some(TerminalStatus::Cancelled);
352        }
353        if normalized.contains("pause") {
354            return Some(TerminalStatus::Paused);
355        }
356        if normalized.contains("fail") || normalized.contains("error") {
357            return Some(TerminalStatus::Failed);
358        }
359        if normalized.contains("complete") || normalized.contains("succeed") {
360            return Some(TerminalStatus::Succeeded);
361        }
362        None
363    }
364
365    fn coerce_f64(value: Option<&Value>) -> Option<f64> {
366        match value {
367            Some(Value::Number(num)) => num.as_f64(),
368            Some(Value::String(s)) => s.parse::<f64>().ok(),
369            _ => None,
370        }
371    }
372
373    fn coerce_i32(value: Option<&Value>) -> Option<i32> {
374        match value {
375            Some(Value::Number(num)) => num.as_i64().and_then(|v| i32::try_from(v).ok()),
376            Some(Value::String(s)) => s.parse::<i32>().ok(),
377            _ => None,
378        }
379    }
380
381    fn coerce_bool(value: Option<&Value>) -> Option<bool> {
382        match value {
383            Some(Value::Bool(b)) => Some(*b),
384            Some(Value::String(s)) => s.parse::<bool>().ok(),
385            _ => None,
386        }
387    }
388
389    fn coerce_string(value: Option<&Value>) -> Option<String> {
390        match value {
391            Some(Value::String(s)) => Some(s.clone()),
392            Some(Value::Number(num)) => Some(num.to_string()),
393            _ => None,
394        }
395    }
396
397    fn parse_f64_map(value: Option<&Value>) -> Option<HashMap<String, f64>> {
398        let obj = value?.as_object()?;
399        if obj.is_empty() {
400            return None;
401        }
402        let mut map = HashMap::new();
403        for (k, v) in obj {
404            if let Some(val) = Self::coerce_f64(Some(v)) {
405                map.insert(k.clone(), val);
406            } else {
407                return None;
408            }
409        }
410        Some(map)
411    }
412
413    fn parse_vec_f64_map(value: Option<&Value>) -> Option<Vec<HashMap<String, f64>>> {
414        let arr = value?.as_array()?;
415        if arr.is_empty() {
416            return None;
417        }
418        let mut out = Vec::with_capacity(arr.len());
419        for item in arr {
420            let map = Self::parse_f64_map(Some(item))?;
421            out.push(map);
422        }
423        Some(out)
424    }
425
426    fn parse_vec_string(value: Option<&Value>) -> Option<Vec<String>> {
427        let arr = value?.as_array()?;
428        if arr.is_empty() {
429            return None;
430        }
431        let mut out = Vec::with_capacity(arr.len());
432        for item in arr {
433            let s = match item {
434                Value::String(s) => s.clone(),
435                Value::Number(n) => n.to_string(),
436                _ => return None,
437            };
438            out.push(s);
439        }
440        Some(out)
441    }
442
443    fn extract_reward_from_value(value: Option<&Value>) -> Option<f64> {
444        let value = value?;
445        match value {
446            Value::Number(num) => num.as_f64(),
447            Value::String(s) => s.parse::<f64>().ok(),
448            Value::Object(obj) => obj
449                .get("reward")
450                .and_then(|v| Self::coerce_f64(Some(v)))
451                .or_else(|| {
452                    obj.get("mean_reward")
453                        .and_then(|v| Self::coerce_f64(Some(v)))
454                })
455                .or_else(|| {
456                    obj.get("outcome_reward")
457                        .and_then(|v| Self::coerce_f64(Some(v)))
458                })
459                .or_else(|| obj.get("accuracy").and_then(|v| Self::coerce_f64(Some(v))))
460                .or_else(|| obj.get("score").and_then(|v| Self::coerce_f64(Some(v)))),
461            _ => None,
462        }
463    }
464
465    fn extract_instance_rewards(data: &Value) -> Option<Vec<f64>> {
466        let instance_objectives = data.get("instance_objectives")?.as_array()?;
467        if instance_objectives.is_empty() {
468            return None;
469        }
470        let mut values = Vec::with_capacity(instance_objectives.len());
471        for item in instance_objectives {
472            let reward_val = if let Some(obj) = item.as_object() {
473                if let Some(objectives) = obj.get("objectives").and_then(|v| v.as_object()) {
474                    Self::coerce_f64(objectives.get("reward"))
475                } else {
476                    Self::coerce_f64(obj.get("reward"))
477                }
478            } else {
479                None
480            };
481            let reward_val = reward_val?;
482            values.push(reward_val);
483        }
484        if values.is_empty() {
485            None
486        } else {
487            Some(values)
488        }
489    }
490
491    /// Get the category for an event type string.
492    pub fn get_category(event_type: &str) -> EventCategory {
493        let normalized = Self::normalize_type(event_type);
494        let lower = normalized.to_lowercase();
495
496        // Generation.complete should be matched before generic .complete
497        for pattern in Self::GENERATION_PATTERNS {
498            if lower.contains(pattern) {
499                return EventCategory::Generation;
500            }
501        }
502
503        for pattern in Self::BASELINE_PATTERNS {
504            if lower.contains(pattern) {
505                return EventCategory::Baseline;
506            }
507        }
508
509        for pattern in Self::CANDIDATE_PATTERNS {
510            if lower.contains(pattern) {
511                return EventCategory::Candidate;
512            }
513        }
514
515        for pattern in Self::FRONTIER_PATTERNS {
516            if lower.contains(pattern) {
517                return EventCategory::Frontier;
518            }
519        }
520
521        for pattern in Self::PROGRESS_PATTERNS {
522            if lower.contains(pattern) {
523                return EventCategory::Progress;
524            }
525        }
526
527        for pattern in Self::THROUGHPUT_PATTERNS {
528            if lower.contains(pattern) {
529                return EventCategory::Throughput;
530            }
531        }
532
533        for pattern in Self::TERMINATION_PATTERNS {
534            if lower.contains(pattern) {
535                return EventCategory::Termination;
536            }
537        }
538
539        for pattern in Self::VALIDATION_PATTERNS {
540            if lower.contains(pattern) {
541                return EventCategory::Validation;
542            }
543        }
544
545        for pattern in Self::COMPLETE_PATTERNS {
546            if lower.contains(pattern) {
547                return EventCategory::Complete;
548            }
549        }
550
551        for pattern in Self::USAGE_PATTERNS {
552            if lower.contains(pattern) {
553                return EventCategory::Usage;
554            }
555        }
556
557        EventCategory::Unknown
558    }
559
560    /// Parse a raw event JSON into a ParsedEvent.
561    pub fn parse(event: &Value) -> ParsedEvent {
562        let raw_type = event.get("type").and_then(|v| v.as_str()).unwrap_or("");
563        let event_type = Self::normalize_type(raw_type);
564        let category = Self::get_category(&event_type);
565
566        let seq = event.get("seq").and_then(|v| v.as_i64());
567
568        let timestamp_ms = event
569            .get("timestamp_ms")
570            .and_then(|v| v.as_i64())
571            .or_else(|| {
572                event.get("ts").and_then(|v| v.as_str()).and_then(|ts| {
573                    chrono::DateTime::parse_from_rfc3339(ts)
574                        .ok()
575                        .map(|dt| dt.timestamp_millis())
576                })
577            });
578
579        let data = event
580            .get("data")
581            .and_then(|v| v.as_object())
582            .cloned()
583            .unwrap_or_default();
584
585        ParsedEvent {
586            event_type,
587            category,
588            data: Value::Object(data),
589            seq,
590            timestamp_ms,
591        }
592    }
593
594    /// Parse baseline event data from a ParsedEvent.
595    pub fn parse_baseline(event: &ParsedEvent) -> BaselineEvent {
596        let data = event.data.as_object().cloned().unwrap_or_default();
597        let data_value = Value::Object(data.clone());
598
599        let objectives = Self::parse_f64_map(data.get("objectives"));
600        let reward_value = objectives.as_ref().and_then(|m| m.get("reward").copied());
601        let accuracy = reward_value
602            .or_else(|| Self::coerce_f64(data.get("accuracy")))
603            .or_else(|| Self::coerce_f64(data.get("baseline_score")))
604            .or_else(|| Self::coerce_f64(data.get("baseline_accuracy")))
605            .or_else(|| {
606                data.get("outcome_objectives")
607                    .and_then(|v| Self::extract_reward_from_value(Some(v)))
608            })
609            .or_else(|| {
610                data.get("outcome_reward")
611                    .and_then(|v| Self::coerce_f64(Some(v)))
612            })
613            .or_else(|| {
614                data.get("score")
615                    .and_then(|v| Self::extract_reward_from_value(Some(v)))
616            });
617
618        let instance_objectives = Self::parse_vec_f64_map(data.get("instance_objectives"));
619        let instance_rewards = Self::extract_instance_rewards(&data_value);
620        let prompt = data.get("prompt").cloned();
621
622        BaselineEvent {
623            reward: accuracy,
624            objectives,
625            instance_rewards,
626            instance_objectives,
627            prompt,
628        }
629    }
630
631    /// Parse candidate event data from a ParsedEvent.
632    pub fn parse_candidate(event: &ParsedEvent) -> CandidateEvent {
633        let data = event.data.as_object().cloned().unwrap_or_default();
634        let candidate_data = data
635            .get("program_candidate")
636            .and_then(|v| v.as_object())
637            .cloned();
638        let candidate_view = candidate_data.as_ref().unwrap_or(&data);
639        let candidate_value = Value::Object(candidate_view.clone());
640
641        // Extract objectives: try top-level, then score.objectives
642        let objectives = Self::parse_f64_map(candidate_view.get("objectives")).or_else(|| {
643            candidate_view
644                .get("score")
645                .and_then(|v| v.as_object())
646                .and_then(|score| Self::parse_f64_map(score.get("objectives")))
647        });
648
649        let reward_value = objectives.as_ref().and_then(|m| m.get("reward").copied());
650
651        // Extract accuracy/reward: try objectives.reward, then direct fields
652        // Backend emits `reward` (top-level), `score.mean_reward`, `score.objectives.reward`
653        let accuracy = reward_value
654            .or_else(|| Self::coerce_f64(candidate_view.get("reward")))
655            .or_else(|| Self::coerce_f64(candidate_view.get("accuracy")))
656            .or_else(|| Self::coerce_f64(candidate_view.get("score")))
657            .or_else(|| Self::extract_reward_from_value(candidate_view.get("score")))
658            .or_else(|| {
659                candidate_view
660                    .get("outcome_objectives")
661                    .and_then(|v| Self::extract_reward_from_value(Some(v)))
662            })
663            .or_else(|| {
664                candidate_view
665                    .get("outcome_reward")
666                    .and_then(|v| Self::coerce_f64(Some(v)))
667            });
668
669        // If we found a reward but no objectives dict, construct one
670        let objectives = objectives.or_else(|| {
671            accuracy.map(|r| {
672                let mut m = std::collections::HashMap::new();
673                m.insert("reward".to_string(), r);
674                m
675            })
676        });
677
678        let instance_objectives =
679            Self::parse_vec_f64_map(candidate_view.get("instance_objectives"));
680        let instance_rewards = Self::extract_instance_rewards(&candidate_value);
681
682        CandidateEvent {
683            candidate_id: Self::coerce_string(data.get("version_id"))
684                .or_else(|| Self::coerce_string(data.get("candidate_id")))
685                .unwrap_or_default(),
686            reward: accuracy,
687            objectives,
688            accepted: Self::coerce_bool(data.get("accepted")).unwrap_or(false),
689            generation: Self::coerce_i32(data.get("generation")),
690            parent_id: Self::coerce_string(data.get("parent_id")),
691            is_pareto: Self::coerce_bool(data.get("is_pareto")).unwrap_or(false),
692            mutation_type: Self::coerce_string(data.get("mutation_type"))
693                .or_else(|| Self::coerce_string(data.get("operator"))),
694            instance_rewards,
695            instance_objectives,
696        }
697    }
698
699    /// Parse frontier event data from a ParsedEvent.
700    pub fn parse_frontier(event: &ParsedEvent) -> FrontierEvent {
701        let data = event.data.as_object().cloned().unwrap_or_default();
702        let frontier = Self::parse_vec_string(data.get("frontier"));
703
704        FrontierEvent {
705            frontier: frontier.clone().unwrap_or_default(),
706            added: Self::parse_vec_string(data.get("added")).unwrap_or_default(),
707            removed: Self::parse_vec_string(data.get("removed")).unwrap_or_default(),
708            frontier_size: Self::coerce_i32(data.get("frontier_size"))
709                .unwrap_or_else(|| frontier.as_ref().map(|v| v.len() as i32).unwrap_or(0)),
710            best_reward: Self::coerce_f64(data.get("best_score")),
711            frontier_rewards: Self::parse_f64_map(data.get("frontier_scores")),
712            frontier_objectives: Self::parse_vec_f64_map(data.get("frontier_objectives")),
713        }
714    }
715
716    /// Parse progress event data from a ParsedEvent.
717    pub fn parse_progress(event: &ParsedEvent) -> ProgressEvent {
718        let data = event.data.as_object().cloned().unwrap_or_default();
719
720        ProgressEvent {
721            rollouts_completed: Self::coerce_i32(data.get("rollouts_completed"))
722                .or_else(|| Self::coerce_i32(data.get("rollouts_executed")))
723                .unwrap_or(0),
724            rollouts_total: Self::coerce_i32(data.get("rollouts_total"))
725                .or_else(|| Self::coerce_i32(data.get("total_rollouts"))),
726            trials_completed: Self::coerce_i32(data.get("trials_completed")).unwrap_or(0),
727            best_reward: Self::coerce_f64(data.get("best_score")),
728            baseline_reward: Self::coerce_f64(data.get("baseline_score")),
729        }
730    }
731
732    /// Parse generation event data from a ParsedEvent.
733    pub fn parse_generation(event: &ParsedEvent) -> GenerationEvent {
734        let data = event.data.as_object().cloned().unwrap_or_default();
735
736        GenerationEvent {
737            generation: Self::coerce_i32(data.get("generation")).unwrap_or(0),
738            best_reward: Self::coerce_f64(data.get("best_accuracy")).unwrap_or(0.0),
739            candidates_proposed: Self::coerce_i32(data.get("candidates_proposed")).unwrap_or(0),
740            candidates_accepted: Self::coerce_i32(data.get("candidates_accepted")).unwrap_or(0),
741        }
742    }
743
744    /// Parse complete event data from a ParsedEvent.
745    pub fn parse_complete(event: &ParsedEvent) -> CompleteEvent {
746        let data = event.data.as_object().cloned().unwrap_or_default();
747
748        CompleteEvent {
749            best_reward: Self::coerce_f64(data.get("best_score")),
750            baseline_reward: Self::coerce_f64(data.get("baseline_score")),
751            finish_reason: Self::coerce_string(data.get("finish_reason"))
752                .or_else(|| Self::coerce_string(data.get("reason_terminated"))),
753            total_candidates: Self::coerce_i32(data.get("total_candidates")).unwrap_or(0),
754        }
755    }
756
757    /// Parse termination event data from a ParsedEvent.
758    pub fn parse_termination(event: &ParsedEvent) -> TerminationEvent {
759        let data = event.data.as_object().cloned().unwrap_or_default();
760        TerminationEvent {
761            reason: Self::coerce_string(data.get("reason"))
762                .unwrap_or_else(|| "unknown".to_string()),
763        }
764    }
765
766    /// Parse usage event data from a ParsedEvent.
767    pub fn parse_usage(event: &ParsedEvent) -> UsageEvent {
768        let data = event.data.as_object().cloned().unwrap_or_default();
769
770        UsageEvent {
771            total_usd: Self::coerce_f64(data.get("total_usd")).unwrap_or(0.0),
772            tokens_usd: Self::coerce_f64(data.get("usd_tokens")).unwrap_or(0.0),
773            sandbox_usd: Self::coerce_f64(data.get("sandbox_usd")).unwrap_or(0.0),
774        }
775    }
776}
777
778#[cfg(test)]
779mod tests {
780    use super::*;
781    use serde_json::json;
782
783    #[test]
784    fn test_event_category_terminal() {
785        assert!(EventCategory::Complete.is_terminal());
786        assert!(EventCategory::Termination.is_terminal());
787        assert!(!EventCategory::Progress.is_terminal());
788        assert!(!EventCategory::Candidate.is_terminal());
789    }
790
791    #[test]
792    fn test_normalize_type() {
793        assert_eq!(
794            EventParser::normalize_type("learning.policy.[MASKED].candidate.evaluated"),
795            "learning.policy.gepa.candidate.evaluated"
796        );
797    }
798
799    #[test]
800    fn test_get_category() {
801        assert_eq!(
802            EventParser::get_category("learning.policy.gepa.baseline"),
803            EventCategory::Baseline
804        );
805        assert_eq!(
806            EventParser::get_category("learning.policy.gepa.candidate.evaluated"),
807            EventCategory::Candidate
808        );
809        assert_eq!(
810            EventParser::get_category("learning.policy.gepa.frontier_updated"),
811            EventCategory::Frontier
812        );
813        assert_eq!(
814            EventParser::get_category("learning.policy.gepa.progress"),
815            EventCategory::Progress
816        );
817        assert_eq!(
818            EventParser::get_category("learning.policy.gepa.job.completed"),
819            EventCategory::Complete
820        );
821        assert_eq!(
822            EventParser::get_category("unknown.event.type"),
823            EventCategory::Unknown
824        );
825        assert_eq!(
826            EventParser::get_category("learning.policy.gepa.rollout.concurrency"),
827            EventCategory::Throughput
828        );
829        assert_eq!(
830            EventParser::get_category("learning.policy.gepa.candidate.new_best"),
831            EventCategory::Candidate
832        );
833        assert_eq!(
834            EventParser::get_category("learning.policy.gepa.job.started"),
835            EventCategory::Progress
836        );
837        assert_eq!(
838            EventParser::get_category("learning.policy.gepa.rollout.failures"),
839            EventCategory::Progress
840        );
841        assert_eq!(
842            EventParser::get_category("learning.policy.gepa.proposer.invoked"),
843            EventCategory::Progress
844        );
845        assert_eq!(
846            EventParser::get_category("learning.policy.gepa.validation.completed"),
847            EventCategory::Validation
848        );
849        assert_eq!(
850            EventParser::get_category("learning.policy.gepa.generation.started"),
851            EventCategory::Generation
852        );
853    }
854
855    #[test]
856    fn test_parse_event() {
857        let raw = json!({
858            "type": "learning.policy.gepa.candidate.evaluated",
859            "seq": 42,
860            "data": {
861                "candidate_id": "cand_123",
862                "accuracy": 0.85,
863                "accepted": true,
864                "generation": 2
865            }
866        });
867
868        let parsed = EventParser::parse(&raw);
869        assert_eq!(parsed.category, EventCategory::Candidate);
870        assert_eq!(parsed.seq, Some(42));
871
872        let candidate = EventParser::parse_candidate(&parsed);
873        assert_eq!(candidate.candidate_id, "cand_123");
874        assert_eq!(candidate.reward, Some(0.85));
875        assert!(candidate.accepted);
876        assert_eq!(candidate.generation, Some(2));
877    }
878
879    #[test]
880    fn test_parse_baseline() {
881        let raw = json!({
882            "type": "learning.policy.gepa.baseline",
883            "data": {
884                "accuracy": 0.72,
885                "objectives": {"score": 0.72, "cost": 0.01}
886            }
887        });
888
889        let parsed = EventParser::parse(&raw);
890        let baseline = EventParser::parse_baseline(&parsed);
891        assert_eq!(baseline.reward, Some(0.72));
892        assert!(baseline.objectives.is_some());
893    }
894
895    #[test]
896    fn test_parse_frontier() {
897        let raw = json!({
898            "type": "learning.policy.gepa.frontier_updated",
899            "data": {
900                "frontier": ["cand_1", "cand_2"],
901                "added": ["cand_2"],
902                "removed": [],
903                "best_score": 0.88
904            }
905        });
906
907        let parsed = EventParser::parse(&raw);
908        let frontier = EventParser::parse_frontier(&parsed);
909        assert_eq!(frontier.frontier.len(), 2);
910        assert_eq!(frontier.best_reward, Some(0.88));
911    }
912}