Skip to main content

meerkat_client/
block_assembler.rs

//! Block assembly for streaming LLM responses.
//!
//! This module provides the `BlockAssembler` for assembling ordered blocks
//! from streaming LLM events. It tracks global arrival order across all block
//! types and handles interleaved reasoning/tool-use blocks correctly.
6
7use indexmap::IndexMap;
8use meerkat_core::{AssistantBlock, ProviderMeta};
9use serde_json::value::RawValue;
10
11/// Errors that can occur during stream assembly.
12/// Returned to caller, who decides whether to skip, count, or abort.
13#[derive(Debug, thiserror::Error)]
14pub enum StreamAssemblyError {
15    #[error("delta for unknown tool call: {0}")]
16    OrphanedToolDelta(String),
17    #[error("delta for unknown reasoning block")]
18    OrphanedReasoningDelta,
19    #[error("duplicate tool call start: {0}")]
20    DuplicateToolStart(String),
21    #[error("complete event for unknown tool: {0}")]
22    UnknownToolComplete(String),
23    #[error("finalize args for unknown tool: {0}")]
24    UnknownToolFinalize(String),
25    #[error("invalid args JSON for tool {id}: {reason}")]
26    InvalidArgsJson { id: String, reason: String },
27}
28
/// Position of a block inside the assembler's slot list.
///
/// Wrapping the raw `usize` in a newtype keeps slot positions from being
/// mixed up with any other kind of index.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct BlockKey(usize);
32
33/// Represents either a finalized block or a placeholder for one still streaming.
34enum BlockSlot {
35    Finalized(AssistantBlock),
36    Pending,
37}
38
39/// Buffer for tool call being assembled from streaming deltas.
40/// Key (id) is stored in the map, not duplicated here.
41struct ToolCallBuffer {
42    name: Option<String>,
43    args_json: String,
44    block_key: BlockKey,
45}
46
47/// Buffer for reasoning block being assembled.
48struct ReasoningBuffer {
49    text: String,
50    block_key: BlockKey,
51}
52
53/// Assembler for building ordered blocks from streaming events.
54///
55/// The assembler tracks **global arrival order** across all block types,
56/// not just tool calls. Blocks are ordered by when they *started*, not
57/// when they completed.
58///
59/// # Design
60///
61/// - `Vec<BlockSlot>` provides stable indices because we never remove elements
62/// - `BlockKey(usize)` is a newtype - prevents mixing up different indices
63/// - Methods return `Result<(), StreamAssemblyError>` for caller-decided policy
64/// - `ToolCallBuffer` does NOT store `id` - it's the map key, avoiding duplication
65/// - `Box<RawValue>` for args - no parsing in adapter
66pub struct BlockAssembler {
67    /// Slots are append-only to preserve start-order.
68    slots: Vec<BlockSlot>,
69    /// Map from tool call ID to buffer. ID is the key, not stored in value.
70    tool_buffers: IndexMap<String, ToolCallBuffer>,
71    /// Active reasoning block buffer.
72    reasoning_buffer: Option<ReasoningBuffer>,
73}
74
75impl BlockAssembler {
76    /// Create a new empty assembler.
77    pub fn new() -> Self {
78        Self {
79            slots: Vec::new(),
80            tool_buffers: IndexMap::new(),
81            reasoning_buffer: None,
82        }
83    }
84
85    /// Handle a text delta event.
86    ///
87    /// Text deltas can always succeed - no Result needed.
88    /// `meta` is used by Gemini for thoughtSignature on text parts.
89    pub fn on_text_delta(&mut self, delta: &str, meta: Option<Box<ProviderMeta>>) {
90        if meta.is_none()
91            && let Some(BlockSlot::Finalized(AssistantBlock::Text { text, meta: None })) =
92                self.slots.last_mut()
93        {
94            text.push_str(delta);
95            return;
96        }
97        // Insert new text block
98        self.slots.push(BlockSlot::Finalized(AssistantBlock::Text {
99            text: delta.into(),
100            meta,
101        }));
102    }
103
104    /// Start a new reasoning block.
105    pub fn on_reasoning_start(&mut self) {
106        let key = BlockKey(self.slots.len());
107        self.slots.push(BlockSlot::Pending);
108        self.reasoning_buffer = Some(ReasoningBuffer {
109            text: String::new(),
110            block_key: key,
111        });
112    }
113
114    /// Handle a reasoning delta event.
115    ///
116    /// # Errors
117    /// Returns `OrphanedReasoningDelta` if no reasoning block is currently being assembled.
118    pub fn on_reasoning_delta(&mut self, delta: &str) -> Result<(), StreamAssemblyError> {
119        let buf = self
120            .reasoning_buffer
121            .as_mut()
122            .ok_or(StreamAssemblyError::OrphanedReasoningDelta)?;
123        buf.text.push_str(delta);
124        Ok(())
125    }
126
127    /// Complete the current reasoning block.
128    ///
129    /// Provider adapter converts raw JSON to typed `ProviderMeta` before calling.
130    pub fn on_reasoning_complete(&mut self, meta: Option<Box<ProviderMeta>>) {
131        if let Some(buf) = self.reasoning_buffer.take()
132            && let Some(slot) = self.slots.get_mut(buf.block_key.0)
133        {
134            *slot = BlockSlot::Finalized(AssistantBlock::Reasoning {
135                text: buf.text,
136                meta,
137            });
138        }
139        // Complete without prior start is silently ignored - provider protocol quirk
140    }
141
142    /// Return a snapshot of the current reasoning buffer text.
143    /// Returns an empty string if no reasoning block is in progress.
144    pub fn current_reasoning_text(&self) -> String {
145        self.reasoning_buffer
146            .as_ref()
147            .map(|buf| buf.text.clone())
148            .unwrap_or_default()
149    }
150
151    /// Start a new tool call block.
152    ///
153    /// # Errors
154    /// Returns `DuplicateToolStart` if a tool call with the same ID is already being assembled.
155    pub fn on_tool_call_start(&mut self, id: String) -> Result<(), StreamAssemblyError> {
156        if self.tool_buffers.contains_key(&id) {
157            return Err(StreamAssemblyError::DuplicateToolStart(id));
158        }
159        let key = BlockKey(self.slots.len());
160        self.slots.push(BlockSlot::Pending);
161        self.tool_buffers.insert(
162            id,
163            ToolCallBuffer {
164                name: None,
165                args_json: String::new(),
166                block_key: key,
167            },
168        );
169        Ok(())
170    }
171
172    /// Handle a tool call delta event.
173    ///
174    /// # Errors
175    /// Returns `OrphanedToolDelta` if no tool call with the given ID is being assembled.
176    pub fn on_tool_call_delta(
177        &mut self,
178        id: &str,
179        name: Option<&str>,
180        args_delta: &str,
181    ) -> Result<(), StreamAssemblyError> {
182        let buf = self
183            .tool_buffers
184            .get_mut(id)
185            .ok_or_else(|| StreamAssemblyError::OrphanedToolDelta(id.to_string()))?;
186        if let Some(n) = name {
187            buf.name = Some(n.into());
188        }
189        buf.args_json.push_str(args_delta);
190        Ok(())
191    }
192
193    /// Convert buffered args_json to RawValue. Called before on_tool_call_complete.
194    ///
195    /// # Errors
196    /// - `UnknownToolFinalize` if no buffer exists for this ID (protocol error)
197    /// - `InvalidArgsJson` if the buffered JSON is malformed
198    pub fn finalize_tool_args(&self, id: &str) -> Result<Box<RawValue>, StreamAssemblyError> {
199        let buf = self
200            .tool_buffers
201            .get(id)
202            .ok_or_else(|| StreamAssemblyError::UnknownToolFinalize(id.to_string()))?;
203
204        // Handle empty args (tools with no parameters)
205        let args_str = if buf.args_json.is_empty() {
206            "{}".to_string()
207        } else {
208            buf.args_json.clone()
209        };
210
211        RawValue::from_string(args_str).map_err(|e| StreamAssemblyError::InvalidArgsJson {
212            id: id.to_string(),
213            reason: e.to_string(),
214        })
215    }
216
217    /// Complete a tool call block.
218    ///
219    /// Provider adapter converts raw JSON to typed `ProviderMeta` before calling.
220    ///
221    /// # Errors
222    /// This method never returns an error. If no prior start exists for the ID,
223    /// the tool call is inserted at the end (ordering may be off but we have the data).
224    pub fn on_tool_call_complete(
225        &mut self,
226        id: String,
227        name: String,
228        args: Box<RawValue>,
229        meta: Option<Box<ProviderMeta>>,
230    ) -> Result<(), StreamAssemblyError> {
231        if let Some((_, _, buf)) = self.tool_buffers.swap_remove_full(&id) {
232            if let Some(slot) = self.slots.get_mut(buf.block_key.0) {
233                *slot = BlockSlot::Finalized(AssistantBlock::ToolUse {
234                    id,
235                    name,
236                    args,
237                    meta,
238                });
239                return Ok(());
240            }
241            Ok(())
242        } else {
243            // No prior start - provider that doesn't emit start events
244            // Insert at end; ordering may be off but we have the data
245            self.slots
246                .push(BlockSlot::Finalized(AssistantBlock::ToolUse {
247                    id,
248                    name,
249                    args,
250                    meta,
251                }));
252            Ok(())
253        }
254    }
255
256    /// Finalize the assembler and return the ordered blocks.
257    ///
258    /// Slab iteration is in insertion order, so blocks are returned
259    /// in the order they were started.
260    pub fn finalize(self) -> Vec<AssistantBlock> {
261        self.slots
262            .into_iter()
263            .filter_map(|slot| match slot {
264                BlockSlot::Finalized(block) => Some(block),
265                BlockSlot::Pending => None,
266            })
267            .collect()
268    }
269}
270
271impl Default for BlockAssembler {
272    fn default() -> Self {
273        Self::new()
274    }
275}
276
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    // =========================================================================
    // Shared helpers
    // =========================================================================

    /// Anthropic provider metadata carrying `signature`.
    fn anthropic_meta(signature: &str) -> Option<Box<ProviderMeta>> {
        Some(Box::new(ProviderMeta::Anthropic {
            signature: signature.to_string(),
        }))
    }

    /// Gemini provider metadata carrying `thought_signature`.
    fn gemini_meta(thought_signature: &str) -> Option<Box<ProviderMeta>> {
        Some(Box::new(ProviderMeta::Gemini {
            thought_signature: thought_signature.to_string(),
        }))
    }

    /// Send `chunks` as argument deltas for `id`; only the first delta names the tool.
    fn feed_args(assembler: &mut BlockAssembler, id: &str, name: &str, chunks: &[&str]) {
        for (position, chunk) in chunks.iter().enumerate() {
            let announced = (position == 0).then_some(name);
            assembler.on_tool_call_delta(id, announced, chunk).unwrap();
        }
    }

    /// Finalize the buffered args for `id` and complete the tool call with them.
    fn complete_tool(
        assembler: &mut BlockAssembler,
        id: &str,
        name: &str,
        meta: Option<Box<ProviderMeta>>,
    ) {
        let args = assembler.finalize_tool_args(id).unwrap();
        assembler
            .on_tool_call_complete(id.to_string(), name.to_string(), args, meta)
            .unwrap();
    }

    // =========================================================================
    // Text delta coalescing tests
    // =========================================================================

    #[test]
    fn test_text_deltas_coalesce_into_single_block() {
        let mut assembler = BlockAssembler::new();
        for piece in ["Hello", " ", "World"] {
            assembler.on_text_delta(piece, None);
        }

        let blocks = assembler.finalize();
        assert_eq!(blocks.len(), 1);

        let AssistantBlock::Text { text, meta } = &blocks[0] else {
            panic!("Expected Text block");
        };
        assert_eq!(text, "Hello World");
        assert!(meta.is_none());
    }

    #[test]
    fn test_text_deltas_with_meta_do_not_coalesce() {
        let mut assembler = BlockAssembler::new();
        assembler.on_text_delta("First", None);
        assembler.on_text_delta("Second", gemini_meta("sig1"));
        assembler.on_text_delta("Third", None);

        let blocks = assembler.finalize();
        assert_eq!(blocks.len(), 3);

        // A delta that carries meta never merges into the preceding block.
        let AssistantBlock::Text { text: first, .. } = &blocks[0] else {
            panic!("Expected Text block");
        };
        assert_eq!(first, "First");

        let AssistantBlock::Text {
            text: second,
            meta: second_meta,
        } = &blocks[1]
        else {
            panic!("Expected Text block");
        };
        assert_eq!(second, "Second");
        assert!(second_meta.is_some());

        // The block before it has meta, so the third delta opens its own block.
        let AssistantBlock::Text { text: third, .. } = &blocks[2] else {
            panic!("Expected Text block");
        };
        assert_eq!(third, "Third");
    }

    // =========================================================================
    // Reasoning block tests
    // =========================================================================

    #[test]
    fn test_reasoning_start_delta_complete() {
        let mut assembler = BlockAssembler::new();
        assembler.on_reasoning_start();
        for piece in ["Let me think", "..."] {
            assembler.on_reasoning_delta(piece).unwrap();
        }
        assembler.on_reasoning_complete(anthropic_meta("sig_abc"));

        let blocks = assembler.finalize();
        assert_eq!(blocks.len(), 1);

        let AssistantBlock::Reasoning { text, meta } = &blocks[0] else {
            panic!("Expected Reasoning block");
        };
        assert_eq!(text, "Let me think...");

        let Some(ProviderMeta::Anthropic { signature }) = meta.as_deref() else {
            panic!("Expected Anthropic meta");
        };
        assert_eq!(signature, "sig_abc");
    }

    #[test]
    fn test_reasoning_complete_without_start_is_ignored() {
        let mut assembler = BlockAssembler::new();

        // No block was started, so the complete event has nothing to finalize.
        assembler.on_reasoning_complete(None);

        assert!(assembler.finalize().is_empty());
    }

    #[test]
    fn test_orphaned_reasoning_delta_returns_error() {
        let mut assembler = BlockAssembler::new();

        // No block was started, so the delta has nowhere to go.
        let outcome = assembler.on_reasoning_delta("orphan");

        let Err(StreamAssemblyError::OrphanedReasoningDelta) = outcome else {
            panic!("Expected OrphanedReasoningDelta");
        };
    }

    // =========================================================================
    // Tool call block tests
    // =========================================================================

    #[test]
    fn test_tool_call_start_delta_complete() {
        let mut assembler = BlockAssembler::new();
        assembler.on_tool_call_start("tc_1".to_string()).unwrap();
        feed_args(
            &mut assembler,
            "tc_1",
            "read_file",
            &[r#"{"pa"#, r#"th":"#, r#""/tmp/test"}"#],
        );
        complete_tool(&mut assembler, "tc_1", "read_file", None);

        let blocks = assembler.finalize();
        assert_eq!(blocks.len(), 1);

        let AssistantBlock::ToolUse {
            id,
            name,
            args,
            meta,
        } = &blocks[0]
        else {
            panic!("Expected ToolUse block");
        };
        assert_eq!(id, "tc_1");
        assert_eq!(name, "read_file");
        assert!(meta.is_none());

        // The three fragments must have been joined into one valid JSON object.
        let parsed: serde_json::Value = serde_json::from_str(args.get()).unwrap();
        assert_eq!(parsed["path"], "/tmp/test");
    }

    #[test]
    fn test_tool_call_with_gemini_meta() {
        let mut assembler = BlockAssembler::new();
        assembler.on_tool_call_start("tc_2".to_string()).unwrap();
        feed_args(&mut assembler, "tc_2", "search", &[r#"{"q":"test"}"#]);
        complete_tool(&mut assembler, "tc_2", "search", gemini_meta("gemini_sig"));

        let blocks = assembler.finalize();
        let AssistantBlock::ToolUse { meta, .. } = &blocks[0] else {
            panic!("Expected ToolUse block");
        };
        let Some(ProviderMeta::Gemini { thought_signature }) = meta.as_deref() else {
            panic!("Expected Gemini meta");
        };
        assert_eq!(thought_signature, "gemini_sig");
    }

    #[test]
    fn test_tool_call_complete_without_start_inserts_at_end() {
        let mut assembler = BlockAssembler::new();

        // Some text comes first.
        assembler.on_text_delta("Hello", None);

        // A complete event with no matching start must still be accepted.
        let args = RawValue::from_string(r#"{"key":"value"}"#.to_string()).unwrap();
        assembler
            .on_tool_call_complete(
                "tc_orphan".to_string(),
                "orphan_tool".to_string(),
                args,
                None,
            )
            .unwrap();

        let blocks = assembler.finalize();
        assert_eq!(blocks.len(), 2);

        // The tool call lands after the text.
        let AssistantBlock::ToolUse { id, name, .. } = &blocks[1] else {
            panic!("Expected ToolUse block");
        };
        assert_eq!(id, "tc_orphan");
        assert_eq!(name, "orphan_tool");
    }

    #[test]
    fn test_duplicate_tool_start_returns_error() {
        let mut assembler = BlockAssembler::new();
        assembler.on_tool_call_start("tc_dup".to_string()).unwrap();

        let second_start = assembler.on_tool_call_start("tc_dup".to_string());

        let Err(StreamAssemblyError::DuplicateToolStart(id)) = second_start else {
            panic!("Expected DuplicateToolStart");
        };
        assert_eq!(id, "tc_dup");
    }

    #[test]
    fn test_orphaned_tool_delta_returns_error() {
        let mut assembler = BlockAssembler::new();

        let outcome = assembler.on_tool_call_delta("unknown", Some("tool"), "{}");

        let Err(StreamAssemblyError::OrphanedToolDelta(id)) = outcome else {
            panic!("Expected OrphanedToolDelta");
        };
        assert_eq!(id, "unknown");
    }

    #[test]
    fn test_finalize_tool_args_unknown_id() {
        let assembler = BlockAssembler::new();

        let outcome = assembler.finalize_tool_args("unknown");

        let Err(StreamAssemblyError::UnknownToolFinalize(id)) = outcome else {
            panic!("Expected UnknownToolFinalize");
        };
        assert_eq!(id, "unknown");
    }

    #[test]
    fn test_finalize_tool_args_invalid_json() {
        let mut assembler = BlockAssembler::new();
        assembler.on_tool_call_start("tc_bad".to_string()).unwrap();
        feed_args(&mut assembler, "tc_bad", "bad_tool", &[r#"{"invalid"#]);

        let outcome = assembler.finalize_tool_args("tc_bad");

        let Err(StreamAssemblyError::InvalidArgsJson { id, .. }) = outcome else {
            panic!("Expected InvalidArgsJson");
        };
        assert_eq!(id, "tc_bad");
    }

    #[test]
    fn test_finalize_tool_args_empty_args() {
        let mut assembler = BlockAssembler::new();
        assembler
            .on_tool_call_start("tc_empty".to_string())
            .unwrap();

        // No delta was sent, so the argument buffer is empty.
        let args = assembler.finalize_tool_args("tc_empty").unwrap();

        assert_eq!(
            serde_json::from_str::<serde_json::Value>(args.get()).unwrap(),
            serde_json::json!({})
        );
    }

    // =========================================================================
    // Block ordering tests
    // =========================================================================

    #[test]
    fn test_block_ordering_interleaved_events() {
        let mut assembler = BlockAssembler::new();

        // Interleaved stream being simulated:
        //   1. text "Let me help. "
        //   2. reasoning starts
        //   3. tool call starts
        //   4. reasoning delta
        //   5. tool call delta
        //   6. more text (must NOT merge with #1 - other blocks sit in between)
        //   7. reasoning completes
        //   8. tool call completes
        assembler.on_text_delta("Let me help. ", None);
        assembler.on_reasoning_start();
        assembler.on_tool_call_start("tc_1".to_string()).unwrap();
        assembler.on_reasoning_delta("thinking...").unwrap();
        feed_args(&mut assembler, "tc_1", "search", &[r#"{"q":"x"}"#]);
        assembler.on_text_delta("Done!", None);
        assembler.on_reasoning_complete(anthropic_meta("sig"));
        complete_tool(&mut assembler, "tc_1", "search", None);

        let blocks = assembler.finalize();

        // Expected order: Text, Reasoning, ToolUse, Text
        let [opening, reasoning, tool, closing] = blocks.as_slice() else {
            panic!("Expected exactly 4 blocks");
        };
        assert!(matches!(opening, AssistantBlock::Text { text, .. } if text == "Let me help. "));
        assert!(matches!(reasoning, AssistantBlock::Reasoning { text, .. } if text == "thinking..."));
        assert!(matches!(tool, AssistantBlock::ToolUse { name, .. } if name == "search"));
        assert!(matches!(closing, AssistantBlock::Text { text, .. } if text == "Done!"));
    }

    #[test]
    fn test_multiple_tool_calls_preserve_start_order() {
        let mut assembler = BlockAssembler::new();

        // Open both tool calls up front.
        for id in ["tc_first", "tc_second"] {
            assembler.on_tool_call_start(id.to_string()).unwrap();
        }

        // Finish them in the opposite order.
        feed_args(&mut assembler, "tc_second", "tool_b", &[r"{}"]);
        complete_tool(&mut assembler, "tc_second", "tool_b", None);

        feed_args(&mut assembler, "tc_first", "tool_a", &[r"{}"]);
        complete_tool(&mut assembler, "tc_first", "tool_a", None);

        let blocks = assembler.finalize();

        // Output follows START order, not completion order.
        let [earlier, later] = blocks.as_slice() else {
            panic!("Expected exactly 2 blocks");
        };
        assert!(matches!(earlier, AssistantBlock::ToolUse { id, .. } if id == "tc_first"));
        assert!(matches!(later, AssistantBlock::ToolUse { id, .. } if id == "tc_second"));
    }

    #[test]
    fn test_pending_blocks_filtered_on_finalize() {
        let mut assembler = BlockAssembler::new();
        assembler.on_text_delta("Complete text", None);
        // Both of these are started and then left unfinished.
        assembler.on_reasoning_start();
        assembler
            .on_tool_call_start("tc_incomplete".to_string())
            .unwrap();

        let blocks = assembler.finalize();

        // Only the finished text block survives.
        assert_eq!(blocks.len(), 1);
        assert!(matches!(&blocks[0], AssistantBlock::Text { .. }));
    }
}