Skip to main content

lutum_protocol/
reducer.rs

1use std::sync::Arc;
2
3use thiserror::Error;
4
5use crate::{
6    budget::Usage,
7    conversation::{AssistantTurn, AssistantTurnItem, RawJson, ToolCallId},
8    llm::{
9        CompletionEvent, FinishReason, StructuredCompletionEvent, StructuredTurnEvent,
10        TextTurnEvent,
11    },
12    structured::StructuredOutput,
13    toolset::{ToolCallWrapper, Toolset},
14    transcript::CommittedTurn,
15};
16
17#[derive(Debug)]
18pub struct TextTurnState<T: Toolset> {
19    pub request_id: Option<String>,
20    pub model: String,
21    pub assistant_turn: Vec<AssistantTurnItem>,
22    pub tool_calls: Vec<T::ToolCall>,
23    pub finish_reason: Option<FinishReason>,
24    pub usage: Option<Usage>,
25    pub committed_turn: Option<CommittedTurn>,
26}
27
28impl<T> Default for TextTurnState<T>
29where
30    T: Toolset,
31{
32    fn default() -> Self {
33        Self {
34            request_id: None,
35            model: String::new(),
36            assistant_turn: Vec::new(),
37            tool_calls: Vec::new(),
38            finish_reason: None,
39            usage: None,
40            committed_turn: None,
41        }
42    }
43}
44
45impl<T> Clone for TextTurnState<T>
46where
47    T: Toolset,
48{
49    fn clone(&self) -> Self {
50        Self {
51            request_id: self.request_id.clone(),
52            model: self.model.clone(),
53            assistant_turn: self.assistant_turn.clone(),
54            tool_calls: self.tool_calls.clone(),
55            finish_reason: self.finish_reason.clone(),
56            usage: self.usage,
57            committed_turn: self.committed_turn.clone(),
58        }
59    }
60}
61
62impl<T> PartialEq for TextTurnState<T>
63where
64    T: Toolset,
65    T::ToolCall: PartialEq,
66{
67    fn eq(&self, other: &Self) -> bool {
68        self.request_id == other.request_id
69            && self.model == other.model
70            && self.assistant_turn == other.assistant_turn
71            && self.tool_calls == other.tool_calls
72            && self.finish_reason == other.finish_reason
73            && self.usage == other.usage
74            && committed_turn_option_eq(&self.committed_turn, &other.committed_turn)
75    }
76}
77
78impl<T> Eq for TextTurnState<T>
79where
80    T: Toolset,
81    T::ToolCall: Eq,
82{
83}
84
85impl<T> TextTurnState<T>
86where
87    T: Toolset,
88{
89    pub fn assistant_text(&self) -> String {
90        assistant_text(&self.assistant_turn)
91    }
92
93    /// Apply a streaming event to advance this turn state.
94    ///
95    /// Returns an error if the turn has already completed.
96    pub fn apply(&mut self, event: &TextTurnEvent<T>) -> Result<(), TextTurnReductionError> {
97        if self.finish_reason.is_some() {
98            return Err(TextTurnReductionError::AlreadyCompleted);
99        }
100
101        match event {
102            TextTurnEvent::Started { request_id, model } => {
103                self.request_id = request_id.clone();
104                self.model = model.clone();
105            }
106            TextTurnEvent::TextDelta { delta } => {
107                push_or_extend_text(&mut self.assistant_turn, delta);
108            }
109            TextTurnEvent::ReasoningDelta { delta } => {
110                push_or_extend_reasoning(&mut self.assistant_turn, delta);
111            }
112            TextTurnEvent::RefusalDelta { delta } => {
113                push_or_extend_refusal(&mut self.assistant_turn, delta);
114            }
115            TextTurnEvent::ToolCallChunk { .. } => {}
116            TextTurnEvent::ToolCallReady(tool_call) => {
117                push_tool_call(&mut self.assistant_turn, &mut self.tool_calls, tool_call);
118            }
119            TextTurnEvent::Completed {
120                request_id,
121                finish_reason,
122                usage,
123                committed_turn,
124            } => {
125                if let Some(request_id) = request_id.clone() {
126                    self.request_id = Some(request_id);
127                }
128                self.finish_reason = Some(finish_reason.clone());
129                self.usage = Some(*usage);
130                self.committed_turn = Some(committed_turn.clone());
131            }
132        }
133
134        Ok(())
135    }
136
137    /// Finalize the accumulated state into a completed turn result.
138    ///
139    /// Returns [`TextTurnReductionError::Incomplete`] if the turn has not yet
140    /// received a `Completed` event, and
141    /// [`TextTurnReductionError::EmptyAssistantOutput`] if the completed turn
142    /// produced no assistant items.
143    pub fn finish(self) -> Result<TextTurnResult<T>, TextTurnReductionError> {
144        // Check completion first so callers get Incomplete (not EmptyAssistantOutput)
145        // when finish() is called on a fresh or mid-stream state.
146        let finish_reason = self
147            .finish_reason
148            .ok_or(TextTurnReductionError::Incomplete)?;
149        let usage = self.usage.ok_or(TextTurnReductionError::Incomplete)?;
150        let committed_turn = self
151            .committed_turn
152            .ok_or(TextTurnReductionError::Incomplete)?;
153        let assistant_turn = AssistantTurn::from_items(self.assistant_turn)
154            .map_err(|_| TextTurnReductionError::EmptyAssistantOutput)?;
155        Ok(TextTurnResult {
156            request_id: self.request_id,
157            model: self.model,
158            assistant_turn,
159            tool_calls: self.tool_calls,
160            finish_reason,
161            usage,
162            committed_turn,
163        })
164    }
165}
166
167#[derive(Debug)]
168pub struct TextTurnResult<T: Toolset> {
169    pub request_id: Option<String>,
170    pub model: String,
171    pub assistant_turn: AssistantTurn,
172    pub tool_calls: Vec<T::ToolCall>,
173    pub finish_reason: FinishReason,
174    pub usage: Usage,
175    pub committed_turn: CommittedTurn,
176}
177
178impl<T> TextTurnResult<T>
179where
180    T: Toolset,
181{
182    pub fn assistant_text(&self) -> String {
183        self.assistant_turn.assistant_text()
184    }
185}
186
187impl<T> Clone for TextTurnResult<T>
188where
189    T: Toolset,
190{
191    fn clone(&self) -> Self {
192        Self {
193            request_id: self.request_id.clone(),
194            model: self.model.clone(),
195            assistant_turn: self.assistant_turn.clone(),
196            tool_calls: self.tool_calls.clone(),
197            finish_reason: self.finish_reason.clone(),
198            usage: self.usage,
199            committed_turn: self.committed_turn.clone(),
200        }
201    }
202}
203
204#[derive(Debug)]
205pub struct StructuredTurnState<T: Toolset, O: StructuredOutput> {
206    pub request_id: Option<String>,
207    pub model: String,
208    pub assistant_turn: Vec<AssistantTurnItem>,
209    pub tool_calls: Vec<T::ToolCall>,
210    pub structured: Option<O>,
211    pub refusal: Option<String>,
212    pub finish_reason: Option<FinishReason>,
213    pub usage: Option<Usage>,
214    pub committed_turn: Option<CommittedTurn>,
215}
216
217impl<T, O> Default for StructuredTurnState<T, O>
218where
219    T: Toolset,
220    O: StructuredOutput,
221{
222    fn default() -> Self {
223        Self {
224            request_id: None,
225            model: String::new(),
226            assistant_turn: Vec::new(),
227            tool_calls: Vec::new(),
228            structured: None,
229            refusal: None,
230            finish_reason: None,
231            usage: None,
232            committed_turn: None,
233        }
234    }
235}
236
237impl<T, O> Clone for StructuredTurnState<T, O>
238where
239    T: Toolset,
240    O: StructuredOutput,
241{
242    fn clone(&self) -> Self {
243        Self {
244            request_id: self.request_id.clone(),
245            model: self.model.clone(),
246            assistant_turn: self.assistant_turn.clone(),
247            tool_calls: self.tool_calls.clone(),
248            structured: self.structured.clone(),
249            refusal: self.refusal.clone(),
250            finish_reason: self.finish_reason.clone(),
251            usage: self.usage,
252            committed_turn: self.committed_turn.clone(),
253        }
254    }
255}
256
257impl<T, O> PartialEq for StructuredTurnState<T, O>
258where
259    T: Toolset,
260    T::ToolCall: PartialEq,
261    O: StructuredOutput + PartialEq,
262{
263    fn eq(&self, other: &Self) -> bool {
264        self.request_id == other.request_id
265            && self.model == other.model
266            && self.assistant_turn == other.assistant_turn
267            && self.tool_calls == other.tool_calls
268            && self.structured == other.structured
269            && self.refusal == other.refusal
270            && self.finish_reason == other.finish_reason
271            && self.usage == other.usage
272            && committed_turn_option_eq(&self.committed_turn, &other.committed_turn)
273    }
274}
275
276impl<T, O> Eq for StructuredTurnState<T, O>
277where
278    T: Toolset,
279    T::ToolCall: Eq,
280    O: StructuredOutput + Eq,
281{
282}
283
284impl<T, O> StructuredTurnState<T, O>
285where
286    T: Toolset,
287    O: StructuredOutput,
288{
289    /// Apply a streaming event to advance this turn state.
290    ///
291    /// Returns an error if the turn has already completed.
292    pub fn apply(
293        &mut self,
294        event: &StructuredTurnEvent<T, O>,
295    ) -> Result<(), StructuredTurnReductionError> {
296        if self.finish_reason.is_some() {
297            return Err(StructuredTurnReductionError::AlreadyCompleted);
298        }
299
300        match event {
301            StructuredTurnEvent::Started { request_id, model } => {
302                self.request_id = request_id.clone();
303                self.model = model.clone();
304            }
305            StructuredTurnEvent::StructuredOutputChunk { json_delta } => {
306                push_or_extend_text(&mut self.assistant_turn, json_delta);
307            }
308            StructuredTurnEvent::StructuredOutputReady(value) => {
309                if self.structured.is_some() {
310                    return Err(StructuredTurnReductionError::DuplicateStructuredOutput);
311                }
312                self.structured = Some(value.clone());
313            }
314            StructuredTurnEvent::ReasoningDelta { delta } => {
315                push_or_extend_reasoning(&mut self.assistant_turn, delta);
316            }
317            StructuredTurnEvent::RefusalDelta { delta } => {
318                push_or_extend_refusal(&mut self.assistant_turn, delta);
319                if let Some(existing) = self.refusal.as_mut() {
320                    existing.push_str(delta);
321                } else {
322                    self.refusal = Some(delta.clone());
323                }
324            }
325            StructuredTurnEvent::ToolCallChunk { .. } => {}
326            StructuredTurnEvent::ToolCallReady(tool_call) => {
327                push_tool_call(&mut self.assistant_turn, &mut self.tool_calls, tool_call);
328            }
329            StructuredTurnEvent::Completed {
330                request_id,
331                finish_reason,
332                usage,
333                committed_turn,
334            } => {
335                if let Some(request_id) = request_id.clone() {
336                    self.request_id = Some(request_id);
337                }
338                self.finish_reason = Some(finish_reason.clone());
339                self.usage = Some(*usage);
340                self.committed_turn = Some(committed_turn.clone());
341            }
342        }
343
344        Ok(())
345    }
346
347    /// Finalize the accumulated state into a completed turn result.
348    ///
349    /// Returns [`StructuredTurnReductionError::Incomplete`] if the turn has
350    /// not yet received a `Completed` event, and
351    /// [`StructuredTurnReductionError::EmptyAssistantOutput`] if the completed
352    /// turn produced no assistant items.
353    pub fn finish(self) -> Result<StructuredTurnResult<T, O>, StructuredTurnReductionError> {
354        // Check completion first so callers get Incomplete (not EmptyAssistantOutput)
355        // when finish() is called on a fresh or mid-stream state.
356        let finish_reason = self
357            .finish_reason
358            .ok_or(StructuredTurnReductionError::Incomplete)?;
359        let usage = self.usage.ok_or(StructuredTurnReductionError::Incomplete)?;
360        let committed_turn = self
361            .committed_turn
362            .ok_or(StructuredTurnReductionError::Incomplete)?;
363        let assistant_turn = AssistantTurn::from_items(self.assistant_turn)
364            .map_err(|_| StructuredTurnReductionError::EmptyAssistantOutput)?;
365        let semantic = match (self.structured, self.refusal) {
366            (Some(value), None) => StructuredTurnOutcome::Structured(value),
367            (None, Some(refusal)) => StructuredTurnOutcome::Refusal(refusal),
368            (None, None) => return Err(StructuredTurnReductionError::MissingSemantic),
369            (Some(_), Some(_)) => return Err(StructuredTurnReductionError::ConflictingSemantic),
370        };
371
372        Ok(StructuredTurnResult {
373            request_id: self.request_id,
374            model: self.model,
375            assistant_turn,
376            tool_calls: self.tool_calls,
377            semantic,
378            finish_reason,
379            usage,
380            committed_turn,
381        })
382    }
383}
384
385#[derive(Clone, Debug, Eq, PartialEq)]
386pub enum StructuredTurnOutcome<O> {
387    Structured(O),
388    Refusal(String),
389}
390
391#[derive(Debug)]
392pub struct StructuredTurnResult<T: Toolset, O: StructuredOutput> {
393    pub request_id: Option<String>,
394    pub model: String,
395    pub assistant_turn: AssistantTurn,
396    pub tool_calls: Vec<T::ToolCall>,
397    pub semantic: StructuredTurnOutcome<O>,
398    pub finish_reason: FinishReason,
399    pub usage: Usage,
400    pub committed_turn: CommittedTurn,
401}
402
403impl<T, O> Clone for StructuredTurnResult<T, O>
404where
405    T: Toolset,
406    O: StructuredOutput,
407{
408    fn clone(&self) -> Self {
409        Self {
410            request_id: self.request_id.clone(),
411            model: self.model.clone(),
412            assistant_turn: self.assistant_turn.clone(),
413            tool_calls: self.tool_calls.clone(),
414            semantic: self.semantic.clone(),
415            finish_reason: self.finish_reason.clone(),
416            usage: self.usage,
417            committed_turn: self.committed_turn.clone(),
418        }
419    }
420}
421
422#[derive(Clone, Debug, Eq, PartialEq, Default)]
423pub struct CompletionTurnState {
424    pub request_id: Option<String>,
425    pub model: String,
426    pub text: String,
427    pub finish_reason: Option<FinishReason>,
428    pub usage: Option<Usage>,
429}
430
431impl CompletionTurnState {
432    /// Apply a streaming event to advance this turn state.
433    ///
434    /// Returns an error if the turn has already completed.
435    pub fn apply(&mut self, event: &CompletionEvent) -> Result<(), CompletionReductionError> {
436        if self.finish_reason.is_some() {
437            return Err(CompletionReductionError::AlreadyCompleted);
438        }
439
440        match event {
441            CompletionEvent::Started { request_id, model } => {
442                self.request_id = request_id.clone();
443                self.model = model.clone();
444            }
445            CompletionEvent::TextDelta(delta) => {
446                self.text.push_str(delta);
447            }
448            CompletionEvent::Completed {
449                request_id,
450                finish_reason,
451                usage,
452            } => {
453                if let Some(request_id) = request_id.clone() {
454                    self.request_id = Some(request_id);
455                }
456                self.finish_reason = Some(finish_reason.clone());
457                self.usage = Some(*usage);
458            }
459        }
460
461        Ok(())
462    }
463
464    /// Finalize the accumulated state into a completed turn result.
465    pub fn finish(self) -> Result<CompletionTurnResult, CompletionReductionError> {
466        Ok(CompletionTurnResult {
467            request_id: self.request_id,
468            model: self.model,
469            text: self.text,
470            finish_reason: self
471                .finish_reason
472                .ok_or(CompletionReductionError::Incomplete)?,
473            usage: self.usage.ok_or(CompletionReductionError::Incomplete)?,
474        })
475    }
476}
477
478#[derive(Clone, Debug, Eq, PartialEq)]
479pub struct CompletionTurnResult {
480    pub request_id: Option<String>,
481    pub model: String,
482    pub text: String,
483    pub finish_reason: FinishReason,
484    pub usage: Usage,
485}
486
487#[derive(Clone, Debug)]
488pub struct StructuredCompletionState<O: StructuredOutput> {
489    pub request_id: Option<String>,
490    pub model: String,
491    pub structured: Option<O>,
492    pub refusal: Option<String>,
493    pub finish_reason: Option<FinishReason>,
494    pub usage: Option<Usage>,
495}
496
497impl<O> Default for StructuredCompletionState<O>
498where
499    O: StructuredOutput,
500{
501    fn default() -> Self {
502        Self {
503            request_id: None,
504            model: String::new(),
505            structured: None,
506            refusal: None,
507            finish_reason: None,
508            usage: None,
509        }
510    }
511}
512
513impl<O> StructuredCompletionState<O>
514where
515    O: StructuredOutput,
516{
517    pub fn apply(
518        &mut self,
519        event: &StructuredCompletionEvent<O>,
520    ) -> Result<(), StructuredCompletionReductionError> {
521        if self.finish_reason.is_some() {
522            return Err(StructuredCompletionReductionError::AlreadyCompleted);
523        }
524
525        match event {
526            StructuredCompletionEvent::Started { request_id, model } => {
527                self.request_id = request_id.clone();
528                self.model = model.clone();
529            }
530            StructuredCompletionEvent::StructuredOutputChunk { .. } => {}
531            StructuredCompletionEvent::StructuredOutputReady(value) => {
532                if self.structured.is_some() {
533                    return Err(StructuredCompletionReductionError::DuplicateStructuredOutput);
534                }
535                self.structured = Some(value.clone());
536            }
537            StructuredCompletionEvent::ReasoningDelta { .. } => {}
538            StructuredCompletionEvent::RefusalDelta { delta } => match self.refusal.as_mut() {
539                Some(existing) => existing.push_str(delta),
540                None => self.refusal = Some(delta.clone()),
541            },
542            StructuredCompletionEvent::Completed {
543                request_id,
544                finish_reason,
545                usage,
546            } => {
547                if let Some(request_id) = request_id.clone() {
548                    self.request_id = Some(request_id);
549                }
550                self.finish_reason = Some(finish_reason.clone());
551                self.usage = Some(*usage);
552            }
553        }
554
555        Ok(())
556    }
557
558    pub fn finish(
559        self,
560    ) -> Result<StructuredCompletionResult<O>, StructuredCompletionReductionError> {
561        let semantic = match (self.structured, self.refusal) {
562            (Some(value), None) => StructuredTurnOutcome::Structured(value),
563            (None, Some(refusal)) => StructuredTurnOutcome::Refusal(refusal),
564            (None, None) => return Err(StructuredCompletionReductionError::MissingSemantic),
565            (Some(_), Some(_)) => {
566                return Err(StructuredCompletionReductionError::ConflictingSemantic);
567            }
568        };
569
570        Ok(StructuredCompletionResult {
571            request_id: self.request_id,
572            model: self.model,
573            semantic,
574            finish_reason: self
575                .finish_reason
576                .ok_or(StructuredCompletionReductionError::Incomplete)?,
577            usage: self
578                .usage
579                .ok_or(StructuredCompletionReductionError::Incomplete)?,
580        })
581    }
582}
583
584#[derive(Clone, Debug, Eq, PartialEq)]
585pub struct StructuredCompletionResult<O: StructuredOutput> {
586    pub request_id: Option<String>,
587    pub model: String,
588    pub semantic: StructuredTurnOutcome<O>,
589    pub finish_reason: FinishReason,
590    pub usage: Usage,
591}
592
593#[derive(Debug, Error, Clone, Eq, PartialEq)]
594pub enum TextTurnReductionError {
595    #[error("turn already completed")]
596    AlreadyCompleted,
597    #[error("completed turn produced no assistant items")]
598    EmptyAssistantOutput,
599    #[error("turn has not completed yet")]
600    Incomplete,
601}
602
603#[derive(Debug, Error, Clone, Eq, PartialEq)]
604pub enum StructuredTurnReductionError {
605    #[error("turn already completed")]
606    AlreadyCompleted,
607    #[error("structured output appeared more than once")]
608    DuplicateStructuredOutput,
609    #[error("completed turn produced no assistant items")]
610    EmptyAssistantOutput,
611    #[error("turn has not completed yet")]
612    Incomplete,
613    #[error("turn completed without structured output or refusal")]
614    MissingSemantic,
615    #[error("turn completed with both structured output and refusal")]
616    ConflictingSemantic,
617}
618
619#[derive(Debug, Error, Clone, Eq, PartialEq)]
620pub enum CompletionReductionError {
621    #[error("turn already completed")]
622    AlreadyCompleted,
623    #[error("turn has not completed yet")]
624    Incomplete,
625}
626
627#[derive(Debug, Error, Clone, Eq, PartialEq)]
628pub enum StructuredCompletionReductionError {
629    #[error("turn already completed")]
630    AlreadyCompleted,
631    #[error("structured output appeared more than once")]
632    DuplicateStructuredOutput,
633    #[error("turn has not completed yet")]
634    Incomplete,
635    #[error("turn completed without structured output or refusal")]
636    MissingSemantic,
637    #[error("turn completed with both structured output and refusal")]
638    ConflictingSemantic,
639}
640
641pub struct TextTurnReducer<T: Toolset> {
642    state: TextTurnState<T>,
643}
644
645impl<T> Default for TextTurnReducer<T>
646where
647    T: Toolset,
648{
649    fn default() -> Self {
650        Self::new()
651    }
652}
653
654impl<T> TextTurnReducer<T>
655where
656    T: Toolset,
657{
658    pub fn new() -> Self {
659        Self {
660            state: TextTurnState::default(),
661        }
662    }
663
664    pub fn state(&self) -> &TextTurnState<T> {
665        &self.state
666    }
667
668    pub fn into_state(self) -> TextTurnState<T> {
669        self.state
670    }
671
672    pub fn apply(&mut self, event: &TextTurnEvent<T>) -> Result<(), TextTurnReductionError> {
673        self.state.apply(event)
674    }
675
676    pub fn into_result(self) -> Result<TextTurnResult<T>, TextTurnReductionError> {
677        self.state.finish()
678    }
679}
680
681pub struct StructuredTurnReducer<T: Toolset, O: StructuredOutput> {
682    state: StructuredTurnState<T, O>,
683}
684
685impl<T, O> Default for StructuredTurnReducer<T, O>
686where
687    T: Toolset,
688    O: StructuredOutput,
689{
690    fn default() -> Self {
691        Self::new()
692    }
693}
694
695impl<T, O> StructuredTurnReducer<T, O>
696where
697    T: Toolset,
698    O: StructuredOutput,
699{
700    pub fn new() -> Self {
701        Self {
702            state: StructuredTurnState::default(),
703        }
704    }
705
706    pub fn state(&self) -> &StructuredTurnState<T, O> {
707        &self.state
708    }
709
710    pub fn into_state(self) -> StructuredTurnState<T, O> {
711        self.state
712    }
713
714    pub fn apply(
715        &mut self,
716        event: &StructuredTurnEvent<T, O>,
717    ) -> Result<(), StructuredTurnReductionError> {
718        self.state.apply(event)
719    }
720
721    pub fn into_result(
722        self,
723    ) -> Result<StructuredTurnResult<T, O>, (StructuredTurnReductionError, Option<CommittedTurn>)>
724    {
725        let committed_turn = self.state.committed_turn.clone();
726        self.state
727            .finish()
728            .map_err(|source| (source, committed_turn))
729    }
730}
731
732pub struct CompletionReducer {
733    state: CompletionTurnState,
734}
735
736impl Default for CompletionReducer {
737    fn default() -> Self {
738        Self::new()
739    }
740}
741
742impl CompletionReducer {
743    pub fn new() -> Self {
744        Self {
745            state: CompletionTurnState::default(),
746        }
747    }
748
749    pub fn state(&self) -> &CompletionTurnState {
750        &self.state
751    }
752
753    pub fn into_state(self) -> CompletionTurnState {
754        self.state
755    }
756
757    pub fn apply(&mut self, event: &CompletionEvent) -> Result<(), CompletionReductionError> {
758        self.state.apply(event)
759    }
760
761    pub fn into_result(self) -> Result<CompletionTurnResult, CompletionReductionError> {
762        self.state.finish()
763    }
764}
765
766pub struct StructuredCompletionReducer<O: StructuredOutput> {
767    state: StructuredCompletionState<O>,
768}
769
770impl<O> Default for StructuredCompletionReducer<O>
771where
772    O: StructuredOutput,
773{
774    fn default() -> Self {
775        Self::new()
776    }
777}
778
779impl<O> StructuredCompletionReducer<O>
780where
781    O: StructuredOutput,
782{
783    pub fn new() -> Self {
784        Self {
785            state: StructuredCompletionState::default(),
786        }
787    }
788
789    pub fn state(&self) -> &StructuredCompletionState<O> {
790        &self.state
791    }
792
793    pub fn into_state(self) -> StructuredCompletionState<O> {
794        self.state
795    }
796
797    pub fn apply(
798        &mut self,
799        event: &StructuredCompletionEvent<O>,
800    ) -> Result<(), StructuredCompletionReductionError> {
801        self.state.apply(event)
802    }
803
804    pub fn into_result(
805        self,
806    ) -> Result<StructuredCompletionResult<O>, StructuredCompletionReductionError> {
807        self.state.finish()
808    }
809}
810
811fn push_or_extend_text(items: &mut Vec<AssistantTurnItem>, delta: &str) {
812    if delta.is_empty() {
813        return;
814    }
815    match items.last_mut() {
816        Some(AssistantTurnItem::Text(existing)) => existing.push_str(delta),
817        _ => items.push(AssistantTurnItem::Text(delta.to_string())),
818    }
819}
820
821fn push_or_extend_reasoning(items: &mut Vec<AssistantTurnItem>, delta: &str) {
822    if delta.is_empty() {
823        return;
824    }
825    match items.last_mut() {
826        Some(AssistantTurnItem::Reasoning(existing)) => existing.push_str(delta),
827        _ => items.push(AssistantTurnItem::Reasoning(delta.to_string())),
828    }
829}
830
831fn push_or_extend_refusal(items: &mut Vec<AssistantTurnItem>, delta: &str) {
832    if delta.is_empty() {
833        return;
834    }
835    match items.last_mut() {
836        Some(AssistantTurnItem::Refusal(existing)) => existing.push_str(delta),
837        _ => items.push(AssistantTurnItem::Refusal(delta.to_string())),
838    }
839}
840
841fn push_tool_call<T>(assistant: &mut Vec<AssistantTurnItem>, tool_calls: &mut Vec<T>, tool_call: &T)
842where
843    T: ToolCallWrapper + Clone,
844{
845    if tool_calls
846        .iter()
847        .any(|existing| existing.metadata().id == tool_call.metadata().id)
848    {
849        return;
850    }
851
852    let metadata = tool_call.metadata();
853    assistant.push(AssistantTurnItem::ToolCall {
854        id: metadata.id.clone(),
855        name: metadata.name.clone(),
856        arguments: metadata.arguments.clone(),
857    });
858    tool_calls.push(tool_call.clone());
859}
860
861fn assistant_text(items: &[AssistantTurnItem]) -> String {
862    let mut text = String::new();
863    for item in items {
864        if let AssistantTurnItem::Text(delta) = item {
865            text.push_str(delta);
866        }
867    }
868    text
869}
870
871fn committed_turn_option_eq(lhs: &Option<CommittedTurn>, rhs: &Option<CommittedTurn>) -> bool {
872    match (lhs, rhs) {
873        (Some(lhs), Some(rhs)) => Arc::ptr_eq(lhs, rhs),
874        (None, None) => true,
875        _ => false,
876    }
877}
878
879pub fn assistant_json(items: &[AssistantTurnItem]) -> Option<Result<RawJson, serde_json::Error>> {
880    let text = assistant_text(items);
881    if text.is_empty() {
882        None
883    } else {
884        Some(RawJson::parse(text))
885    }
886}
887
888pub fn find_tool_call_arguments<'a>(
889    items: &'a [AssistantTurnItem],
890    id: &ToolCallId,
891) -> Option<&'a RawJson> {
892    items.iter().find_map(|item| match item {
893        AssistantTurnItem::ToolCall {
894            id: candidate,
895            arguments,
896            ..
897        } if candidate == id => Some(arguments),
898        _ => None,
899    })
900}
901
902#[cfg(test)]
903mod tests {
904    use std::sync::Arc;
905
906    use schemars::JsonSchema;
907    use serde::{Deserialize, Serialize};
908
909    use super::*;
910    use crate::{
911        ToolCallError, ToolDef, ToolMetadata, ToolName,
912        toolset::{ToolCallWrapper, ToolInput, ToolSelector},
913        transcript::AssistantTurnView,
914    };
915
916    #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema)]
917    struct WeatherArgs {
918        city: String,
919    }
920
921    #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema)]
922    struct WeatherResult {
923        forecast: String,
924    }
925
926    impl ToolInput for WeatherArgs {
927        type Output = WeatherResult;
928
929        const NAME: &'static str = "weather";
930        const DESCRIPTION: &'static str = "Get weather";
931    }
932
933    #[derive(Clone, Debug, Eq, PartialEq)]
934    struct WeatherArgsCall {
935        metadata: ToolMetadata,
936        input: WeatherArgs,
937    }
938
939    impl ToolCallWrapper for WeatherArgsCall {
940        fn metadata(&self) -> &ToolMetadata {
941            &self.metadata
942        }
943    }
944
945    #[derive(Clone, Debug, Eq, PartialEq)]
946    enum CallsCall {
947        Weather(WeatherArgsCall),
948    }
949
950    impl ToolCallWrapper for CallsCall {
951        fn metadata(&self) -> &ToolMetadata {
952            match self {
953                Self::Weather(call) => &call.metadata,
954            }
955        }
956    }
957
958    #[derive(Clone, Copy, Debug, Default)]
959    struct Tools;
960
961    #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, JsonSchema)]
962    enum ToolsSelector {
963        Weather,
964    }
965
966    impl ToolSelector<Tools> for ToolsSelector {
967        fn name(self) -> &'static str {
968            match self {
969                Self::Weather => "weather",
970            }
971        }
972
973        fn definition(self) -> &'static ToolDef {
974            &Tools::definitions()[match self {
975                Self::Weather => 0,
976            }]
977        }
978
979        fn all() -> &'static [Self] {
980            &[Self::Weather]
981        }
982
983        fn try_from_name(name: &str) -> Option<Self> {
984            match name {
985                "weather" => Some(Self::Weather),
986                _ => None,
987            }
988        }
989    }
990
991    impl Toolset for Tools {
992        type ToolCall = CallsCall;
993        type Selector = ToolsSelector;
994
995        fn definitions() -> &'static [ToolDef] {
996            fn weather_args_schema() -> schemars::Schema {
997                schemars::schema_for!(WeatherArgs)
998            }
999
1000            static DEFS: [ToolDef; 1] =
1001                [ToolDef::new("weather", "Get weather", weather_args_schema)];
1002            &DEFS
1003        }
1004
1005        fn parse_tool_call(metadata: ToolMetadata) -> Result<Self::ToolCall, ToolCallError> {
1006            match metadata.name.as_str() {
1007                "weather" => serde_json::from_str(metadata.arguments.get())
1008                    .map(|input| CallsCall::Weather(WeatherArgsCall { metadata, input }))
1009                    .map_err(|source| ToolCallError::Deserialize {
1010                        name: "weather".to_string(),
1011                        source,
1012                    }),
1013                _ => Err(ToolCallError::UnknownTool {
1014                    name: metadata.name.as_str().to_string(),
1015                }),
1016            }
1017        }
1018    }
1019
1020    #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema)]
1021    struct Summary {
1022        answer: String,
1023    }
1024
1025    #[test]
1026    fn text_reducer_returns_assistant_turn() {
1027        let mut reducer = TextTurnReducer::<Tools>::new();
1028        reducer
1029            .apply(&TextTurnEvent::Started {
1030                request_id: Some("req-1".into()),
1031                model: "gpt-4.1".into(),
1032            })
1033            .unwrap();
1034        reducer
1035            .apply(&TextTurnEvent::TextDelta {
1036                delta: "checking ".into(),
1037            })
1038            .unwrap();
1039        reducer
1040            .apply(&TextTurnEvent::ToolCallReady(CallsCall::Weather(
1041                WeatherArgsCall {
1042                    metadata: ToolMetadata::new(
1043                        ToolCallId::from("call-1"),
1044                        ToolName::from("weather"),
1045                        RawJson::parse("{\"city\":\"Tokyo\"}").unwrap(),
1046                    ),
1047                    input: WeatherArgs {
1048                        city: "Tokyo".into(),
1049                    },
1050                },
1051            )))
1052            .unwrap();
1053        reducer
1054            .apply(&TextTurnEvent::Completed {
1055                request_id: Some("req-1".into()),
1056                finish_reason: FinishReason::ToolCall,
1057                usage: Usage {
1058                    total_tokens: 12,
1059                    ..Usage::zero()
1060                },
1061                committed_turn: Arc::new(AssistantTurnView::from_items(&[])),
1062            })
1063            .unwrap();
1064
1065        let result = reducer.into_result().unwrap();
1066        assert_eq!(result.assistant_turn.items().len(), 2);
1067        assert_eq!(result.assistant_turn.assistant_text(), "checking ");
1068    }
1069
1070    #[test]
1071    fn structured_reducer_distinguishes_structured_and_refusal() {
1072        let mut reducer = StructuredTurnReducer::<Tools, Summary>::new();
1073        reducer
1074            .apply(&StructuredTurnEvent::Started {
1075                request_id: Some("req-2".into()),
1076                model: "gpt-4.1".into(),
1077            })
1078            .unwrap();
1079        reducer
1080            .apply(&StructuredTurnEvent::RefusalDelta { delta: "no".into() })
1081            .unwrap();
1082        reducer
1083            .apply(&StructuredTurnEvent::Completed {
1084                request_id: Some("req-2".into()),
1085                finish_reason: FinishReason::ContentFilter,
1086                usage: Usage {
1087                    total_tokens: 7,
1088                    ..Usage::zero()
1089                },
1090                committed_turn: Arc::new(AssistantTurnView::from_items(&[])),
1091            })
1092            .unwrap();
1093
1094        let result = reducer.into_result().unwrap();
1095        assert_eq!(
1096            result.semantic,
1097            StructuredTurnOutcome::Refusal(String::from("no"))
1098        );
1099    }
1100}