Skip to main content

lago_api/sse/
anthropic.rs

1use lago_core::EventEnvelope;
2use lago_core::event::EventPayload;
3use serde_json::json;
4
5use super::format::{SseFormat, SseFrame};
6
7/// Anthropic-compatible SSE format.
8///
9/// Formats events using Anthropic's streaming message API shape with
10/// `content_block_delta` and `message_stop` event types.
11pub struct AnthropicFormat;
12
13impl SseFormat for AnthropicFormat {
14    fn format(&self, event: &EventEnvelope) -> Vec<SseFrame> {
15        match &event.payload {
16            EventPayload::Message {
17                role,
18                content,
19                model,
20                token_usage,
21            } => {
22                let mut frames = Vec::new();
23
24                // message_start
25                let msg_start = json!({
26                    "type": "message_start",
27                    "message": {
28                        "id": format!("msg_{}", event.event_id),
29                        "type": "message",
30                        "role": role,
31                        "content": [],
32                        "model": model.as_deref().unwrap_or("lago"),
33                        "stop_reason": null,
34                        "usage": {
35                            "input_tokens": token_usage.as_ref().map(|u| u.prompt_tokens).unwrap_or(0),
36                            "output_tokens": 0,
37                        },
38                    },
39                });
40                frames.push(SseFrame {
41                    event: Some("message_start".to_string()),
42                    data: msg_start.to_string(),
43                    id: Some(event.seq.to_string()),
44                });
45
46                // content_block_start
47                let block_start = json!({
48                    "type": "content_block_start",
49                    "index": 0,
50                    "content_block": {
51                        "type": "text",
52                        "text": "",
53                    },
54                });
55                frames.push(SseFrame {
56                    event: Some("content_block_start".to_string()),
57                    data: block_start.to_string(),
58                    id: None,
59                });
60
61                // content_block_delta with the full content
62                let block_delta = json!({
63                    "type": "content_block_delta",
64                    "index": 0,
65                    "delta": {
66                        "type": "text_delta",
67                        "text": content,
68                    },
69                });
70                frames.push(SseFrame {
71                    event: Some("content_block_delta".to_string()),
72                    data: block_delta.to_string(),
73                    id: None,
74                });
75
76                // content_block_stop
77                let block_stop = json!({
78                    "type": "content_block_stop",
79                    "index": 0,
80                });
81                frames.push(SseFrame {
82                    event: Some("content_block_stop".to_string()),
83                    data: block_stop.to_string(),
84                    id: None,
85                });
86
87                // message_delta with stop reason
88                let msg_delta = json!({
89                    "type": "message_delta",
90                    "delta": {
91                        "stop_reason": "end_turn",
92                    },
93                    "usage": {
94                        "output_tokens": token_usage.as_ref().map(|u| u.completion_tokens).unwrap_or(0),
95                    },
96                });
97                frames.push(SseFrame {
98                    event: Some("message_delta".to_string()),
99                    data: msg_delta.to_string(),
100                    id: None,
101                });
102
103                frames
104            }
105
106            EventPayload::TextDelta { delta, index } => {
107                let block_delta = json!({
108                    "type": "content_block_delta",
109                    "index": index.unwrap_or(0),
110                    "delta": {
111                        "type": "text_delta",
112                        "text": delta,
113                    },
114                });
115
116                vec![SseFrame {
117                    event: Some("content_block_delta".to_string()),
118                    data: block_delta.to_string(),
119                    id: Some(event.seq.to_string()),
120                }]
121            }
122
123            // Non-message events are filtered out in Anthropic format
124            _ => Vec::new(),
125        }
126    }
127
128    fn done_frame(&self) -> Option<SseFrame> {
129        let stop = json!({
130            "type": "message_stop",
131        });
132        Some(SseFrame {
133            event: Some("message_stop".to_string()),
134            data: stop.to_string(),
135            id: None,
136        })
137    }
138
139    fn extra_headers(&self) -> Vec<(String, String)> {
140        Vec::new()
141    }
142
143    fn name(&self) -> &str {
144        "anthropic"
145    }
146}
147
148#[cfg(test)]
149mod tests {
150    use super::*;
151    use lago_core::event::TokenUsage;
152    use lago_core::id::*;
153    use std::collections::HashMap;
154
155    fn make_envelope(payload: EventPayload, seq: u64) -> EventEnvelope {
156        EventEnvelope {
157            event_id: EventId::from_string("EVT001"),
158            session_id: SessionId::from_string("SESS001"),
159            branch_id: BranchId::from_string("main"),
160            run_id: None,
161            seq,
162            timestamp: 1_700_000_000_000_000,
163            parent_id: None,
164            payload,
165            metadata: HashMap::new(),
166            schema_version: 1,
167        }
168    }
169
170    #[test]
171    fn message_produces_five_frames() {
172        let fmt = AnthropicFormat;
173        let event = make_envelope(
174            EventPayload::Message {
175                role: "assistant".into(),
176                content: "Hello!".into(),
177                model: Some("claude-3".into()),
178                token_usage: Some(TokenUsage {
179                    prompt_tokens: 10,
180                    completion_tokens: 5,
181                    total_tokens: 15,
182                }),
183            },
184            1,
185        );
186        let frames = fmt.format(&event);
187        assert_eq!(frames.len(), 5);
188
189        // message_start
190        assert_eq!(frames[0].event.as_deref(), Some("message_start"));
191        let d0: serde_json::Value = serde_json::from_str(&frames[0].data).unwrap();
192        assert_eq!(d0["type"], "message_start");
193        assert_eq!(d0["message"]["model"], "claude-3");
194        assert_eq!(d0["message"]["usage"]["input_tokens"], 10);
195        assert_eq!(frames[0].id.as_deref(), Some("1"));
196
197        // content_block_start
198        assert_eq!(frames[1].event.as_deref(), Some("content_block_start"));
199
200        // content_block_delta
201        assert_eq!(frames[2].event.as_deref(), Some("content_block_delta"));
202        let d2: serde_json::Value = serde_json::from_str(&frames[2].data).unwrap();
203        assert_eq!(d2["delta"]["text"], "Hello!");
204
205        // content_block_stop
206        assert_eq!(frames[3].event.as_deref(), Some("content_block_stop"));
207
208        // message_delta
209        assert_eq!(frames[4].event.as_deref(), Some("message_delta"));
210        let d4: serde_json::Value = serde_json::from_str(&frames[4].data).unwrap();
211        assert_eq!(d4["delta"]["stop_reason"], "end_turn");
212        assert_eq!(d4["usage"]["output_tokens"], 5);
213    }
214
215    #[test]
216    fn message_delta_produces_one_frame() {
217        let fmt = AnthropicFormat;
218        let event = make_envelope(
219            EventPayload::TextDelta {
220                delta: "token".into(),
221                index: Some(2),
222            },
223            5,
224        );
225        let frames = fmt.format(&event);
226        assert_eq!(frames.len(), 1);
227        assert_eq!(frames[0].event.as_deref(), Some("content_block_delta"));
228        let data: serde_json::Value = serde_json::from_str(&frames[0].data).unwrap();
229        assert_eq!(data["index"], 2);
230        assert_eq!(data["delta"]["text"], "token");
231    }
232
233    #[test]
234    fn non_message_events_filtered() {
235        let fmt = AnthropicFormat;
236        let event = make_envelope(
237            EventPayload::FileDelete {
238                path: "/tmp".into(),
239            },
240            1,
241        );
242        assert!(fmt.format(&event).is_empty());
243    }
244
245    #[test]
246    fn done_frame_is_message_stop() {
247        let fmt = AnthropicFormat;
248        let done = fmt.done_frame().unwrap();
249        assert_eq!(done.event.as_deref(), Some("message_stop"));
250        let data: serde_json::Value = serde_json::from_str(&done.data).unwrap();
251        assert_eq!(data["type"], "message_stop");
252    }
253
254    #[test]
255    fn name_is_anthropic() {
256        assert_eq!(AnthropicFormat.name(), "anthropic");
257    }
258}