1use indexmap::IndexMap;
8use meerkat_core::{AssistantBlock, ProviderMeta};
9use serde_json::value::RawValue;
10
/// Errors reported while assembling streamed events into assistant blocks.
#[derive(Debug, thiserror::Error)]
pub enum StreamAssemblyError {
    /// A tool-call delta named an ID that has no active buffer; carries that ID.
    #[error("delta for unknown tool call: {0}")]
    OrphanedToolDelta(String),
    /// A reasoning delta arrived while no reasoning block was being buffered.
    #[error("delta for unknown reasoning block")]
    OrphanedReasoningDelta,
    /// A tool-call start reused an ID that is still being buffered; carries that ID.
    #[error("duplicate tool call start: {0}")]
    DuplicateToolStart(String),
    /// A completion event named an unknown tool-call ID; carries that ID.
    ///
    /// NOTE(review): nothing in the visible part of this file constructs this variant —
    /// presumably it is produced by callers elsewhere; confirm before removing.
    #[error("complete event for unknown tool: {0}")]
    UnknownToolComplete(String),
    /// Argument finalization was requested for an ID with no active buffer; carries that ID.
    #[error("finalize args for unknown tool: {0}")]
    UnknownToolFinalize(String),
    /// The buffered argument text for tool call `id` could not be parsed as JSON;
    /// `reason` holds the parser's error message.
    #[error("invalid args JSON for tool {id}: {reason}")]
    InvalidArgsJson { id: String, reason: String },
}
28
/// Position of a reserved slot inside the assembler's slot list.
///
/// The wrapped value is the slot list's length at the moment the slot was pushed, i.e. the
/// index of that slot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BlockKey(usize);
32
/// One entry in the assembler's ordered slot list.
enum BlockSlot {
    /// A block whose content is available; emitted by `BlockAssembler::finalize`.
    Finalized(AssistantBlock),
    /// Placeholder reserved for a block that has started but not yet completed.
    /// Slots still pending at finalize time are dropped from the output.
    Pending,
}
38
/// Accumulated state for a tool call that has started but not yet completed.
struct ToolCallBuffer {
    /// Most recent tool name supplied by a delta, if any.
    /// NOTE(review): written by `on_tool_call_delta` but never read in the visible code.
    name: Option<String>,
    /// Concatenation of every argument fragment received so far.
    args_json: String,
    /// Slot reserved for this tool call when it started.
    block_key: BlockKey,
}
46
/// Accumulated state for the reasoning block currently being streamed.
struct ReasoningBuffer {
    /// Concatenation of every reasoning delta received so far.
    text: String,
    /// Slot reserved for this reasoning block when it started.
    block_key: BlockKey,
}
52
/// Assembles streamed text, reasoning and tool-call events into an ordered list of
/// `AssistantBlock`s.
///
/// Reasoning and tool-call starts reserve a pending slot, so the finished block lands at the
/// position where it started regardless of when it completes.
pub struct BlockAssembler {
    /// Blocks in output order; entries still being streamed are `BlockSlot::Pending`.
    slots: Vec<BlockSlot>,
    /// In-flight tool calls, keyed by tool-call ID.
    tool_buffers: IndexMap<String, ToolCallBuffer>,
    /// The reasoning block currently being streamed, if any (at most one at a time).
    reasoning_buffer: Option<ReasoningBuffer>,
}
74
75impl BlockAssembler {
76 pub fn new() -> Self {
78 Self {
79 slots: Vec::new(),
80 tool_buffers: IndexMap::new(),
81 reasoning_buffer: None,
82 }
83 }
84
85 pub fn on_text_delta(&mut self, delta: &str, meta: Option<Box<ProviderMeta>>) {
90 if meta.is_none()
91 && let Some(BlockSlot::Finalized(AssistantBlock::Text { text, meta: None })) =
92 self.slots.last_mut()
93 {
94 text.push_str(delta);
95 return;
96 }
97 self.slots.push(BlockSlot::Finalized(AssistantBlock::Text {
99 text: delta.into(),
100 meta,
101 }));
102 }
103
104 pub fn on_reasoning_start(&mut self) {
106 let key = BlockKey(self.slots.len());
107 self.slots.push(BlockSlot::Pending);
108 self.reasoning_buffer = Some(ReasoningBuffer {
109 text: String::new(),
110 block_key: key,
111 });
112 }
113
114 pub fn on_reasoning_delta(&mut self, delta: &str) -> Result<(), StreamAssemblyError> {
119 let buf = self
120 .reasoning_buffer
121 .as_mut()
122 .ok_or(StreamAssemblyError::OrphanedReasoningDelta)?;
123 buf.text.push_str(delta);
124 Ok(())
125 }
126
127 pub fn on_reasoning_complete(&mut self, meta: Option<Box<ProviderMeta>>) {
131 if let Some(buf) = self.reasoning_buffer.take()
132 && let Some(slot) = self.slots.get_mut(buf.block_key.0)
133 {
134 *slot = BlockSlot::Finalized(AssistantBlock::Reasoning {
135 text: buf.text,
136 meta,
137 });
138 }
139 }
141
142 pub fn current_reasoning_text(&self) -> String {
145 self.reasoning_buffer
146 .as_ref()
147 .map(|buf| buf.text.clone())
148 .unwrap_or_default()
149 }
150
151 pub fn on_tool_call_start(&mut self, id: String) -> Result<(), StreamAssemblyError> {
156 if self.tool_buffers.contains_key(&id) {
157 return Err(StreamAssemblyError::DuplicateToolStart(id));
158 }
159 let key = BlockKey(self.slots.len());
160 self.slots.push(BlockSlot::Pending);
161 self.tool_buffers.insert(
162 id,
163 ToolCallBuffer {
164 name: None,
165 args_json: String::new(),
166 block_key: key,
167 },
168 );
169 Ok(())
170 }
171
172 pub fn on_tool_call_delta(
177 &mut self,
178 id: &str,
179 name: Option<&str>,
180 args_delta: &str,
181 ) -> Result<(), StreamAssemblyError> {
182 let buf = self
183 .tool_buffers
184 .get_mut(id)
185 .ok_or_else(|| StreamAssemblyError::OrphanedToolDelta(id.to_string()))?;
186 if let Some(n) = name {
187 buf.name = Some(n.into());
188 }
189 buf.args_json.push_str(args_delta);
190 Ok(())
191 }
192
193 pub fn finalize_tool_args(&self, id: &str) -> Result<Box<RawValue>, StreamAssemblyError> {
199 let buf = self
200 .tool_buffers
201 .get(id)
202 .ok_or_else(|| StreamAssemblyError::UnknownToolFinalize(id.to_string()))?;
203
204 let args_str = if buf.args_json.is_empty() {
206 "{}".to_string()
207 } else {
208 buf.args_json.clone()
209 };
210
211 RawValue::from_string(args_str).map_err(|e| StreamAssemblyError::InvalidArgsJson {
212 id: id.to_string(),
213 reason: e.to_string(),
214 })
215 }
216
217 pub fn on_tool_call_complete(
225 &mut self,
226 id: String,
227 name: String,
228 args: Box<RawValue>,
229 meta: Option<Box<ProviderMeta>>,
230 ) -> Result<(), StreamAssemblyError> {
231 if let Some((_, _, buf)) = self.tool_buffers.swap_remove_full(&id) {
232 if let Some(slot) = self.slots.get_mut(buf.block_key.0) {
233 *slot = BlockSlot::Finalized(AssistantBlock::ToolUse {
234 id,
235 name,
236 args,
237 meta,
238 });
239 return Ok(());
240 }
241 Ok(())
242 } else {
243 self.slots
246 .push(BlockSlot::Finalized(AssistantBlock::ToolUse {
247 id,
248 name,
249 args,
250 meta,
251 }));
252 Ok(())
253 }
254 }
255
256 pub fn finalize(self) -> Vec<AssistantBlock> {
261 self.slots
262 .into_iter()
263 .filter_map(|slot| match slot {
264 BlockSlot::Finalized(block) => Some(block),
265 BlockSlot::Pending => None,
266 })
267 .collect()
268 }
269}
270
271impl Default for BlockAssembler {
272 fn default() -> Self {
273 Self::new()
274 }
275}
276
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Boxed Anthropic metadata carrying `signature`.
    fn anthropic_meta(signature: &str) -> Option<Box<ProviderMeta>> {
        Some(Box::new(ProviderMeta::Anthropic {
            signature: signature.to_string(),
        }))
    }

    /// Boxed Gemini metadata carrying `thought_signature`.
    fn gemini_meta(thought_signature: &str) -> Option<Box<ProviderMeta>> {
        Some(Box::new(ProviderMeta::Gemini {
            thought_signature: thought_signature.to_string(),
        }))
    }

    // ---- text ----------------------------------------------------------------------------

    #[test]
    fn test_text_deltas_coalesce_into_single_block() {
        let mut asm = BlockAssembler::new();
        for piece in ["Hello", " ", "World"] {
            asm.on_text_delta(piece, None);
        }

        let blocks = asm.finalize();
        assert_eq!(blocks.len(), 1);

        let AssistantBlock::Text { text, meta } = &blocks[0] else {
            panic!("Expected Text block");
        };
        assert_eq!(text, "Hello World");
        assert!(meta.is_none());
    }

    #[test]
    fn test_text_deltas_with_meta_do_not_coalesce() {
        let mut asm = BlockAssembler::new();
        asm.on_text_delta("First", None);
        asm.on_text_delta("Second", gemini_meta("sig1"));
        asm.on_text_delta("Third", None);

        let blocks = asm.finalize();
        assert_eq!(blocks.len(), 3);

        let AssistantBlock::Text { text, .. } = &blocks[0] else {
            panic!("Expected Text block");
        };
        assert_eq!(text, "First");

        let AssistantBlock::Text { text, meta } = &blocks[1] else {
            panic!("Expected Text block");
        };
        assert_eq!(text, "Second");
        assert!(meta.is_some());

        let AssistantBlock::Text { text, .. } = &blocks[2] else {
            panic!("Expected Text block");
        };
        assert_eq!(text, "Third");
    }

    // ---- reasoning -----------------------------------------------------------------------

    #[test]
    fn test_reasoning_start_delta_complete() {
        let mut asm = BlockAssembler::new();
        asm.on_reasoning_start();
        for piece in ["Let me think", "..."] {
            asm.on_reasoning_delta(piece).unwrap();
        }
        asm.on_reasoning_complete(anthropic_meta("sig_abc"));

        let blocks = asm.finalize();
        assert_eq!(blocks.len(), 1);

        let AssistantBlock::Reasoning { text, meta } = &blocks[0] else {
            panic!("Expected Reasoning block");
        };
        assert_eq!(text, "Let me think...");
        let Some(ProviderMeta::Anthropic { signature }) = meta.as_deref() else {
            panic!("Expected Anthropic meta");
        };
        assert_eq!(signature, "sig_abc");
    }

    #[test]
    fn test_reasoning_complete_without_start_is_ignored() {
        let mut asm = BlockAssembler::new();
        asm.on_reasoning_complete(None);

        assert!(asm.finalize().is_empty());
    }

    #[test]
    fn test_orphaned_reasoning_delta_returns_error() {
        let mut asm = BlockAssembler::new();

        assert!(matches!(
            asm.on_reasoning_delta("orphan"),
            Err(StreamAssemblyError::OrphanedReasoningDelta)
        ));
    }

    // ---- tool calls ----------------------------------------------------------------------

    #[test]
    fn test_tool_call_start_delta_complete() {
        let mut asm = BlockAssembler::new();
        asm.on_tool_call_start("tc_1".to_string()).unwrap();

        // The argument JSON arrives split across three fragments.
        let fragments = [
            (Some("read_file"), r#"{"pa"#),
            (None, r#"th":"#),
            (None, r#""/tmp/test"}"#),
        ];
        for (tool_name, fragment) in fragments {
            asm.on_tool_call_delta("tc_1", tool_name, fragment).unwrap();
        }

        let finalized = asm.finalize_tool_args("tc_1").unwrap();
        asm.on_tool_call_complete("tc_1".to_string(), "read_file".to_string(), finalized, None)
            .unwrap();

        let blocks = asm.finalize();
        assert_eq!(blocks.len(), 1);

        let AssistantBlock::ToolUse {
            id,
            name,
            args,
            meta,
        } = &blocks[0]
        else {
            panic!("Expected ToolUse block");
        };
        assert_eq!(id, "tc_1");
        assert_eq!(name, "read_file");
        assert!(meta.is_none());
        let parsed: serde_json::Value = serde_json::from_str(args.get()).unwrap();
        assert_eq!(parsed["path"], "/tmp/test");
    }

    #[test]
    fn test_tool_call_with_gemini_meta() {
        let mut asm = BlockAssembler::new();
        asm.on_tool_call_start("tc_2".to_string()).unwrap();
        asm.on_tool_call_delta("tc_2", Some("search"), r#"{"q":"test"}"#)
            .unwrap();

        let finalized = asm.finalize_tool_args("tc_2").unwrap();
        asm.on_tool_call_complete(
            "tc_2".to_string(),
            "search".to_string(),
            finalized,
            gemini_meta("gemini_sig"),
        )
        .unwrap();

        let blocks = asm.finalize();
        let AssistantBlock::ToolUse { meta, .. } = &blocks[0] else {
            panic!("Expected ToolUse block");
        };
        let Some(ProviderMeta::Gemini { thought_signature }) = meta.as_deref() else {
            panic!("Expected Gemini meta");
        };
        assert_eq!(thought_signature, "gemini_sig");
    }

    #[test]
    fn test_tool_call_complete_without_start_inserts_at_end() {
        let mut asm = BlockAssembler::new();
        asm.on_text_delta("Hello", None);

        let raw_args = RawValue::from_string(r#"{"key":"value"}"#.to_string()).unwrap();
        asm.on_tool_call_complete(
            "tc_orphan".to_string(),
            "orphan_tool".to_string(),
            raw_args,
            None,
        )
        .unwrap();

        let blocks = asm.finalize();
        assert_eq!(blocks.len(), 2);

        let AssistantBlock::ToolUse { id, name, .. } = &blocks[1] else {
            panic!("Expected ToolUse block");
        };
        assert_eq!(id, "tc_orphan");
        assert_eq!(name, "orphan_tool");
    }

    #[test]
    fn test_duplicate_tool_start_returns_error() {
        let mut asm = BlockAssembler::new();
        asm.on_tool_call_start("tc_dup".to_string()).unwrap();

        assert!(matches!(
            asm.on_tool_call_start("tc_dup".to_string()),
            Err(StreamAssemblyError::DuplicateToolStart(id)) if id == "tc_dup"
        ));
    }

    #[test]
    fn test_orphaned_tool_delta_returns_error() {
        let mut asm = BlockAssembler::new();

        assert!(matches!(
            asm.on_tool_call_delta("unknown", Some("tool"), "{}"),
            Err(StreamAssemblyError::OrphanedToolDelta(id)) if id == "unknown"
        ));
    }

    #[test]
    fn test_finalize_tool_args_unknown_id() {
        let asm = BlockAssembler::new();

        assert!(matches!(
            asm.finalize_tool_args("unknown"),
            Err(StreamAssemblyError::UnknownToolFinalize(id)) if id == "unknown"
        ));
    }

    #[test]
    fn test_finalize_tool_args_invalid_json() {
        let mut asm = BlockAssembler::new();
        asm.on_tool_call_start("tc_bad".to_string()).unwrap();
        asm.on_tool_call_delta("tc_bad", Some("bad_tool"), r#"{"invalid"#)
            .unwrap();

        assert!(matches!(
            asm.finalize_tool_args("tc_bad"),
            Err(StreamAssemblyError::InvalidArgsJson { id, .. }) if id == "tc_bad"
        ));
    }

    #[test]
    fn test_finalize_tool_args_empty_args() {
        let mut asm = BlockAssembler::new();
        asm.on_tool_call_start("tc_empty".to_string()).unwrap();

        let finalized = asm.finalize_tool_args("tc_empty").unwrap();
        let parsed: serde_json::Value = serde_json::from_str(finalized.get()).unwrap();
        assert_eq!(parsed, serde_json::json!({}));
    }

    // ---- ordering ------------------------------------------------------------------------

    #[test]
    fn test_block_ordering_interleaved_events() {
        let mut asm = BlockAssembler::new();

        // Blocks start in the order text, reasoning, tool, text; they complete out of order.
        asm.on_text_delta("Let me help. ", None);
        asm.on_reasoning_start();
        asm.on_tool_call_start("tc_1".to_string()).unwrap();
        asm.on_reasoning_delta("thinking...").unwrap();
        asm.on_tool_call_delta("tc_1", Some("search"), r#"{"q":"x"}"#)
            .unwrap();
        asm.on_text_delta("Done!", None);
        asm.on_reasoning_complete(anthropic_meta("sig"));

        let finalized = asm.finalize_tool_args("tc_1").unwrap();
        asm.on_tool_call_complete("tc_1".to_string(), "search".to_string(), finalized, None)
            .unwrap();

        let blocks = asm.finalize();
        assert_eq!(blocks.len(), 4);

        assert!(matches!(&blocks[0], AssistantBlock::Text { text, .. } if text == "Let me help. "));
        assert!(
            matches!(&blocks[1], AssistantBlock::Reasoning { text, .. } if text == "thinking...")
        );
        assert!(matches!(&blocks[2], AssistantBlock::ToolUse { name, .. } if name == "search"));
        assert!(matches!(&blocks[3], AssistantBlock::Text { text, .. } if text == "Done!"));
    }

    #[test]
    fn test_multiple_tool_calls_preserve_start_order() {
        let mut asm = BlockAssembler::new();
        for id in ["tc_first", "tc_second"] {
            asm.on_tool_call_start(id.to_string()).unwrap();
        }

        // Complete in the reverse of the start order.
        for (id, tool) in [("tc_second", "tool_b"), ("tc_first", "tool_a")] {
            asm.on_tool_call_delta(id, Some(tool), r"{}").unwrap();
            let finalized = asm.finalize_tool_args(id).unwrap();
            asm.on_tool_call_complete(id.to_string(), tool.to_string(), finalized, None)
                .unwrap();
        }

        let blocks = asm.finalize();
        assert_eq!(blocks.len(), 2);
        assert!(matches!(&blocks[0], AssistantBlock::ToolUse { id, .. } if id == "tc_first"));
        assert!(matches!(&blocks[1], AssistantBlock::ToolUse { id, .. } if id == "tc_second"));
    }

    #[test]
    fn test_pending_blocks_filtered_on_finalize() {
        let mut asm = BlockAssembler::new();
        asm.on_text_delta("Complete text", None);
        // Neither of these is ever completed.
        asm.on_reasoning_start();
        asm.on_tool_call_start("tc_incomplete".to_string()).unwrap();

        let blocks = asm.finalize();
        assert_eq!(blocks.len(), 1);
        assert!(matches!(&blocks[0], AssistantBlock::Text { .. }));
    }
}