Skip to main content

tower_llm/codec/
mod.rs

1//! Bijection codec between RunItem and raw OpenAI chat messages
2//!
3//! What this module provides (spec)
4//! - Deterministic, lossless conversions used by recording/replay and persistence layers
5//! - Pure utility functions decoupled from services
6//!
7//! Exports
8//! - Models (reuse from `items.rs` if desired)
9//!   - `RunItem::{Message, ToolCall, ToolOutput, Handoff}`
10//! - Utils (pure)
11//!   - `messages_to_items(messages: &[RawChatMessage]) -> Vec<RunItem>`
12//!   - `items_to_messages(items: &[RunItem]) -> Vec<RawChatMessage>`
13//!   - `CodecError` for invalid sequences (e.g., tool output without prior tool_call)
14//!
15//! Implementation strategy
16//! - One-pass state machine for `messages_to_items`:
17//!   - Accumulate assistant `tool_calls` to attach to the preceding assistant message
18//!   - Immediately emit `ToolOutput` for `tool` role messages with resolved `tool_call_id`
19//! - The inverse reconstructs messages, ensuring assistant tool_calls coalesce correctly
20//! - Avoid allocations by pre-sizing vectors and reusing buffers
21//!
22//! Composition
23//! - Recorder and session layers call these functions at the edge
24//! - Does not depend on Tower; purely functional
25//!
26//! Testing strategy
27//! - Property tests (e.g., with `proptest`) asserting round-trip identity
28//! - Golden-case unit tests for:
29//!   - Multiple tool_calls in a single assistant message
30//!   - Interleaved ToolCall/ToolOutput
31//!   - Empty content and error outputs
32//! - Fuzz invalid sequences to ensure `CodecError` surfaces clearly
33
34use async_openai::types::{
35    ChatCompletionRequestAssistantMessageArgs, ChatCompletionRequestAssistantMessageContent,
36    ChatCompletionRequestMessage, ChatCompletionRequestSystemMessageArgs,
37    ChatCompletionRequestSystemMessageContent, ChatCompletionRequestToolMessageArgs,
38    ChatCompletionRequestToolMessageContent, ChatCompletionRequestUserMessageArgs,
39    ChatCompletionRequestUserMessageContent, ChatCompletionToolType,
40};
41use serde_json::Value;
42
43use crate::items::{HandoffItem, MessageItem, Role, RunItem, ToolCallItem, ToolOutputItem};
44
45use chrono::Utc;
46use uuid::Uuid;
47
48#[derive(thiserror::Error, Debug)]
49pub enum CodecError {
50    #[error("invalid tool output without tool_call_id")]
51    MissingToolCallId,
52}
53
54/// Convert raw OpenAI request messages to a sequence of RunItems.
55pub fn messages_to_items(
56    messages: &[ChatCompletionRequestMessage],
57) -> Result<Vec<RunItem>, CodecError> {
58    let mut items: Vec<RunItem> = Vec::with_capacity(messages.len());
59    for m in messages {
60        match m {
61            ChatCompletionRequestMessage::System(s) => {
62                items.push(RunItem::Message(MessageItem {
63                    id: Uuid::new_v4().to_string(),
64                    role: Role::System,
65                    content: match &s.content {
66                        ChatCompletionRequestSystemMessageContent::Text(t) => t.clone(),
67                        _ => String::new(),
68                    },
69                    created_at: Utc::now(),
70                }));
71            }
72            ChatCompletionRequestMessage::User(u) => {
73                items.push(RunItem::Message(MessageItem {
74                    id: Uuid::new_v4().to_string(),
75                    role: Role::User,
76                    content: match &u.content {
77                        ChatCompletionRequestUserMessageContent::Text(t) => t.clone(),
78                        _ => String::new(),
79                    },
80                    created_at: Utc::now(),
81                }));
82            }
83            ChatCompletionRequestMessage::Assistant(a) => {
84                // Always emit assistant message (content may be empty)
85                items.push(RunItem::Message(MessageItem {
86                    id: Uuid::new_v4().to_string(),
87                    role: Role::Assistant,
88                    content: if let Some(ChatCompletionRequestAssistantMessageContent::Text(t)) =
89                        &a.content
90                    {
91                        t.clone()
92                    } else {
93                        String::new()
94                    },
95                    created_at: Utc::now(),
96                }));
97                if let Some(tool_calls) = &a.tool_calls {
98                    for tc in tool_calls {
99                        // tc.type must be function
100                        if tc.r#type == ChatCompletionToolType::Function {
101                            let args: Value = serde_json::from_str(&tc.function.arguments)
102                                .unwrap_or(Value::String(tc.function.arguments.clone()));
103                            items.push(RunItem::ToolCall(ToolCallItem {
104                                id: tc.id.clone(),
105                                tool_name: tc.function.name.clone(),
106                                arguments: args,
107                                created_at: Utc::now(),
108                            }));
109                        }
110                    }
111                }
112            }
113            ChatCompletionRequestMessage::Tool(t) => {
114                let tcid = t.tool_call_id.clone();
115                if tcid.is_empty() {
116                    return Err(CodecError::MissingToolCallId);
117                }
118                // Attempt JSON parse, fallback to raw string
119                let content_str = match &t.content {
120                    ChatCompletionRequestToolMessageContent::Text(s) => s.clone(),
121                    _ => String::new(),
122                };
123                let output = serde_json::from_str::<Value>(&content_str)
124                    .unwrap_or(Value::String(content_str));
125                items.push(RunItem::ToolOutput(ToolOutputItem {
126                    id: Uuid::new_v4().to_string(),
127                    tool_call_id: tcid,
128                    output,
129                    error: None,
130                    created_at: Utc::now(),
131                }));
132            }
133            ChatCompletionRequestMessage::Developer(_)
134            | ChatCompletionRequestMessage::Function(_) => {
135                // Not used in our normal flow; treat as system message string if needed
136                // For now, map to a system message placeholder
137                items.push(RunItem::Message(MessageItem {
138                    id: Uuid::new_v4().to_string(),
139                    role: Role::System,
140                    content: String::new(),
141                    created_at: Utc::now(),
142                }));
143            }
144        }
145    }
146    Ok(items)
147}
148
149/// Convert RunItems back to raw OpenAI request messages, preserving tool_calls semantics.
150pub fn items_to_messages(items: &[RunItem]) -> Vec<ChatCompletionRequestMessage> {
151    let mut messages: Vec<ChatCompletionRequestMessage> = Vec::new();
152
153    let mut i = 0;
154    while i < items.len() {
155        match &items[i] {
156            RunItem::Message(msg) => {
157                match msg.role {
158                    Role::System => {
159                        let sys = ChatCompletionRequestSystemMessageArgs::default()
160                            .content(msg.content.clone())
161                            .build()
162                            .expect("sys build");
163                        messages.push(sys.into());
164                    }
165                    Role::User => {
166                        let usr = ChatCompletionRequestUserMessageArgs::default()
167                            .content(msg.content.clone())
168                            .build()
169                            .expect("user build");
170                        messages.push(usr.into());
171                    }
172                    Role::Assistant => {
173                        // Look ahead to see if there are tool calls following this assistant message
174                        let mut j = i + 1;
175                        let mut tool_calls = Vec::new();
176                        while j < items.len() {
177                            if let RunItem::ToolCall(tc) = &items[j] {
178                                tool_calls.push(
179                                    async_openai::types::ChatCompletionMessageToolCall {
180                                        id: tc.id.clone(),
181                                        r#type: ChatCompletionToolType::Function,
182                                        function: async_openai::types::FunctionCall {
183                                            name: tc.tool_name.clone(),
184                                            arguments: tc.arguments.to_string(),
185                                        },
186                                    },
187                                );
188                                j += 1;
189                            } else {
190                                break;
191                            }
192                        }
193
194                        // Build assistant message with or without tool calls
195                        let mut builder = ChatCompletionRequestAssistantMessageArgs::default();
196                        builder.content(msg.content.clone());
197                        if !tool_calls.is_empty() {
198                            builder.tool_calls(tool_calls);
199                            // Skip the tool call items we just processed
200                            i = j - 1;
201                        }
202                        let assistant = builder.build().expect("assistant build");
203                        messages.push(assistant.into());
204                    }
205                    Role::Tool => {
206                        // Should not happen as tool outputs are ToolOutput items; ignore
207                    }
208                }
209            }
210            RunItem::ToolCall(_tc) => {
211                // Tool calls should have been handled when processing the preceding assistant message
212                // If we get here, it means there's a tool call without a preceding assistant message
213                // This shouldn't happen in well-formed data, but we'll skip it
214            }
215            RunItem::ToolOutput(out) => {
216                let content = out
217                    .error
218                    .as_ref()
219                    .map(|e| format!("Error: {}", e))
220                    .unwrap_or_else(|| out.output.to_string());
221                let tool_msg = ChatCompletionRequestToolMessageArgs::default()
222                    .content(content)
223                    .tool_call_id(out.tool_call_id.clone())
224                    .build()
225                    .expect("tool build");
226                messages.push(tool_msg.into());
227            }
228            RunItem::Handoff(HandoffItem { .. }) => {
229                // Agent-only; not part of the bijection to raw messages
230            }
231        }
232        i += 1;
233    }
234
235    messages
236}
237
238#[cfg(test)]
239mod tests {
240    use super::*;
241    use crate::validation::{gen, validate_conversation, ValidationPolicy};
242    use async_openai::types::ChatCompletionRequestMessage as ReqMsg;
243    use proptest::prop_assert;
244
245    fn assistant_with_calls(name: &str, args: Value, id: &str) -> ReqMsg {
246        let tc = async_openai::types::ChatCompletionMessageToolCall {
247            id: id.to_string(),
248            r#type: ChatCompletionToolType::Function,
249            function: async_openai::types::FunctionCall {
250                name: name.to_string(),
251                arguments: args.to_string(),
252            },
253        };
254        ChatCompletionRequestAssistantMessageArgs::default()
255            .content("")
256            .tool_calls(vec![tc])
257            .build()
258            .unwrap()
259            .into()
260    }
261
262    #[test]
263    fn maps_assistant_tool_calls_to_items() {
264        let user = ChatCompletionRequestUserMessageArgs::default()
265            .content("hi")
266            .build()
267            .unwrap()
268            .into();
269        let asst = assistant_with_calls("calc", serde_json::json!({"a":1}), "call_1");
270        let tool = ChatCompletionRequestToolMessageArgs::default()
271            .content("{\"sum\":2}")
272            .tool_call_id("call_1")
273            .build()
274            .unwrap()
275            .into();
276        let items = messages_to_items(&[user, asst, tool]).unwrap();
277        assert!(matches!(items[0], RunItem::Message(_)));
278        assert!(matches!(items[1], RunItem::Message(_))); // assistant message
279        assert!(matches!(items[2], RunItem::ToolCall(_)));
280        assert!(matches!(items[3], RunItem::ToolOutput(_)));
281    }
282
283    #[test]
284    fn roundtrip_messages_identity_basic() {
285        let sys = ChatCompletionRequestSystemMessageArgs::default()
286            .content("sys")
287            .build()
288            .unwrap()
289            .into();
290        let usr = ChatCompletionRequestUserMessageArgs::default()
291            .content("hello")
292            .build()
293            .unwrap()
294            .into();
295        let asst = ChatCompletionRequestAssistantMessageArgs::default()
296            .content("ok")
297            .build()
298            .unwrap()
299            .into();
300        let orig = vec![sys, usr, asst];
301        let items = messages_to_items(&orig).unwrap();
302        let back = items_to_messages(&items);
303        assert_eq!(back.len(), orig.len());
304        // Verify content is preserved
305        if let ChatCompletionRequestMessage::System(s2) = &back[0] {
306            if let ChatCompletionRequestSystemMessageContent::Text(t) = &s2.content {
307                assert_eq!(t, "sys");
308            } else {
309                panic!("expected system text");
310            }
311        } else {
312            panic!("expected system message");
313        }
314    }
315
316    #[test]
317    fn preserves_assistant_text_content() {
318        let asst = ChatCompletionRequestAssistantMessageArgs::default()
319            .content("assistant says ok")
320            .build()
321            .unwrap()
322            .into();
323        let items = messages_to_items(&[asst]).unwrap();
324        // Expect first item is assistant message with same content
325        match &items[0] {
326            RunItem::Message(m) => {
327                assert_eq!(m.role, Role::Assistant);
328                assert_eq!(m.content, "assistant says ok");
329            }
330            _ => panic!("expected message item"),
331        }
332    }
333
334    #[test]
335    fn tool_output_json_roundtrip_value() {
336        // Assistant calls a tool, tool returns JSON content
337        let asst = assistant_with_calls("calc", serde_json::json!({"a":1}), "call_x");
338        let tool = ChatCompletionRequestToolMessageArgs::default()
339            .content("{\"sum\": 3, \"ok\": true}")
340            .tool_call_id("call_x")
341            .build()
342            .unwrap()
343            .into();
344
345        let items = messages_to_items(&[asst, tool]).unwrap();
346        // Last item should be ToolOutput with parsed JSON
347        match &items[1] {
348            RunItem::ToolCall(_) => {}
349            _ => panic!("expected tool call at index 1"),
350        }
351        match &items[2] {
352            RunItem::ToolOutput(out) => {
353                let expected = serde_json::json!({"sum":3, "ok": true});
354                assert_eq!(out.output, expected);
355            }
356            _ => panic!("expected tool output at index 2"),
357        }
358
359        // Convert back to messages and validate content parses to same value
360        let back = items_to_messages(&items);
361        // Find the tool message
362        let tool_msg = back
363            .iter()
364            .find(|m| matches!(m, ChatCompletionRequestMessage::Tool(_)))
365            .expect("tool msg present");
366        if let ChatCompletionRequestMessage::Tool(t) = tool_msg {
367            if let ChatCompletionRequestToolMessageContent::Text(txt) = &t.content {
368                let val: Value = serde_json::from_str(txt).unwrap();
369                assert_eq!(val, serde_json::json!({"sum":3, "ok": true}));
370            } else {
371                panic!("expected text content for tool message");
372            }
373        }
374    }
375
376    #[test]
377    fn tool_output_plain_string_roundtrip_value() {
378        // Assistant calls a tool, tool returns plain string content
379        let asst = assistant_with_calls("echo", serde_json::json!({"v":1}), "c1");
380        let tool = ChatCompletionRequestToolMessageArgs::default()
381            .content("hello world")
382            .tool_call_id("c1")
383            .build()
384            .unwrap()
385            .into();
386        let items = messages_to_items(&[asst, tool]).unwrap();
387        match &items[2] {
388            RunItem::ToolOutput(out) => {
389                assert_eq!(out.output, Value::String("hello world".to_string()));
390            }
391            _ => panic!("expected tool output at index 2"),
392        }
393
394        // Back to messages: content will be JSON-escaped string; ensure it parses to same value
395        let back = items_to_messages(&items);
396        let tool_msg = back
397            .iter()
398            .find(|m| matches!(m, ChatCompletionRequestMessage::Tool(_)))
399            .expect("tool msg present");
400        if let ChatCompletionRequestMessage::Tool(t) = tool_msg {
401            if let ChatCompletionRequestToolMessageContent::Text(txt) = &t.content {
402                let parsed: Value = serde_json::from_str(txt).unwrap_or(Value::String(txt.clone()));
403                assert_eq!(parsed, Value::String("hello world".to_string()));
404            } else {
405                panic!("expected text content");
406            }
407        }
408    }
409
410    proptest::proptest! {
411        #[test]
412        fn roundtrip_preserves_validity(msgs in gen::valid_conversation(gen::GeneratorConfig::default())) {
413            let items = messages_to_items(&msgs).unwrap();
414            let back = items_to_messages(&items);
415            prop_assert!(validate_conversation(&back, &ValidationPolicy::default()).is_none());
416        }
417    }
418
419    #[test]
420    fn missing_tool_call_id_is_error() {
421        // Tool message without tool_call_id should error
422        let tool = ChatCompletionRequestToolMessageArgs::default()
423            .content("{\"x\":1}")
424            .tool_call_id("")
425            .build()
426            .unwrap()
427            .into();
428        let err = messages_to_items(&[tool]).unwrap_err();
429        match err {
430            CodecError::MissingToolCallId => {}
431        }
432    }
433}