// lago_api/sse/openai.rs
use lago_core::EventEnvelope;
use lago_core::event::EventPayload;
use serde_json::json;

use super::format::{SseFormat, SseFrame};

/// OpenAI-compatible SSE format.
///
/// Renders `Message` and `MessageDelta` events (and run termination) as
/// `chat.completion.chunk` objects matching the shape of the OpenAI
/// streaming chat API, so off-the-shelf OpenAI SSE clients can consume
/// the stream. Stateless marker type: all behavior lives in the
/// `SseFormat` impl.
pub struct OpenAiFormat;

13impl SseFormat for OpenAiFormat {
14    fn format(&self, event: &EventEnvelope) -> Vec<SseFrame> {
15        match &event.payload {
16            EventPayload::Message {
17                role,
18                content,
19                model,
20                ..
21            } => {
22                // Emit a single chunk with the full message content
23                let chunk = json!({
24                    "id": format!("chatcmpl-{}", event.event_id),
25                    "object": "chat.completion.chunk",
26                    "created": event.timestamp / 1_000_000, // micros -> seconds
27                    "model": model.as_deref().unwrap_or("lago"),
28                    "choices": [{
29                        "index": 0,
30                        "delta": {
31                            "role": role,
32                            "content": content,
33                        },
34                        "finish_reason": "stop",
35                    }],
36                });
37
38                vec![SseFrame {
39                    event: None,
40                    data: chunk.to_string(),
41                    id: Some(event.seq.to_string()),
42                }]
43            }
44
45            EventPayload::MessageDelta { role, delta, index } => {
46                let chunk = json!({
47                    "id": format!("chatcmpl-{}", event.event_id),
48                    "object": "chat.completion.chunk",
49                    "created": event.timestamp / 1_000_000,
50                    "model": "lago",
51                    "choices": [{
52                        "index": index,
53                        "delta": {
54                            "role": role,
55                            "content": delta,
56                        },
57                        "finish_reason": null,
58                    }],
59                });
60
61                vec![SseFrame {
62                    event: None,
63                    data: chunk.to_string(),
64                    id: Some(event.seq.to_string()),
65                }]
66            }
67
68            EventPayload::RunFinished {
69                reason,
70                final_answer,
71                ..
72            } => {
73                let finish_reason = match reason.as_str() {
74                    "Completed" | "Stop" => "stop",
75                    "MaxTokens" | "Length" => "length",
76                    "Safety" | "ContentFilter" => "content_filter",
77                    "ToolUse" => "tool_calls",
78                    _ => "stop",
79                };
80
81                // OpenAi format expects a delta with content if present, and the finish_reason
82                let chunk = json!({
83                    "id": format!("chatcmpl-{}", event.event_id),
84                    "object": "chat.completion.chunk",
85                    "created": event.timestamp / 1_000_000,
86                    "model": "lago",
87                    "choices": [{
88                        "index": 0,
89                        "delta": {
90                            "content": final_answer,
91                        },
92                        "finish_reason": finish_reason,
93                    }],
94                });
95
96                vec![SseFrame {
97                    event: None,
98                    data: chunk.to_string(),
99                    id: Some(event.seq.to_string()),
100                }]
101            }
102
103            // Non-message events are filtered out in OpenAI format
104            _ => Vec::new(),
105        }
106    }
107
108    fn done_frame(&self) -> Option<SseFrame> {
109        Some(SseFrame {
110            event: None,
111            data: "[DONE]".to_string(),
112            id: None,
113        })
114    }
115
116    fn extra_headers(&self) -> Vec<(String, String)> {
117        Vec::new()
118    }
119
120    fn name(&self) -> &str {
121        "openai"
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use lago_core::id::*;
    use std::collections::HashMap;

    /// Wrap `payload` in a minimal envelope with fixed ids and a timestamp
    /// of 1_700_000_000 seconds, expressed in microseconds.
    fn envelope(payload: EventPayload, seq: u64) -> EventEnvelope {
        EventEnvelope {
            event_id: EventId::from_string("EVT001"),
            session_id: SessionId::from_string("SESS001"),
            branch_id: BranchId::from_string("main"),
            run_id: None,
            seq,
            timestamp: 1_700_000_000_000_000,
            parent_id: None,
            payload,
            metadata: HashMap::new(),
        }
    }

    #[test]
    fn message_produces_one_frame() {
        let payload = EventPayload::Message {
            role: "assistant".into(),
            content: "Hello!".into(),
            model: Some("gpt-4".into()),
            token_usage: None,
        };
        let frames = OpenAiFormat.format(&envelope(payload, 42));
        assert_eq!(frames.len(), 1);

        let frame = &frames[0];
        // OpenAI uses bare `data:` lines; the SSE id is the sequence number.
        assert!(frame.event.is_none());
        assert_eq!(frame.id.as_deref(), Some("42"));

        let body: serde_json::Value = serde_json::from_str(&frame.data).unwrap();
        assert_eq!(body["object"], "chat.completion.chunk");
        assert_eq!(body["model"], "gpt-4");
        assert!(body["id"].as_str().unwrap().starts_with("chatcmpl-"));

        let choice = &body["choices"][0];
        assert_eq!(choice["delta"]["role"], "assistant");
        assert_eq!(choice["delta"]["content"], "Hello!");
        assert_eq!(choice["finish_reason"], "stop");
    }

    #[test]
    fn message_delta_produces_frame() {
        let payload = EventPayload::MessageDelta {
            role: "assistant".into(),
            delta: "chunk".into(),
            index: 0,
        };
        let frames = OpenAiFormat.format(&envelope(payload, 7));
        assert_eq!(frames.len(), 1);

        let body: serde_json::Value = serde_json::from_str(&frames[0].data).unwrap();
        let choice = &body["choices"][0];
        assert_eq!(choice["delta"]["content"], "chunk");
        // Intermediate chunks carry a null finish_reason.
        assert!(choice["finish_reason"].is_null());
    }

    #[test]
    fn non_message_events_filtered_out() {
        let payload = EventPayload::FileDelete {
            path: "/tmp/x".into(),
        };
        assert!(OpenAiFormat.format(&envelope(payload, 1)).is_empty());
    }

    #[test]
    fn done_frame() {
        let sentinel = OpenAiFormat.done_frame().unwrap();
        assert_eq!(sentinel.data, "[DONE]");
        assert!(sentinel.event.is_none());
    }

    #[test]
    fn no_extra_headers() {
        assert!(OpenAiFormat.extra_headers().is_empty());
    }

    #[test]
    fn name_is_openai() {
        assert_eq!(OpenAiFormat.name(), "openai");
    }
}