Skip to main content

gproxy_protocol/transform/claude/
nonstream_to_stream.rs

1use crate::claude::create_message::response::ClaudeCreateMessageResponse;
2use crate::claude::create_message::stream::{
3    BetaMessageDeltaUsage, BetaRawContentBlockDelta, BetaRawMessageDelta, ClaudeStreamEvent,
4};
5use crate::claude::create_message::types::BetaContentBlock;
6use crate::transform::utils::TransformError;
7
8fn stream_start_content_block(content_block: &BetaContentBlock) -> BetaContentBlock {
9    match content_block {
10        BetaContentBlock::Text(block) => {
11            BetaContentBlock::Text(crate::claude::create_message::types::BetaTextBlock {
12                citations: None,
13                text: String::new(),
14                type_: block.type_.clone(),
15            })
16        }
17        BetaContentBlock::Thinking(block) => {
18            BetaContentBlock::Thinking(crate::claude::create_message::types::BetaThinkingBlock {
19                signature: block.signature.clone(),
20                thinking: String::new(),
21                type_: block.type_.clone(),
22            })
23        }
24        BetaContentBlock::ToolUse(block) => {
25            BetaContentBlock::ToolUse(crate::claude::create_message::types::BetaToolUseBlock {
26                id: block.id.clone(),
27                input: Default::default(),
28                name: block.name.clone(),
29                type_: block.type_.clone(),
30                cache_control: block.cache_control.clone(),
31                caller: block.caller.clone(),
32            })
33        }
34        BetaContentBlock::Compaction(block) => BetaContentBlock::Compaction(
35            crate::claude::create_message::types::BetaCompactionBlock {
36                content: None,
37                type_: block.type_.clone(),
38                cache_control: block.cache_control.clone(),
39            },
40        ),
41        _ => content_block.clone(),
42    }
43}
44
45fn push_content_block_delta_events(
46    events: &mut Vec<ClaudeStreamEvent>,
47    index: u64,
48    content_block: &BetaContentBlock,
49) {
50    match content_block {
51        BetaContentBlock::Text(block) => {
52            if !block.text.is_empty() {
53                events.push(ClaudeStreamEvent::ContentBlockDelta {
54                    delta: BetaRawContentBlockDelta::Text {
55                        text: block.text.clone(),
56                    },
57                    index,
58                });
59            }
60            if let Some(citations) = block.citations.as_ref() {
61                for citation in citations {
62                    events.push(ClaudeStreamEvent::ContentBlockDelta {
63                        delta: BetaRawContentBlockDelta::Citations {
64                            citation: citation.clone(),
65                        },
66                        index,
67                    });
68                }
69            }
70        }
71        BetaContentBlock::Thinking(block) => {
72            if !block.thinking.is_empty() {
73                events.push(ClaudeStreamEvent::ContentBlockDelta {
74                    delta: BetaRawContentBlockDelta::Thinking {
75                        thinking: block.thinking.clone(),
76                    },
77                    index,
78                });
79            }
80            if !block.signature.is_empty() {
81                events.push(ClaudeStreamEvent::ContentBlockDelta {
82                    delta: BetaRawContentBlockDelta::Signature {
83                        signature: block.signature.clone(),
84                    },
85                    index,
86                });
87            }
88        }
89        BetaContentBlock::ToolUse(block) => {
90            if !block.input.is_empty()
91                && let Ok(input_json) = serde_json::to_string(&block.input)
92                && !input_json.is_empty()
93                && input_json != "{}"
94            {
95                events.push(ClaudeStreamEvent::ContentBlockDelta {
96                    delta: BetaRawContentBlockDelta::InputJson {
97                        partial_json: input_json,
98                    },
99                    index,
100                });
101            }
102        }
103        BetaContentBlock::Compaction(block) if block.content.is_some() => {
104            events.push(ClaudeStreamEvent::ContentBlockDelta {
105                delta: BetaRawContentBlockDelta::Compaction {
106                    content: block.content.clone(),
107                },
108                index,
109            });
110        }
111        _ => {}
112    }
113}
114
115pub fn nonstream_to_stream(
116    value: ClaudeCreateMessageResponse,
117    out: &mut Vec<ClaudeStreamEvent>,
118) -> Result<(), TransformError> {
119    match value {
120        ClaudeCreateMessageResponse::Success { body, .. } => {
121            let mut start_message = body.clone();
122            start_message.content = Vec::new();
123            start_message.context_management = None;
124            start_message.stop_reason = None;
125            start_message.stop_sequence = None;
126            start_message.usage.output_tokens = 0;
127
128            out.push(ClaudeStreamEvent::MessageStart {
129                message: start_message,
130            });
131
132            for (index, content_block) in body.content.iter().enumerate() {
133                let index = index as u64;
134                out.push(ClaudeStreamEvent::ContentBlockStart {
135                    content_block: stream_start_content_block(content_block),
136                    index,
137                });
138
139                push_content_block_delta_events(out, index, content_block);
140
141                out.push(ClaudeStreamEvent::ContentBlockStop { index });
142            }
143
144            out.push(ClaudeStreamEvent::MessageDelta {
145                context_management: body.context_management.clone(),
146                delta: BetaRawMessageDelta {
147                    container: body.container.clone(),
148                    stop_reason: body.stop_reason.clone(),
149                    stop_sequence: body.stop_sequence.clone(),
150                },
151                usage: BetaMessageDeltaUsage {
152                    cache_creation_input_tokens: Some(body.usage.cache_creation_input_tokens),
153                    cache_read_input_tokens: Some(body.usage.cache_read_input_tokens),
154                    input_tokens: Some(body.usage.input_tokens),
155                    iterations: Some(body.usage.iterations.clone()),
156                    output_tokens: body.usage.output_tokens,
157                    server_tool_use: Some(body.usage.server_tool_use.clone()),
158                },
159            });
160
161            out.push(ClaudeStreamEvent::MessageStop {});
162
163            Ok(())
164        }
165        ClaudeCreateMessageResponse::Error { body, .. } => {
166            out.push(ClaudeStreamEvent::Error { error: body.error });
167            Ok(())
168        }
169    }
170}