Skip to main content

lago_api/sse/
anthropic.rs

1use lago_core::EventEnvelope;
2use lago_core::event::EventPayload;
3use serde_json::json;
4
5use super::format::{SseFormat, SseFrame};
6
/// Anthropic-compatible SSE format.
///
/// A stateless unit struct whose [`SseFormat`] implementation renders events
/// in the shape of Anthropic's streaming Messages API. The frames it emits
/// use the event types `message_start`, `content_block_start`,
/// `content_block_delta`, `content_block_stop`, `message_delta` and
/// `message_stop`.
#[derive(Debug, Clone, Copy, Default)]
pub struct AnthropicFormat;
12
13impl SseFormat for AnthropicFormat {
14    fn format(&self, event: &EventEnvelope) -> Vec<SseFrame> {
15        match &event.payload {
16            EventPayload::Message {
17                role,
18                content,
19                model,
20                token_usage,
21            } => {
22                let mut frames = Vec::new();
23
24                // message_start
25                let msg_start = json!({
26                    "type": "message_start",
27                    "message": {
28                        "id": format!("msg_{}", event.event_id),
29                        "type": "message",
30                        "role": role,
31                        "content": [],
32                        "model": model.as_deref().unwrap_or("lago"),
33                        "stop_reason": null,
34                        "usage": {
35                            "input_tokens": token_usage.as_ref().map(|u| u.prompt_tokens).unwrap_or(0),
36                            "output_tokens": 0,
37                        },
38                    },
39                });
40                frames.push(SseFrame {
41                    event: Some("message_start".to_string()),
42                    data: msg_start.to_string(),
43                    id: Some(event.seq.to_string()),
44                });
45
46                // content_block_start
47                let block_start = json!({
48                    "type": "content_block_start",
49                    "index": 0,
50                    "content_block": {
51                        "type": "text",
52                        "text": "",
53                    },
54                });
55                frames.push(SseFrame {
56                    event: Some("content_block_start".to_string()),
57                    data: block_start.to_string(),
58                    id: None,
59                });
60
61                // content_block_delta with the full content
62                let block_delta = json!({
63                    "type": "content_block_delta",
64                    "index": 0,
65                    "delta": {
66                        "type": "text_delta",
67                        "text": content,
68                    },
69                });
70                frames.push(SseFrame {
71                    event: Some("content_block_delta".to_string()),
72                    data: block_delta.to_string(),
73                    id: None,
74                });
75
76                // content_block_stop
77                let block_stop = json!({
78                    "type": "content_block_stop",
79                    "index": 0,
80                });
81                frames.push(SseFrame {
82                    event: Some("content_block_stop".to_string()),
83                    data: block_stop.to_string(),
84                    id: None,
85                });
86
87                // message_delta with stop reason
88                let msg_delta = json!({
89                    "type": "message_delta",
90                    "delta": {
91                        "stop_reason": "end_turn",
92                    },
93                    "usage": {
94                        "output_tokens": token_usage.as_ref().map(|u| u.completion_tokens).unwrap_or(0),
95                    },
96                });
97                frames.push(SseFrame {
98                    event: Some("message_delta".to_string()),
99                    data: msg_delta.to_string(),
100                    id: None,
101                });
102
103                frames
104            }
105
106            EventPayload::MessageDelta { delta, index, .. } => {
107                let block_delta = json!({
108                    "type": "content_block_delta",
109                    "index": index,
110                    "delta": {
111                        "type": "text_delta",
112                        "text": delta,
113                    },
114                });
115
116                vec![SseFrame {
117                    event: Some("content_block_delta".to_string()),
118                    data: block_delta.to_string(),
119                    id: Some(event.seq.to_string()),
120                }]
121            }
122
123            // Non-message events are filtered out in Anthropic format
124            _ => Vec::new(),
125        }
126    }
127
128    fn done_frame(&self) -> Option<SseFrame> {
129        let stop = json!({
130            "type": "message_stop",
131        });
132        Some(SseFrame {
133            event: Some("message_stop".to_string()),
134            data: stop.to_string(),
135            id: None,
136        })
137    }
138
139    fn extra_headers(&self) -> Vec<(String, String)> {
140        Vec::new()
141    }
142
143    fn name(&self) -> &str {
144        "anthropic"
145    }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use lago_core::event::TokenUsage;
    use lago_core::id::*;
    use std::collections::HashMap;

    /// Wraps `payload` in an envelope with fixed identifiers and the given
    /// sequence number.
    fn make_envelope(payload: EventPayload, seq: u64) -> EventEnvelope {
        EventEnvelope {
            event_id: EventId::from_string("EVT001"),
            session_id: SessionId::from_string("SESS001"),
            branch_id: BranchId::from_string("main"),
            run_id: None,
            seq,
            timestamp: 1_700_000_000_000_000,
            parent_id: None,
            payload,
            metadata: HashMap::new(),
        }
    }

    /// Parses a frame's `data` as JSON, panicking if it is not valid JSON.
    fn json_of(frame: &SseFrame) -> serde_json::Value {
        serde_json::from_str(&frame.data).unwrap()
    }

    #[test]
    fn message_produces_five_frames() {
        let usage = TokenUsage {
            prompt_tokens: 10,
            completion_tokens: 5,
            total_tokens: 15,
        };
        let payload = EventPayload::Message {
            role: "assistant".into(),
            content: "Hello!".into(),
            model: Some("claude-3".into()),
            token_usage: Some(usage),
        };
        let frames = AnthropicFormat.format(&make_envelope(payload, 1));
        assert_eq!(frames.len(), 5);

        // Frames must arrive in the documented order.
        let kinds: Vec<Option<&str>> = frames.iter().map(|f| f.event.as_deref()).collect();
        assert_eq!(
            kinds,
            [
                Some("message_start"),
                Some("content_block_start"),
                Some("content_block_delta"),
                Some("content_block_stop"),
                Some("message_delta"),
            ]
        );

        // message_start carries the model, the prompt token count and the SSE id.
        let start = json_of(&frames[0]);
        assert_eq!(start["type"], "message_start");
        assert_eq!(start["message"]["model"], "claude-3");
        assert_eq!(start["message"]["usage"]["input_tokens"], 10);
        assert_eq!(frames[0].id.as_deref(), Some("1"));

        // content_block_delta carries the full text.
        let delta = json_of(&frames[2]);
        assert_eq!(delta["delta"]["text"], "Hello!");

        // message_delta carries the stop reason and the completion token count.
        let tail = json_of(&frames[4]);
        assert_eq!(tail["delta"]["stop_reason"], "end_turn");
        assert_eq!(tail["usage"]["output_tokens"], 5);
    }

    #[test]
    fn message_delta_produces_one_frame() {
        let payload = EventPayload::MessageDelta {
            role: "assistant".into(),
            delta: "token".into(),
            index: 2,
        };
        let frames = AnthropicFormat.format(&make_envelope(payload, 5));
        assert_eq!(frames.len(), 1);

        let only = &frames[0];
        assert_eq!(only.event.as_deref(), Some("content_block_delta"));

        let body = json_of(only);
        assert_eq!(body["index"], 2);
        assert_eq!(body["delta"]["text"], "token");
    }

    #[test]
    fn non_message_events_filtered() {
        let payload = EventPayload::FileDelete {
            path: "/tmp".into(),
        };
        let frames = AnthropicFormat.format(&make_envelope(payload, 1));
        assert!(frames.is_empty());
    }

    #[test]
    fn done_frame_is_message_stop() {
        let terminal = AnthropicFormat.done_frame().unwrap();
        assert_eq!(terminal.event.as_deref(), Some("message_stop"));
        assert_eq!(json_of(&terminal)["type"], "message_stop");
    }

    #[test]
    fn name_is_anthropic() {
        assert_eq!(AnthropicFormat.name(), "anthropic");
    }
}