gproxy_protocol/transform/claude/
nonstream_to_stream.rs1use crate::claude::create_message::response::ClaudeCreateMessageResponse;
2use crate::claude::create_message::stream::{
3 BetaMessageDeltaUsage, BetaRawContentBlockDelta, BetaRawMessageDelta, ClaudeStreamEvent,
4};
5use crate::claude::create_message::types::BetaContentBlock;
6use crate::transform::utils::TransformError;
7
8fn stream_start_content_block(content_block: &BetaContentBlock) -> BetaContentBlock {
9 match content_block {
10 BetaContentBlock::Text(block) => {
11 BetaContentBlock::Text(crate::claude::create_message::types::BetaTextBlock {
12 citations: None,
13 text: String::new(),
14 type_: block.type_.clone(),
15 })
16 }
17 BetaContentBlock::Thinking(block) => {
18 BetaContentBlock::Thinking(crate::claude::create_message::types::BetaThinkingBlock {
19 signature: block.signature.clone(),
20 thinking: String::new(),
21 type_: block.type_.clone(),
22 })
23 }
24 BetaContentBlock::ToolUse(block) => {
25 BetaContentBlock::ToolUse(crate::claude::create_message::types::BetaToolUseBlock {
26 id: block.id.clone(),
27 input: Default::default(),
28 name: block.name.clone(),
29 type_: block.type_.clone(),
30 cache_control: block.cache_control.clone(),
31 caller: block.caller.clone(),
32 })
33 }
34 BetaContentBlock::Compaction(block) => BetaContentBlock::Compaction(
35 crate::claude::create_message::types::BetaCompactionBlock {
36 content: None,
37 type_: block.type_.clone(),
38 cache_control: block.cache_control.clone(),
39 },
40 ),
41 _ => content_block.clone(),
42 }
43}
44
45fn push_content_block_delta_events(
46 events: &mut Vec<ClaudeStreamEvent>,
47 index: u64,
48 content_block: &BetaContentBlock,
49) {
50 match content_block {
51 BetaContentBlock::Text(block) => {
52 if !block.text.is_empty() {
53 events.push(ClaudeStreamEvent::ContentBlockDelta {
54 delta: BetaRawContentBlockDelta::Text {
55 text: block.text.clone(),
56 },
57 index,
58 });
59 }
60 if let Some(citations) = block.citations.as_ref() {
61 for citation in citations {
62 events.push(ClaudeStreamEvent::ContentBlockDelta {
63 delta: BetaRawContentBlockDelta::Citations {
64 citation: citation.clone(),
65 },
66 index,
67 });
68 }
69 }
70 }
71 BetaContentBlock::Thinking(block) => {
72 if !block.thinking.is_empty() {
73 events.push(ClaudeStreamEvent::ContentBlockDelta {
74 delta: BetaRawContentBlockDelta::Thinking {
75 thinking: block.thinking.clone(),
76 },
77 index,
78 });
79 }
80 if !block.signature.is_empty() {
81 events.push(ClaudeStreamEvent::ContentBlockDelta {
82 delta: BetaRawContentBlockDelta::Signature {
83 signature: block.signature.clone(),
84 },
85 index,
86 });
87 }
88 }
89 BetaContentBlock::ToolUse(block) => {
90 if !block.input.is_empty()
91 && let Ok(input_json) = serde_json::to_string(&block.input)
92 && !input_json.is_empty()
93 && input_json != "{}"
94 {
95 events.push(ClaudeStreamEvent::ContentBlockDelta {
96 delta: BetaRawContentBlockDelta::InputJson {
97 partial_json: input_json,
98 },
99 index,
100 });
101 }
102 }
103 BetaContentBlock::Compaction(block) if block.content.is_some() => {
104 events.push(ClaudeStreamEvent::ContentBlockDelta {
105 delta: BetaRawContentBlockDelta::Compaction {
106 content: block.content.clone(),
107 },
108 index,
109 });
110 }
111 _ => {}
112 }
113}
114
115pub fn nonstream_to_stream(
116 value: ClaudeCreateMessageResponse,
117 out: &mut Vec<ClaudeStreamEvent>,
118) -> Result<(), TransformError> {
119 match value {
120 ClaudeCreateMessageResponse::Success { body, .. } => {
121 let mut start_message = body.clone();
122 start_message.content = Vec::new();
123 start_message.context_management = None;
124 start_message.stop_reason = None;
125 start_message.stop_sequence = None;
126 start_message.usage.output_tokens = 0;
127
128 out.push(ClaudeStreamEvent::MessageStart {
129 message: start_message,
130 });
131
132 for (index, content_block) in body.content.iter().enumerate() {
133 let index = index as u64;
134 out.push(ClaudeStreamEvent::ContentBlockStart {
135 content_block: stream_start_content_block(content_block),
136 index,
137 });
138
139 push_content_block_delta_events(out, index, content_block);
140
141 out.push(ClaudeStreamEvent::ContentBlockStop { index });
142 }
143
144 out.push(ClaudeStreamEvent::MessageDelta {
145 context_management: body.context_management.clone(),
146 delta: BetaRawMessageDelta {
147 container: body.container.clone(),
148 stop_reason: body.stop_reason.clone(),
149 stop_sequence: body.stop_sequence.clone(),
150 },
151 usage: BetaMessageDeltaUsage {
152 cache_creation_input_tokens: Some(body.usage.cache_creation_input_tokens),
153 cache_read_input_tokens: Some(body.usage.cache_read_input_tokens),
154 input_tokens: Some(body.usage.input_tokens),
155 iterations: Some(body.usage.iterations.clone()),
156 output_tokens: body.usage.output_tokens,
157 server_tool_use: Some(body.usage.server_tool_use.clone()),
158 },
159 });
160
161 out.push(ClaudeStreamEvent::MessageStop {});
162
163 Ok(())
164 }
165 ClaudeCreateMessageResponse::Error { body, .. } => {
166 out.push(ClaudeStreamEvent::Error { error: body.error });
167 Ok(())
168 }
169 }
170}