1use std::{
7 io,
8 sync::Arc,
9 time::{SystemTime, UNIX_EPOCH},
10};
11
12use axum::{
13 Json, Router,
14 extract::State,
15 http::{HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Uri},
16 response::{
17 IntoResponse, Response,
18 sse::{Event, Sse},
19 },
20 routing::{get, post},
21};
22use serde_json::{Value, json};
23use thiserror::Error;
24use tokio::net::TcpListener;
25use tracing::{debug, error, info, warn};
26
27use crate::{
28 attestation::{AttestationError, AttestationVerifier},
29 config::ProxyConfig,
30 e2ee::{E2eeCodec, E2eeCodecError},
31 keys::ProxyInstanceKey,
32 openai::{
33 ErrorResponse,
34 chat::{
35 ChatCompletionRequest, ChatConstructionError, ChatRequestError, NormalizedChatMessage,
36 },
37 },
38 sessions::{AttestedModelState, SessionContext, SessionError, SessionManager, SessionRequest},
39 tools::{ToolEmulationContext, ToolOutputClassification, ValidatedToolCall},
40 venice::{VeniceClient, VeniceClientError},
41};
42
// Names of the `X-Venice-Proxy-*` HTTP headers this proxy uses to report metadata.
// NOTE(review): the code that writes these headers (presumably `ProxyMetadataHeaders::apply`
// and the error-response path) is outside this view; the per-header descriptions below are
// inferred from the names — confirm.

/// Header name `X-Venice-Proxy-E2EE` (by its name: whether end-to-end encryption was used).
pub const HEADER_PROXY_E2EE: &str = "X-Venice-Proxy-E2EE";
/// Header name `X-Venice-Proxy-Attestation-Mode` (by its name: the attestation mode in effect).
pub const HEADER_PROXY_ATTESTATION_MODE: &str = "X-Venice-Proxy-Attestation-Mode";
/// Header name `X-Venice-Proxy-Attested-Model` (by its name: the model that was attested).
pub const HEADER_PROXY_ATTESTED_MODEL: &str = "X-Venice-Proxy-Attested-Model";
/// Header name `X-Venice-Proxy-TEE-Provider` (by its name: the reported TEE provider).
pub const HEADER_PROXY_TEE_PROVIDER: &str = "X-Venice-Proxy-TEE-Provider";
/// Header name `X-Venice-Proxy-TDX-Verified` (by its name: TDX verification result).
pub const HEADER_PROXY_TDX_VERIFIED: &str = "X-Venice-Proxy-TDX-Verified";
/// Header name `X-Venice-Proxy-TDX-Debug` (by its name: TDX debug-mode indicator).
pub const HEADER_PROXY_TDX_DEBUG: &str = "X-Venice-Proxy-TDX-Debug";
/// Header name `X-Venice-Proxy-NVIDIA-Verified` (by its name: NVIDIA GPU attestation result).
pub const HEADER_PROXY_NVIDIA_VERIFIED: &str = "X-Venice-Proxy-NVIDIA-Verified";
/// Header name `X-Venice-Proxy-Key-Binding` (by its name: key-binding status).
pub const HEADER_PROXY_KEY_BINDING: &str = "X-Venice-Proxy-Key-Binding";
/// Header name `X-Venice-Proxy-Session-Id` (by its name: the proxy session identifier).
pub const HEADER_PROXY_SESSION_ID: &str = "X-Venice-Proxy-Session-Id";
/// Header name `X-Venice-Proxy-Session-Scope` (by its name: the session scope).
pub const HEADER_PROXY_SESSION_SCOPE: &str = "X-Venice-Proxy-Session-Scope";
/// Header name `X-Venice-Proxy-Tool-Mode` (by its name: whether tool emulation was used).
pub const HEADER_PROXY_TOOL_MODE: &str = "X-Venice-Proxy-Tool-Mode";
/// Header name `X-Venice-Proxy-Tool-Retries` (by its name: tool-call retry count).
pub const HEADER_PROXY_TOOL_RETRIES: &str = "X-Venice-Proxy-Tool-Retries";
/// Header name `X-Venice-Proxy-Error-Code` (by its name: a proxy error code on failures).
pub const HEADER_PROXY_ERROR_CODE: &str = "X-Venice-Proxy-Error-Code";
56
57#[derive(Debug, Clone)]
59pub struct AppState {
60 config: Arc<ProxyConfig>,
61 venice_client: VeniceClient,
62 proxy_instance_key: Option<ProxyInstanceKey>,
63 session_manager: SessionManager,
64 attestation_verifier: AttestationVerifier,
65}
66
67impl AppState {
68 pub fn new(config: ProxyConfig) -> Result<Self, VeniceClientError> {
70 let venice_client = VeniceClient::from_config(&config)?;
71 Ok(Self::from_parts(config, venice_client))
72 }
73
74 pub fn from_parts(config: ProxyConfig, venice_client: VeniceClient) -> Self {
76 let proxy_instance_key = ProxyInstanceKey::generate_from_config(&config.keys);
77 let session_manager = SessionManager::new(config.session.clone());
78 let attestation_verifier = AttestationVerifier::from_config(&config, venice_client.clone());
79
80 Self {
81 config: Arc::new(config),
82 venice_client,
83 proxy_instance_key,
84 session_manager,
85 attestation_verifier,
86 }
87 }
88
89 pub fn config(&self) -> &ProxyConfig {
91 &self.config
92 }
93
94 pub fn venice_client(&self) -> &VeniceClient {
96 &self.venice_client
97 }
98
99 pub fn proxy_instance_key(&self) -> Option<&ProxyInstanceKey> {
101 self.proxy_instance_key.as_ref()
102 }
103
104 pub fn session_manager(&self) -> &SessionManager {
106 &self.session_manager
107 }
108
109 pub fn attestation_verifier(&self) -> &AttestationVerifier {
111 &self.attestation_verifier
112 }
113}
114
115pub fn router(config: ProxyConfig) -> Result<Router, VeniceClientError> {
118 Ok(router_from_state(AppState::new(config)?))
119}
120
121pub fn router_with_venice_client(config: ProxyConfig, venice_client: VeniceClient) -> Router {
126 router_from_state(AppState::from_parts(config, venice_client))
127}
128
129fn router_from_state(state: AppState) -> Router {
131 Router::new()
132 .route("/v1/models", get(list_models).fallback(method_not_allowed))
133 .route(
134 "/v1/chat/completions",
135 post(create_chat_completion).fallback(method_not_allowed),
136 )
137 .fallback(not_found)
138 .with_state(state)
139}
140
141pub async fn serve(listener: TcpListener, router: Router) -> io::Result<()> {
143 axum::serve(listener, router).await
144}
145
146async fn list_models(State(state): State<AppState>) -> Result<Response, ProxyError> {
148 info!(route = "/v1/models", "listing Venice models");
149 let models = state.venice_client().list_models().await?;
150 let mut response = Json(models).into_response();
151 ProxyMetadataHeaders::from_config(state.config()).apply(response.headers_mut());
152 info!(route = "/v1/models", "Venice models response proxied");
153 Ok(response)
154}
155
/// `POST /v1/chat/completions`: proxies an OpenAI-style chat completion to Venice over E2EE.
///
/// Steps, in order:
/// 1. Parse the raw JSON body into a [`ChatCompletionRequest`].
/// 2. Require the proxy instance key. Without one, the request fails with
///    [`ProxyError::ProxyInstanceKeyUnavailable`].
/// 3. Resolve (or create) the session from the model, the request headers, and the body.
///    Then make sure the session carries a verified model attestation via
///    `ensure_attested_session`.
/// 4. Build the E2EE codec, the optional tool-emulation context, and the metadata headers
///    for a verified chat.
/// 5. With a tool-emulation context, delegate to `openai_tool_emulated_chat_response`.
///    Otherwise, encrypt the request for the attested model key and open the upstream stream.
///    Then either relay it as SSE (when `prepared.client_stream` is set) or buffer it into one
///    JSON completion.
///
/// # Errors
/// All failures are returned as [`ProxyError`]. The `?` conversions cover request parsing,
/// session and attestation handling, codec and tool-context construction, request
/// encryption, and the upstream call.
async fn create_chat_completion(
    State(state): State<AppState>,
    headers: HeaderMap,
    Json(body): Json<Value>,
) -> Result<Response, ProxyError> {
    let request = ChatCompletionRequest::parse(&body)?;
    // The key's public half is sent upstream and the key itself is handed to the response
    // decoders below, so the request cannot proceed without it.
    let proxy_instance_key = state
        .proxy_instance_key()
        .ok_or(ProxyError::ProxyInstanceKeyUnavailable)?;

    let session_resolution = state
        .session_manager()
        .get_or_create(SessionRequest::new(&request.model, &headers).with_body(&body))?;
    // Copy the resolution details out before `session_resolution.session` is moved below.
    let session_created = session_resolution.created;
    let session_replaced_expired = session_resolution.replaced_expired;
    let session_scope = session_resolution.session.scope;
    let session = ensure_attested_session(&state, session_resolution.session).await?;
    // The attested model public key is the key the request is encrypted for.
    let model_public_key = session
        .attested_model_public_key
        .as_deref()
        .ok_or(ProxyError::MissingAttestedModelKey)?;

    let codec =
        E2eeCodec::from_config(&state.config().e2ee).map_err(ChatConstructionError::E2ee)?;
    // `Some` when `ToolEmulationContext::from_request` decides this request goes through
    // tool emulation.
    let tool_context = ToolEmulationContext::from_request(&state.config().tools, &request)?;
    let metadata = ProxyMetadataHeaders::for_verified_chat(state.config(), &session);

    info!(
        route = "/v1/chat/completions",
        model = %request.model,
        stream = request.stream,
        message_count = request.messages.len(),
        tool_count = request.tools.len(),
        tool_mode = tool_context.is_some(),
        session_created,
        session_replaced_expired = ?session_replaced_expired,
        session_scope = %session_scope,
        "chat completion request accepted"
    );

    if let Some(tool_context) = tool_context {
        info!(model = %request.model, "using tool-emulated chat completion");
        return openai_tool_emulated_chat_response(
            &state,
            &request,
            &tool_context,
            codec,
            proxy_instance_key.clone(),
            model_public_key,
            metadata,
        )
        .await;
    }

    // Plain path: encrypt the request and open the upstream stream. The upstream call is a
    // stream in both cases; `prepared.client_stream` (presumably mirroring the client's
    // `stream` flag — confirm) only decides how the result is returned to the client.
    let prepared = request.to_venice_e2ee_request(&codec, model_public_key)?;
    info!(
        model = %request.model,
        client_stream = prepared.client_stream,
        "forwarding encrypted chat completion to Venice"
    );

    let upstream = state
        .venice_client()
        .create_chat_completion_stream(
            &prepared.upstream,
            proxy_instance_key.public_key_hex(),
            model_public_key,
        )
        .await?;

    if prepared.client_stream {
        info!(model = %request.model, "streaming chat completion response to client");
        // An unset `stream_options.include_usage` is treated as `false`.
        let include_usage_requested = request.stream_options.include_usage.unwrap_or(false);
        let transformer = OpenAiChatStreamTransformer::new(
            codec,
            proxy_instance_key.clone(),
            request.model.clone(),
            include_usage_requested,
        );
        Ok(chat_sse_response(
            upstream,
            transformer,
            request.model,
            include_usage_requested,
            &CHAT_SSE_LOG,
            metadata,
        ))
    } else {
        info!(model = %request.model, "buffering chat completion response for client");
        openai_chat_buffered_response(
            upstream,
            codec,
            proxy_instance_key.clone(),
            request.model,
            metadata,
        )
        .await
    }
}
256
257async fn ensure_attested_session(
259 state: &AppState,
260 session: SessionContext,
261) -> Result<SessionContext, ProxyError> {
262 if session.attested_model_public_key.is_some() {
263 info!(model = %session.model_id, session_scope = %session.scope, "using cached model attestation");
264 return Ok(session);
265 }
266
267 info!(model = %session.model_id, session_scope = %session.scope, "fetching model attestation");
268 let attestation = state
269 .attestation_verifier()
270 .verify_model_attestation(&session.model_id)
271 .await?;
272
273 info!(
274 model = %attestation.model_id,
275 tee_provider = attestation.tee_provider.as_deref().unwrap_or("unknown"),
276 tdx_verified = attestation.tdx.verified,
277 nvidia_verified = attestation.nvidia.verified.as_header_value(),
278 "model attestation verified"
279 );
280
281 let state_update = AttestedModelState {
282 model_public_key: attestation.model_public_key,
283 tee_provider: attestation.tee_provider,
284 tdx_debug: attestation.tdx.debug.or(attestation.debug),
285 nvidia_verified: attestation.nvidia.verified.as_header_value().to_owned(),
286 verified_at: attestation.verified_at,
287 };
288
289 Ok(state
290 .session_manager()
291 .set_attested_model_state(&session.session_key, state_update)?)
292}
293
294async fn openai_chat_buffered_response(
296 upstream: reqwest::Response,
297 codec: E2eeCodec,
298 proxy_instance_key: ProxyInstanceKey,
299 fallback_model: String,
300 metadata: ProxyMetadataHeaders,
301) -> Result<Response, ProxyError> {
302 let completion =
303 buffer_openai_chat_completion(upstream, codec, proxy_instance_key, fallback_model).await?;
304 let mut response = Json(completion).into_response();
305 metadata.apply(response.headers_mut());
306 Ok(response)
307}
308
309async fn openai_tool_emulated_chat_response(
311 state: &AppState,
312 request: &ChatCompletionRequest,
313 tool_context: &ToolEmulationContext,
314 codec: E2eeCodec,
315 proxy_instance_key: ProxyInstanceKey,
316 model_public_key: &str,
317 metadata: ProxyMetadataHeaders,
318) -> Result<Response, ProxyError> {
319 info!(
320 model = %request.model,
321 max_retries = tool_context.max_retries(),
322 "starting tool-emulated chat completion"
323 );
324 if request.stream {
325 let upstream = tool_emulated_upstream_stream(
326 state,
327 request,
328 tool_context,
329 &codec,
330 &proxy_instance_key,
331 model_public_key,
332 None,
333 )
334 .await?;
335
336 return Ok(tool_emulated_chat_sse_response_with_retries(
337 ToolEmulatedChatSseRetryState {
338 state: state.clone(),
339 request: request.clone(),
340 tool_context: tool_context.clone(),
341 codec,
342 proxy_instance_key,
343 model_public_key: model_public_key.to_owned(),
344 },
345 upstream,
346 metadata,
347 ));
348 }
349
350 let mut retries = 0;
351 let mut correction: Option<(String, String)> = None;
352
353 loop {
354 let upstream = tool_emulated_upstream_stream(
355 state,
356 request,
357 tool_context,
358 &codec,
359 &proxy_instance_key,
360 model_public_key,
361 correction.as_ref(),
362 )
363 .await?;
364
365 let completion = match tokio::time::timeout(
366 tool_context.marker_timeout(),
367 buffer_openai_chat_completion(
368 upstream,
369 codec.clone(),
370 proxy_instance_key.clone(),
371 request.model.clone(),
372 ),
373 )
374 .await
375 {
376 Ok(completion) => completion?,
377 Err(_) => {
378 let validation_error = format!(
379 "tool-emulated completion did not finish within {}",
380 humantime::format_duration(tool_context.config().tool_call_marker_timeout)
381 );
382 if retries >= tool_context.max_retries() {
383 return Err(ProxyError::ToolCallRetryExhausted {
384 max_retries: tool_context.max_retries(),
385 last_validation_error: validation_error,
386 });
387 }
388 warn!(
389 model = %request.model,
390 retry = retries + 1,
391 max_retries = tool_context.max_retries(),
392 "tool call marker timed out; retrying with correction"
393 );
394 retries += 1;
395 correction = Some((validation_error, String::new()));
396 continue;
397 }
398 };
399 let assistant_content = completion
400 .get("choices")
401 .and_then(Value::as_array)
402 .and_then(|choices| choices.first())
403 .and_then(|choice| choice.get("message"))
404 .and_then(|message| message.get("content"))
405 .and_then(Value::as_str)
406 .unwrap_or_default();
407
408 let mut metadata = metadata.clone();
409 if retries > 0 {
410 metadata.tool_retries = Some(retries);
411 }
412
413 match tool_context.classify_assistant_output(assistant_content) {
414 ToolOutputClassification::NormalText => {
415 info!(model = %request.model, retries, "tool emulation produced normal text");
416 let mut response = Json(completion).into_response();
417 metadata.apply(response.headers_mut());
418 return Ok(response);
419 }
420 ToolOutputClassification::ToolCalls(tool_calls) => {
421 info!(
422 model = %request.model,
423 tool_calls = tool_calls.len(),
424 retries,
425 "tool emulation produced tool calls"
426 );
427 let body = openai_tool_call_completion(completion, tool_calls);
428 let mut response = Json(body).into_response();
429 metadata.apply(response.headers_mut());
430 return Ok(response);
431 }
432 ToolOutputClassification::InvalidToolCall {
433 error,
434 invalid_output,
435 } => {
436 if retries >= tool_context.max_retries() {
437 warn!(
438 model = %request.model,
439 max_retries = tool_context.max_retries(),
440 validation_error = %error,
441 "tool call validation failed and retries were exhausted"
442 );
443 return Err(ProxyError::ToolCallRetryExhausted {
444 max_retries: tool_context.max_retries(),
445 last_validation_error: error.to_string(),
446 });
447 }
448 warn!(
449 model = %request.model,
450 retry = retries + 1,
451 max_retries = tool_context.max_retries(),
452 validation_error = %error,
453 "tool call validation failed; retrying with correction"
454 );
455 retries += 1;
456 correction = Some((error.to_string(), invalid_output));
457 }
458 }
459 }
460}
461
462async fn tool_emulated_upstream_stream(
465 state: &AppState,
466 request: &ChatCompletionRequest,
467 tool_context: &ToolEmulationContext,
468 codec: &E2eeCodec,
469 proxy_instance_key: &ProxyInstanceKey,
470 model_public_key: &str,
471 correction: Option<&(String, String)>,
472) -> Result<reqwest::Response, ProxyError> {
473 let messages = tool_emulated_messages(request, tool_context, correction);
474 let mut tool_request = request.clone();
475 tool_request.messages = messages;
476
477 let prepared = tool_request.to_venice_e2ee_request(codec, model_public_key)?;
478
479 Ok(state
480 .venice_client()
481 .create_chat_completion_stream(
482 &prepared.upstream,
483 proxy_instance_key.public_key_hex(),
484 model_public_key,
485 )
486 .await?)
487}
488
489fn tool_emulated_messages(
491 request: &ChatCompletionRequest,
492 tool_context: &ToolEmulationContext,
493 correction: Option<&(String, String)>,
494) -> Vec<NormalizedChatMessage> {
495 let mut messages = request.messages.clone();
496 let mut tool_system_content = tool_context.controller_message().content;
497
498 if let Some((validation_error, invalid_output)) = correction {
499 tool_system_content.push_str("\n\n");
500 tool_system_content.push_str(
501 &tool_context
502 .correction_message(validation_error, invalid_output)
503 .content,
504 );
505 }
506
507 append_to_system_message(&mut messages, tool_system_content);
508 messages
509}
510
511fn append_to_system_message(messages: &mut Vec<NormalizedChatMessage>, content: String) {
513 if let Some(system_message) = messages.iter_mut().find(|message| message.role == "system") {
514 system_message.content.push_str("\n\n");
515 system_message.content.push_str(&content);
516 } else {
517 messages.insert(0, NormalizedChatMessage::new("system", content));
518 }
519}
520
521fn openai_tool_call_completion(completion: Value, tool_calls: Vec<ValidatedToolCall>) -> Value {
523 let choice = completion
524 .get("choices")
525 .and_then(Value::as_array)
526 .and_then(|choices| choices.first())
527 .cloned()
528 .unwrap_or(Value::Null);
529 let index = choice.get("index").and_then(Value::as_u64).unwrap_or(0);
530 let tool_call_values: Vec<Value> = tool_calls
531 .iter()
532 .map(ValidatedToolCall::to_openai_value)
533 .collect();
534 let reasoning_content = choice
535 .get("message")
536 .and_then(|message| message.get("reasoning_content"))
537 .and_then(Value::as_str);
538 let mut message = serde_json::Map::new();
539 message.insert("role".to_owned(), Value::String("assistant".to_owned()));
540 message.insert("content".to_owned(), Value::Null);
541 if let Some(reasoning_content) = reasoning_content {
542 message.insert(
543 "reasoning_content".to_owned(),
544 Value::String(reasoning_content.to_owned()),
545 );
546 }
547 message.insert("tool_calls".to_owned(), Value::Array(tool_call_values));
548
549 json!({
550 "id": string_field(&completion, "id").unwrap_or("chatcmpl-local"),
551 "object": "chat.completion",
552 "created": integer_field(&completion, "created").unwrap_or_else(unix_timestamp_now),
553 "model": string_field(&completion, "model").unwrap_or("unknown"),
554 "choices": [{
555 "index": index,
556 "message": Value::Object(message),
557 "finish_reason": "tool_calls",
558 }],
559 "usage": completion.get("usage").cloned().unwrap_or(Value::Null),
560 })
561}
562
563async fn buffer_openai_chat_completion(
565 mut upstream: reqwest::Response,
566 codec: E2eeCodec,
567 proxy_instance_key: ProxyInstanceKey,
568 fallback_model: String,
569) -> Result<Value, ChatStreamError> {
570 info!(model = %fallback_model, "buffering upstream chat stream");
571 let mut parser = SseEventParser::default();
572 let mut transformer =
573 OpenAiChatCompletionBuffer::new(codec, proxy_instance_key, fallback_model.clone());
574 let mut upstream_done = false;
575 let mut chunk_count = 0_u64;
576 let mut event_count = 0_u64;
577
578 while let Some(chunk) = upstream
579 .chunk()
580 .await
581 .map_err(ChatStreamError::upstream_stream)?
582 {
583 chunk_count += 1;
584 let chunk = std::str::from_utf8(&chunk).map_err(ChatStreamError::invalid_utf8)?;
585 let events = parser.push(chunk)?;
586 event_count += events.len() as u64;
587 debug!(
588 model = %fallback_model,
589 chunk_count,
590 parsed_events = events.len(),
591 total_events = event_count,
592 "parsed buffered upstream SSE chunk"
593 );
594
595 for event in events {
596 if transformer.handle_event(event)? {
597 upstream_done = true;
598 break;
599 }
600 }
601
602 if upstream_done {
603 break;
604 }
605 }
606
607 if !upstream_done {
608 warn!(
609 model = %fallback_model,
610 chunk_count,
611 event_count,
612 "buffered upstream stream ended before DONE"
613 );
614 parser.finish()?;
615 return Err(ChatStreamError::malformed_event(
616 "upstream stream ended before data: [DONE]",
617 ));
618 }
619
620 let completion = transformer.into_response();
621 info!(
622 model = %fallback_model,
623 chunk_count,
624 event_count,
625 "buffered upstream chat stream transformed"
626 );
627 Ok(completion)
628}
629
/// Log message texts for the chat SSE streaming helpers. The plain and tool-emulated streams
/// share the same code paths but log distinguishable messages.
struct ChatSseLogMessages {
    /// Logged (info) when a stream transformation starts.
    start: &'static str,
    /// Logged (debug) after an upstream chunk has been parsed into events.
    parsed_chunk: &'static str,
    /// Logged (debug) after an event has been transformed into client outputs.
    transformed_event: &'static str,
    /// Logged (info) just before the final `[DONE]` event is yielded to the client.
    completed: &'static str,
    /// Logged (warn) when upstream closes before `[DONE]`.
    ended_early: &'static str,
}

/// Log messages for the plain (non-tool-emulated) chat SSE stream.
const CHAT_SSE_LOG: ChatSseLogMessages = ChatSseLogMessages {
    start: "starting upstream chat SSE transformation",
    parsed_chunk: "parsed streaming upstream SSE chunk",
    transformed_event: "transformed streaming upstream SSE event",
    completed: "completed upstream chat SSE transformation",
    ended_early: "streaming upstream stream ended before DONE",
};

/// Log messages for the tool-emulated chat SSE stream.
const TOOL_EMULATED_CHAT_SSE_LOG: ChatSseLogMessages = ChatSseLogMessages {
    start: "starting tool-emulated upstream chat SSE transformation",
    parsed_chunk: "parsed tool-emulated upstream SSE chunk",
    transformed_event: "transformed tool-emulated upstream SSE event",
    completed: "completed tool-emulated upstream chat SSE transformation",
    ended_early: "tool-emulated upstream stream ended before DONE",
};

/// Converts upstream SSE events into outputs for the client-facing chat stream.
trait ChatSseTransformer {
    /// Transforms one raw upstream event into zero or more outputs. A `StreamOutput::Done`
    /// output ends the client stream.
    fn handle_event(&mut self, event: RawSseEvent) -> Result<Vec<StreamOutput>, ChatStreamError>;
}
661
662fn chat_sse_response<T>(
664 upstream: reqwest::Response,
665 transformer: T,
666 fallback_model: String,
667 include_usage_requested: bool,
668 log: &'static ChatSseLogMessages,
669 metadata: ProxyMetadataHeaders,
670) -> Response
671where
672 T: ChatSseTransformer + Send + 'static,
673{
674 let stream = chat_sse_event_stream(
675 upstream,
676 transformer,
677 fallback_model,
678 include_usage_requested,
679 log,
680 );
681 let mut response = Sse::new(stream).into_response();
682 metadata.apply(response.headers_mut());
683 response
684}
685
686struct ToolEmulatedChatSseRetryState {
688 state: AppState,
689 request: ChatCompletionRequest,
690 tool_context: ToolEmulationContext,
691 codec: E2eeCodec,
692 proxy_instance_key: ProxyInstanceKey,
693 model_public_key: String,
694}
695
696impl ToolEmulatedChatSseRetryState {
697 fn include_usage_requested(&self) -> bool {
698 self.request.stream_options.include_usage.unwrap_or(false)
699 }
700
701 async fn upstream(
702 &self,
703 initial_upstream: &mut Option<reqwest::Response>,
704 correction: Option<&(String, String)>,
705 ) -> Result<reqwest::Response, ProxyError> {
706 if let Some(upstream) = initial_upstream.take() {
707 return Ok(upstream);
708 }
709
710 tool_emulated_upstream_stream(
711 &self.state,
712 &self.request,
713 &self.tool_context,
714 &self.codec,
715 &self.proxy_instance_key,
716 &self.model_public_key,
717 correction,
718 )
719 .await
720 }
721
722 fn transformer(
723 &self,
724 suppress_normal_output: bool,
725 ) -> Result<OpenAiToolEmulatedChatStreamTransformer, ChatStreamError> {
726 OpenAiToolEmulatedChatStreamTransformer::new(
727 &self.tool_context,
728 self.codec.clone(),
729 self.proxy_instance_key.clone(),
730 self.request.model.clone(),
731 self.include_usage_requested(),
732 suppress_normal_output,
733 )
734 }
735
736 fn log_attempt_start(&self, retries: u32, retrying: bool) {
737 info!(
738 model = %self.request.model,
739 include_usage_requested = self.include_usage_requested(),
740 retry = retries,
741 retrying,
742 "{}",
743 TOOL_EMULATED_CHAT_SSE_LOG.start
744 );
745 }
746}
747
748fn tool_emulated_chat_sse_response_with_retries(
750 retry_state: ToolEmulatedChatSseRetryState,
751 initial_upstream: reqwest::Response,
752 metadata: ProxyMetadataHeaders,
753) -> Response {
754 let stream = tool_emulated_chat_sse_event_stream_with_retries(retry_state, initial_upstream);
755 let mut response = Sse::new(stream).into_response();
756 metadata.apply(response.headers_mut());
757 response
758}
759
/// Turns an upstream Venice SSE response into the client-facing SSE event stream.
///
/// Each upstream chunk is parsed into raw events by `next_sse_events`. Every event goes
/// through `transformer`, and each resulting [`StreamOutput`] is emitted as an SSE [`Event`].
/// The stream stops right after the `[DONE]` event has been yielded.
///
/// Any error ends the stream with a boxed error, logged by `box_chat_stream_error`. This
/// covers transport errors, parsing errors, transformation errors, and upstream ending before
/// `[DONE]`.
fn chat_sse_event_stream<T>(
    mut upstream: reqwest::Response,
    mut transformer: T,
    fallback_model: String,
    include_usage_requested: bool,
    log: &'static ChatSseLogMessages,
) -> impl futures_core::Stream<Item = Result<Event, axum::BoxError>>
where
    T: ChatSseTransformer + Send + 'static,
{
    async_stream::try_stream! {
        info!(
            model = %fallback_model,
            include_usage_requested,
            "{}", log.start
        );
        let mut parser = SseEventParser::default();
        let mut stats = ChatSseStreamStats::default();

        // One upstream chunk per iteration. `next_sse_events` returns an error (ending the
        // stream) if upstream closes before `[DONE]`.
        'stream: loop {
            let events = next_sse_events(
                &mut upstream,
                &mut parser,
                &fallback_model,
                log,
                &mut stats,
            )
            .await
            .map_err(box_chat_stream_error)?;

            for event in events {
                let outputs = transformer.handle_event(event).map_err(box_chat_stream_error)?;
                record_transformed_outputs(&fallback_model, log, outputs.len(), &mut stats);

                for output in outputs {
                    let (event, done) = stream_output_event(output);
                    if done {
                        // Forward `[DONE]` to the client, then stop reading upstream.
                        log_sse_completed(&fallback_model, log, &stats);
                        yield event;
                        break 'stream;
                    }
                    yield event;
                }
            }
        }
    }
}
808
/// Client-facing SSE stream for tool-emulated chat completions, with in-stream retries.
///
/// Each attempt takes an upstream response: first the pre-opened `initial_upstream`, then a
/// fresh request carrying the latest correction. The attempt builds a new transformer and
/// parser and forwards transformed events to the client.
///
/// When `tool_stream_event_outputs` schedules a retry, the current attempt is abandoned and a
/// new one starts. Transformers for retry attempts are created with
/// `suppress_normal_output = true`. The stream ends right after `[DONE]` is yielded, or on the
/// first unrecoverable error.
///
/// NOTE(review): events already yielded by an abandoned attempt have been sent to the client
/// and cannot be retracted. Confirm that the transformer withholds unvalidated tool-call
/// output.
fn tool_emulated_chat_sse_event_stream_with_retries(
    retry_state: ToolEmulatedChatSseRetryState,
    initial_upstream: reqwest::Response,
) -> impl futures_core::Stream<Item = Result<Event, axum::BoxError>> {
    async_stream::try_stream! {
        let mut retries = 0_u32;
        let mut correction: Option<(String, String)> = None;
        // Consumed by the first `retry_state.upstream` call; later attempts open new requests.
        let mut initial_upstream = Some(initial_upstream);

        'attempts: loop {
            // Any attempt after a correction has been recorded is a retry.
            let retrying = correction.is_some();
            let mut upstream = retry_state
                .upstream(&mut initial_upstream, correction.as_ref())
                .await
                .map_err(box_proxy_error)?;

            retry_state.log_attempt_start(retries, retrying);

            // Fresh transformer, parser, and counters for every attempt.
            let mut transformer = retry_state
                .transformer(retrying)
                .map_err(box_chat_stream_error)?;
            let mut parser = SseEventParser::default();
            let mut stats = ChatSseStreamStats::default();

            loop {
                let events = next_sse_events(
                    &mut upstream,
                    &mut parser,
                    &retry_state.request.model,
                    &TOOL_EMULATED_CHAT_SSE_LOG,
                    &mut stats,
                )
                .await
                .map_err(box_chat_stream_error)?;

                for event in events {
                    let outputs = match tool_stream_event_outputs(
                        &retry_state,
                        &mut transformer,
                        event,
                        &mut retries,
                        &mut correction,
                    ) {
                        Ok(ToolStreamEventOutputs::Outputs(outputs)) => outputs,
                        // `correction` and `retries` were updated; drop this upstream and retry.
                        Ok(ToolStreamEventOutputs::Retry) => continue 'attempts,
                        // Re-raise through `?` so `try_stream!` ends the stream with this error.
                        Err(error) => Err::<Vec<StreamOutput>, axum::BoxError>(error)?,
                    };

                    record_transformed_outputs(
                        &retry_state.request.model,
                        &TOOL_EMULATED_CHAT_SSE_LOG,
                        outputs.len(),
                        &mut stats,
                    );

                    for output in outputs {
                        let (event, done) = stream_output_event(output);
                        if done {
                            log_sse_completed(
                                &retry_state.request.model,
                                &TOOL_EMULATED_CHAT_SSE_LOG,
                                &stats,
                            );
                            yield event;
                            break 'attempts;
                        }
                        yield event;
                    }
                }
            }
        }
    }
}
883
884enum ToolStreamEventOutputs {
886 Outputs(Vec<StreamOutput>),
887 Retry,
888}
889
890fn tool_stream_event_outputs(
892 retry_state: &ToolEmulatedChatSseRetryState,
893 transformer: &mut OpenAiToolEmulatedChatStreamTransformer,
894 event: RawSseEvent,
895 retries: &mut u32,
896 correction: &mut Option<(String, String)>,
897) -> Result<ToolStreamEventOutputs, axum::BoxError> {
898 match transformer.handle_event(event) {
899 Ok(outputs) => Ok(ToolStreamEventOutputs::Outputs(outputs)),
900 Err(ChatStreamError::InvalidToolCall {
901 validation_error,
902 invalid_output,
903 }) if *retries < retry_state.tool_context.max_retries() => {
904 warn!(
905 model = %retry_state.request.model,
906 retry = *retries + 1,
907 max_retries = retry_state.tool_context.max_retries(),
908 validation_error = %validation_error,
909 "streamed tool call validation failed; retrying with correction"
910 );
911 *retries += 1;
912 *correction = Some((validation_error, invalid_output));
913 Ok(ToolStreamEventOutputs::Retry)
914 }
915 Err(ChatStreamError::InvalidToolCall {
916 validation_error, ..
917 }) => {
918 error!(
919 model = %retry_state.request.model,
920 max_retries = retry_state.tool_context.max_retries(),
921 validation_error = %validation_error,
922 "streamed tool call validation failed and retries were exhausted"
923 );
924 Err(box_proxy_error(ProxyError::ToolCallRetryExhausted {
925 max_retries: retry_state.tool_context.max_retries(),
926 last_validation_error: validation_error,
927 }))
928 }
929 Err(error) => Err(box_chat_stream_error(error)),
930 }
931}
932
/// Running counters for one SSE stream transformation, reported in log messages.
#[derive(Debug, Default)]
struct ChatSseStreamStats {
    /// Upstream body chunks read so far.
    chunk_count: u64,
    /// Raw SSE events parsed so far.
    event_count: u64,
    /// Client outputs produced by the transformer so far.
    output_count: u64,
}
940
941async fn next_sse_events(
943 upstream: &mut reqwest::Response,
944 parser: &mut SseEventParser,
945 fallback_model: &str,
946 log: &'static ChatSseLogMessages,
947 stats: &mut ChatSseStreamStats,
948) -> Result<Vec<RawSseEvent>, ChatStreamError> {
949 let Some(chunk) = upstream
950 .chunk()
951 .await
952 .map_err(ChatStreamError::upstream_stream)?
953 else {
954 warn!(
955 model = %fallback_model,
956 chunk_count = stats.chunk_count,
957 event_count = stats.event_count,
958 output_count = stats.output_count,
959 "{}",
960 log.ended_early
961 );
962 parser.finish()?;
963 return Err(ChatStreamError::malformed_event(
964 "upstream stream ended before data: [DONE]",
965 ));
966 };
967
968 stats.chunk_count += 1;
969 let chunk = std::str::from_utf8(&chunk).map_err(ChatStreamError::invalid_utf8)?;
970 let events = parser.push(chunk)?;
971 stats.event_count += events.len() as u64;
972 debug!(
973 model = %fallback_model,
974 chunk_count = stats.chunk_count,
975 parsed_events = events.len(),
976 total_events = stats.event_count,
977 "{}",
978 log.parsed_chunk
979 );
980 Ok(events)
981}
982
983fn record_transformed_outputs(
985 fallback_model: &str,
986 log: &'static ChatSseLogMessages,
987 output_count: usize,
988 stats: &mut ChatSseStreamStats,
989) {
990 stats.output_count += output_count as u64;
991 debug!(
992 model = %fallback_model,
993 emitted_outputs = output_count,
994 total_outputs = stats.output_count,
995 "{}",
996 log.transformed_event
997 );
998}
999
1000fn stream_output_event(output: StreamOutput) -> (Event, bool) {
1002 match output {
1003 StreamOutput::Json(value) => (Event::default().data(value.to_string()), false),
1004 StreamOutput::Done => (Event::default().data("[DONE]"), true),
1005 }
1006}
1007
1008fn log_sse_completed(
1010 fallback_model: &str,
1011 log: &'static ChatSseLogMessages,
1012 stats: &ChatSseStreamStats,
1013) {
1014 info!(
1015 model = %fallback_model,
1016 chunk_count = stats.chunk_count,
1017 event_count = stats.event_count,
1018 output_count = stats.output_count,
1019 "{}",
1020 log.completed
1021 );
1022}
1023
1024fn box_proxy_error(error: ProxyError) -> axum::BoxError {
1026 error!(error = %error, "chat stream failed");
1027 Box::new(error)
1028}
1029
1030fn box_chat_stream_error(error: ChatStreamError) -> axum::BoxError {
1032 error!(error = %error, "chat stream transformation failed");
1033 Box::new(error)
1034}
1035
1036#[derive(Debug, Default)]
1038struct SseEventParser {
1039 buffer: String,
1040}
1041
1042impl SseEventParser {
1043 fn push(&mut self, chunk: &str) -> Result<Vec<RawSseEvent>, ChatStreamError> {
1045 self.buffer.push_str(chunk);
1046 let mut events = Vec::new();
1047
1048 while let Some((boundary_start, boundary_len)) = sse_event_boundary(&self.buffer) {
1049 let raw = self.buffer[..boundary_start].to_owned();
1050 self.buffer.drain(..boundary_start + boundary_len);
1051 if let Some(event) = parse_sse_event(&raw)? {
1052 events.push(event);
1053 }
1054 }
1055
1056 debug!(
1057 chunk_bytes = chunk.len(),
1058 buffered_bytes = self.buffer.len(),
1059 parsed_events = events.len(),
1060 "SSE parser processed upstream chunk"
1061 );
1062 Ok(events)
1063 }
1064
1065 fn finish(&self) -> Result<(), ChatStreamError> {
1067 if self.buffer.trim().is_empty() {
1068 Ok(())
1069 } else {
1070 warn!(
1071 buffered_bytes = self.buffer.len(),
1072 "upstream SSE stream ended with incomplete event"
1073 );
1074 Err(ChatStreamError::malformed_event(
1075 "upstream stream ended with an incomplete SSE event",
1076 ))
1077 }
1078 }
1079}
1080
/// One raw server-sent event as produced by `parse_sse_event`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RawSseEvent {
    /// Event type, presumably from the SSE `event:` field (confirm in `parse_sse_event`).
    /// `classify_upstream_event` treats `None` as `"message"` and `Some("error")` as an
    /// upstream error.
    event: Option<String>,
    /// Event payload: either the `[DONE]` sentinel or a JSON document, as interpreted by
    /// `classify_upstream_event`.
    data: String,
}

/// Log message texts used by `classify_upstream_event`, so that the buffered, streaming, and
/// tool-emulated paths log distinguishable messages.
struct UpstreamEventLogMessages {
    /// Logged (debug) for every event, with its type and whether it is `[DONE]`.
    event: &'static str,
    /// Logged (warn) for an SSE `error` event.
    sse_error: &'static str,
    /// Logged (info) when `[DONE]` is received.
    done: &'static str,
    /// Logged (debug) before JSON parsing; `None` skips the log line.
    parsing: Option<&'static str>,
    /// Logged (warn) when the JSON payload contains an `error` member.
    json_error: &'static str,
    /// Logged (warn) when the JSON payload has no `choices` array.
    missing_choices: &'static str,
    /// Presumably logged after a chunk is parsed; `None` skips it. Its use is outside this view.
    parsed: Option<&'static str>,
    /// Presumably logged when a chunk has an unexpected number of choices. Its use is outside
    /// this view.
    unexpected_choice_count: &'static str,
}

/// Event-classification log messages for the buffered (non-streaming) response path.
const BUFFERED_UPSTREAM_EVENT_LOG: UpstreamEventLogMessages = UpstreamEventLogMessages {
    event: "buffering upstream SSE event",
    sse_error: "upstream SSE error event while buffering response",
    done: "received upstream DONE while buffering response",
    parsing: Some("parsing buffered upstream chat JSON chunk"),
    json_error: "upstream JSON error chunk while buffering response",
    missing_choices: "buffered upstream chat chunk is missing choices array",
    parsed: Some("parsed buffered upstream chat chunk"),
    unexpected_choice_count: "unexpected buffered upstream choice count",
};

/// Event-classification log messages for the plain streaming response path.
const STREAMING_UPSTREAM_EVENT_LOG: UpstreamEventLogMessages = UpstreamEventLogMessages {
    event: "transforming streaming upstream SSE event",
    sse_error: "upstream SSE error event while streaming response",
    done: "received upstream DONE while streaming response",
    parsing: Some("parsing streaming upstream chat JSON chunk"),
    json_error: "upstream JSON error chunk while streaming response",
    missing_choices: "streaming upstream chat chunk is missing choices array",
    parsed: Some("parsed streaming upstream chat chunk"),
    unexpected_choice_count: "unexpected streaming upstream choice count",
};

/// Event-classification log messages for the tool-emulated streaming path. The optional
/// "parsing" and "parsed" messages are disabled here.
const TOOL_EMULATED_UPSTREAM_EVENT_LOG: UpstreamEventLogMessages = UpstreamEventLogMessages {
    event: "transforming tool-emulated streaming upstream SSE event",
    sse_error: "upstream SSE error event while streaming tool-emulated response",
    done: "received upstream DONE while streaming tool-emulated response",
    parsing: None,
    json_error: "upstream JSON error chunk while streaming tool-emulated response",
    missing_choices: "tool-emulated upstream chat chunk is missing choices array",
    parsed: None,
    unexpected_choice_count: "unexpected tool-emulated upstream choice count",
};

/// Classification of one upstream SSE event, as returned by `classify_upstream_event`.
enum UpstreamEventKind {
    /// The `[DONE]` sentinel was received.
    Done,
    /// Presumably a chunk that carries usage data rather than a choice. The producing code is
    /// outside this view — confirm.
    Usage(Value),
    /// Presumably the parsed chunk (`value`) together with its choice (`choice`). The producing
    /// code is outside this view — confirm.
    Choice { value: Value, choice: Value },
}
1144
1145fn classify_upstream_event(
1149 event: RawSseEvent,
1150 log: &UpstreamEventLogMessages,
1151) -> Result<UpstreamEventKind, ChatStreamError> {
1152 let event_type = event.event.as_deref().unwrap_or("message");
1153 let is_done = event.data.trim() == "[DONE]";
1154 debug!(event_type, is_done, "{}", log.event);
1155
1156 if event.event.as_deref() == Some("error") {
1157 warn!("{}", log.sse_error);
1158 return Err(ChatStreamError::upstream_event(event.data));
1159 }
1160
1161 if is_done {
1162 info!("{}", log.done);
1163 return Ok(UpstreamEventKind::Done);
1164 }
1165
1166 if let Some(parsing) = log.parsing {
1167 debug!("{}", parsing);
1168 }
1169 let value: Value = serde_json::from_str(&event.data).map_err(ChatStreamError::json_event)?;
1170 if let Some(error) = value.get("error") {
1171 warn!("{}", log.json_error);
1172 return Err(ChatStreamError::upstream_event(error.to_string()));
1173 }
1174
1175 let Some(choices) = value.get("choices").and_then(Value::as_array) else {
1176 warn!("{}", log.missing_choices);
1177 return Err(ChatStreamError::malformed_event(
1178 "upstream chat chunk is missing choices array",
1179 ));
1180 };
1181 if let Some(parsed) = log.parsed {
1182 debug!(choice_count = choices.len(), "{}", parsed);
1183 }
1184
1185 if choices.is_empty() {
1186 return Ok(UpstreamEventKind::Usage(value));
1187 }
1188 if choices.len() != 1 {
1189 warn!(
1190 choice_count = choices.len(),
1191 "{}", log.unexpected_choice_count
1192 );
1193 return Err(ChatStreamError::malformed_event(format!(
1194 "expected exactly one upstream choice, got {}",
1195 choices.len(),
1196 )));
1197 }
1198
1199 let choice = choices[0].clone();
1200 Ok(UpstreamEventKind::Choice { value, choice })
1201}
1202
1203struct ChunkContext {
1206 codec: E2eeCodec,
1207 proxy_instance_key: ProxyInstanceKey,
1208 fallback_id: String,
1209 fallback_created: i64,
1210 fallback_model: String,
1211}
1212
1213impl ChunkContext {
1214 fn new(codec: E2eeCodec, proxy_instance_key: ProxyInstanceKey, fallback_model: String) -> Self {
1216 Self {
1217 codec,
1218 proxy_instance_key,
1219 fallback_id: format!("chatcmpl-local-{}", uuid::Uuid::new_v4()),
1220 fallback_created: unix_timestamp_now(),
1221 fallback_model,
1222 }
1223 }
1224
1225 fn decrypt(&self, content: Option<&str>) -> Result<Option<String>, ChatStreamError> {
1227 self.codec
1228 .decrypt_response_content(content, self.proxy_instance_key.private_key())
1229 .map_err(ChatStreamError::decryption)
1230 }
1231
1232 fn chunk_with_choice(
1234 &self,
1235 upstream: &Value,
1236 index: u64,
1237 delta: Value,
1238 finish_reason: Value,
1239 ) -> Value {
1240 json!({
1241 "id": string_field(upstream, "id").unwrap_or(&self.fallback_id),
1242 "object": string_field(upstream, "object").unwrap_or("chat.completion.chunk"),
1243 "created": integer_field(upstream, "created").unwrap_or(self.fallback_created),
1244 "model": string_field(upstream, "model").unwrap_or(&self.fallback_model),
1245 "choices": [{
1246 "index": index,
1247 "delta": delta,
1248 "finish_reason": finish_reason,
1249 }],
1250 })
1251 }
1252
1253 fn usage_chunk(&self, upstream: &Value, usage: &Value) -> Value {
1255 json!({
1256 "id": string_field(upstream, "id").unwrap_or(&self.fallback_id),
1257 "object": string_field(upstream, "object").unwrap_or("chat.completion.chunk"),
1258 "created": integer_field(upstream, "created").unwrap_or(self.fallback_created),
1259 "model": string_field(upstream, "model").unwrap_or(&self.fallback_model),
1260 "choices": [],
1261 "usage": usage,
1262 })
1263 }
1264}
1265
1266struct OpenAiChatCompletionBuffer {
1268 ctx: ChunkContext,
1269 id: Option<String>,
1270 created: Option<i64>,
1271 model: Option<String>,
1272 choice_index: Option<u64>,
1273 saw_encrypted_response_field: bool,
1274 content: String,
1275 reasoning_content: String,
1276 finish_reason: Option<Value>,
1277 usage: Option<Value>,
1278}
1279
1280impl OpenAiChatCompletionBuffer {
1281 fn new(codec: E2eeCodec, proxy_instance_key: ProxyInstanceKey, fallback_model: String) -> Self {
1283 Self {
1284 ctx: ChunkContext::new(codec, proxy_instance_key, fallback_model),
1285 id: None,
1286 created: None,
1287 model: None,
1288 choice_index: None,
1289 saw_encrypted_response_field: false,
1290 content: String::new(),
1291 reasoning_content: String::new(),
1292 finish_reason: None,
1293 usage: None,
1294 }
1295 }
1296
1297 fn handle_event(&mut self, event: RawSseEvent) -> Result<bool, ChatStreamError> {
1299 match classify_upstream_event(event, &BUFFERED_UPSTREAM_EVENT_LOG)? {
1300 UpstreamEventKind::Done => {
1301 if !self.saw_encrypted_response_field {
1302 self.ctx.decrypt(None)?;
1303 }
1304 if self.finish_reason.is_none() {
1305 self.finish_reason = Some(Value::String("stop".to_owned()));
1306 }
1307 Ok(true)
1308 }
1309 UpstreamEventKind::Usage(value) => {
1310 self.record_metadata(&value);
1311 self.handle_usage_chunk(&value).map(|()| false)
1312 }
1313 UpstreamEventKind::Choice { value, choice } => {
1314 self.record_metadata(&value);
1315 self.handle_choice_chunk(&choice)?;
1316 Ok(false)
1317 }
1318 }
1319 }
1320
1321 fn handle_usage_chunk(&mut self, value: &Value) -> Result<(), ChatStreamError> {
1323 let Some(usage) = value.get("usage") else {
1324 warn!("buffered upstream chunk has no choices and no usage");
1325 return Err(ChatStreamError::malformed_event(
1326 "upstream chunk has no choices and no usage",
1327 ));
1328 };
1329
1330 info!("buffered upstream usage chunk");
1331 self.usage = Some(usage.clone());
1332 Ok(())
1333 }
1334
1335 fn handle_choice_chunk(&mut self, choice: &Value) -> Result<(), ChatStreamError> {
1337 let choice = choice.as_object().ok_or_else(|| {
1338 ChatStreamError::malformed_event("upstream choice must be a JSON object")
1339 })?;
1340 let index = normalized_choice_index(choice.get("index"))?;
1341 match self.choice_index {
1342 Some(existing) if existing != index => {
1343 return Err(ChatStreamError::malformed_event(
1344 "upstream choice index changed while buffering a completion",
1345 ));
1346 }
1347 None => self.choice_index = Some(index),
1348 Some(_) => {}
1349 }
1350
1351 let finish_reason = normalized_finish_reason(choice.get("finish_reason"))?;
1352 let delta = choice.get("delta").unwrap_or(&Value::Null);
1353 let content = encrypted_delta_content(delta)?;
1354 let reasoning_content = encrypted_delta_reasoning_content(delta)?;
1355 debug!(
1356 choice_index = index,
1357 has_encrypted_content = content.is_some(),
1358 has_encrypted_reasoning_content = reasoning_content.is_some(),
1359 has_finish_reason = !finish_reason.is_null(),
1360 "transforming buffered upstream choice chunk"
1361 );
1362
1363 if let Some(content) = content {
1364 let decrypted = self.ctx.decrypt(Some(content))?;
1365 self.saw_encrypted_response_field = true;
1366 debug!(
1367 choice_index = index,
1368 has_decrypted_content = decrypted.is_some(),
1369 "decrypted buffered upstream content chunk"
1370 );
1371 if let Some(content) = decrypted {
1372 self.content.push_str(&content);
1373 }
1374 }
1375
1376 if let Some(reasoning_content) = reasoning_content {
1377 let decrypted = self.ctx.decrypt(Some(reasoning_content))?;
1378 self.saw_encrypted_response_field = true;
1379 debug!(
1380 choice_index = index,
1381 has_decrypted_reasoning_content = decrypted.is_some(),
1382 "decrypted buffered upstream reasoning content chunk"
1383 );
1384 if let Some(reasoning_content) = decrypted {
1385 self.reasoning_content.push_str(&reasoning_content);
1386 }
1387 }
1388
1389 if !finish_reason.is_null() {
1390 self.finish_reason = Some(finish_reason);
1391 }
1392
1393 Ok(())
1394 }
1395
1396 fn record_metadata(&mut self, value: &Value) {
1398 if self.id.is_none()
1399 && let Some(id) = string_field(value, "id")
1400 {
1401 self.id = Some(id.to_owned());
1402 }
1403 if self.created.is_none()
1404 && let Some(created) = integer_field(value, "created")
1405 {
1406 self.created = Some(created);
1407 }
1408 if self.model.is_none()
1409 && let Some(model) = string_field(value, "model")
1410 {
1411 self.model = Some(model.to_owned());
1412 }
1413 }
1414
1415 fn into_response(self) -> Value {
1417 let mut message = serde_json::Map::new();
1418 message.insert("role".to_owned(), Value::String("assistant".to_owned()));
1419 if !self.reasoning_content.is_empty() {
1420 message.insert(
1421 "reasoning_content".to_owned(),
1422 Value::String(self.reasoning_content),
1423 );
1424 }
1425 message.insert("content".to_owned(), Value::String(self.content));
1426
1427 json!({
1428 "id": self.id.unwrap_or(self.ctx.fallback_id),
1429 "object": "chat.completion",
1430 "created": self.created.unwrap_or(self.ctx.fallback_created),
1431 "model": self.model.unwrap_or(self.ctx.fallback_model),
1432 "choices": [{
1433 "index": self.choice_index.unwrap_or(0),
1434 "message": Value::Object(message),
1435 "finish_reason": self.finish_reason.unwrap_or_else(|| Value::String("stop".to_owned())),
1436 }],
1437 "usage": self.usage.unwrap_or(Value::Null),
1438 })
1439 }
1440}
1441
/// Locates the earliest SSE event terminator (a blank line) in `buffer`.
///
/// Recognises `\r\n\r\n`, `\n\n` and `\r\r`. Returns the byte offset where the
/// terminator starts together with its byte length, or `None` if no complete
/// event is buffered yet.
fn sse_event_boundary(buffer: &str) -> Option<(usize, usize)> {
    const DELIMITERS: [&str; 3] = ["\r\n\r\n", "\n\n", "\r\r"];

    let mut earliest: Option<(usize, usize)> = None;
    for delimiter in DELIMITERS {
        let Some(position) = buffer.find(delimiter) else {
            continue;
        };
        // Strict `<` keeps the first delimiter on ties (ties cannot actually
        // occur because the delimiters begin with different bytes).
        let is_earlier = match earliest {
            Some((best, _)) => position < best,
            None => true,
        };
        if is_earlier {
            earliest = Some((position, delimiter.len()));
        }
    }
    earliest
}
1449
1450fn parse_sse_event(raw: &str) -> Result<Option<RawSseEvent>, ChatStreamError> {
1452 let mut event = None;
1453 let mut data_lines = Vec::new();
1454 let mut saw_non_comment_field = false;
1455
1456 for line in raw.lines() {
1457 let line = line.strip_suffix('\r').unwrap_or(line);
1458 if line.is_empty() || line.starts_with(':') {
1459 continue;
1460 }
1461
1462 saw_non_comment_field = true;
1463 let (field, value) = line.split_once(':').unwrap_or((line, ""));
1464 let value = value.strip_prefix(' ').unwrap_or(value);
1465 match field {
1466 "event" => event = Some(value.to_owned()),
1467 "data" => data_lines.push(value.to_owned()),
1468 "id" | "retry" => {}
1469 other => {
1470 warn!(field = other, "unsupported upstream SSE field");
1471 return Err(ChatStreamError::malformed_event(format!(
1472 "unsupported upstream SSE field {other:?}",
1473 )));
1474 }
1475 }
1476 }
1477
1478 if data_lines.is_empty() {
1479 return if saw_non_comment_field {
1480 warn!("upstream SSE event did not contain a data field");
1481 Err(ChatStreamError::malformed_event(
1482 "upstream SSE event did not contain a data field",
1483 ))
1484 } else {
1485 debug!("ignored upstream SSE comment or heartbeat event");
1486 Ok(None)
1487 };
1488 }
1489
1490 debug!(
1491 event_type = event.as_deref().unwrap_or("message"),
1492 data_line_count = data_lines.len(),
1493 "parsed upstream SSE event"
1494 );
1495
1496 Ok(Some(RawSseEvent {
1497 event,
1498 data: data_lines.join("\n"),
1499 }))
1500}
1501
1502struct OpenAiChatStreamTransformer {
1504 ctx: ChunkContext,
1505 include_usage_requested: bool,
1506 sent_role: bool,
1507 sent_final_finish: bool,
1508}
1509
1510impl OpenAiChatStreamTransformer {
1511 fn new(
1513 codec: E2eeCodec,
1514 proxy_instance_key: ProxyInstanceKey,
1515 fallback_model: String,
1516 include_usage_requested: bool,
1517 ) -> Self {
1518 Self {
1519 ctx: ChunkContext::new(codec, proxy_instance_key, fallback_model),
1520 include_usage_requested,
1521 sent_role: false,
1522 sent_final_finish: false,
1523 }
1524 }
1525
1526 fn handle_choice_chunk(
1528 &mut self,
1529 value: &Value,
1530 choice: &Value,
1531 ) -> Result<Vec<StreamOutput>, ChatStreamError> {
1532 let choice = choice.as_object().ok_or_else(|| {
1533 ChatStreamError::malformed_event("upstream choice must be a JSON object")
1534 })?;
1535 let finish_reason = normalized_finish_reason(choice.get("finish_reason"))?;
1536 let delta = choice.get("delta").unwrap_or(&Value::Null);
1537 let content = encrypted_delta_content(delta)?;
1538 let reasoning_content = encrypted_delta_reasoning_content(delta)?;
1539 debug!(
1540 has_encrypted_content = content.is_some(),
1541 has_encrypted_reasoning_content = reasoning_content.is_some(),
1542 has_finish_reason = !finish_reason.is_null(),
1543 "transforming streaming upstream choice chunk"
1544 );
1545
1546 let mut output = Vec::new();
1547
1548 if content.is_none() && reasoning_content.is_none() {
1549 if !finish_reason.is_null() {
1550 output.push(StreamOutput::Json(self.chunk_with_choice(
1551 value,
1552 choice.get("index"),
1553 json!({}),
1554 finish_reason,
1555 )?));
1556 self.sent_final_finish = true;
1557 }
1558 return Ok(output);
1559 }
1560
1561 let decrypted_content = match content {
1562 Some(content) => self.ctx.decrypt(Some(content))?,
1563 None => None,
1564 };
1565 let decrypted_reasoning_content = match reasoning_content {
1566 Some(reasoning_content) => self.ctx.decrypt(Some(reasoning_content))?,
1567 None => None,
1568 };
1569 debug!(
1570 has_decrypted_content = decrypted_content.is_some(),
1571 has_decrypted_reasoning_content = decrypted_reasoning_content.is_some(),
1572 "decrypted streaming upstream content chunk"
1573 );
1574
1575 if decrypted_content.is_some() || decrypted_reasoning_content.is_some() {
1576 let mut delta = serde_json::Map::new();
1577
1578 if !self.sent_role {
1579 delta.insert("role".to_owned(), Value::String("assistant".to_owned()));
1580 self.sent_role = true;
1581 }
1582
1583 if let Some(reasoning_content) = decrypted_reasoning_content {
1584 delta.insert(
1585 "reasoning_content".to_owned(),
1586 Value::String(reasoning_content),
1587 );
1588 }
1589
1590 if let Some(content) = decrypted_content {
1591 delta.insert("content".to_owned(), Value::String(content));
1592 }
1593
1594 let final_finish = !finish_reason.is_null();
1595 let content_finish_reason = if final_finish {
1596 Value::Null
1597 } else {
1598 finish_reason.clone()
1599 };
1600 output.push(StreamOutput::Json(self.chunk_with_choice(
1601 value,
1602 choice.get("index"),
1603 Value::Object(delta),
1604 content_finish_reason,
1605 )?));
1606 if final_finish {
1607 output.push(StreamOutput::Json(self.chunk_with_choice(
1608 value,
1609 choice.get("index"),
1610 json!({}),
1611 finish_reason,
1612 )?));
1613 self.sent_final_finish = true;
1614 }
1615 return Ok(output);
1616 }
1617
1618 Ok(output)
1619 }
1620
1621 fn handle_usage_chunk(&self, value: &Value) -> Result<Vec<StreamOutput>, ChatStreamError> {
1623 let Some(usage) = value.get("usage") else {
1624 warn!("streaming upstream chunk has no choices and no usage");
1625 return Err(ChatStreamError::malformed_event(
1626 "upstream chunk has no choices and no usage",
1627 ));
1628 };
1629
1630 if !self.include_usage_requested {
1634 debug!("streaming upstream usage chunk ignored because client did not request usage");
1635 return Ok(Vec::new());
1636 }
1637
1638 info!("streaming upstream usage chunk forwarded");
1639 Ok(vec![StreamOutput::Json(self.ctx.usage_chunk(value, usage))])
1640 }
1641
1642 fn finish_chunk(&self) -> Value {
1644 self.ctx
1645 .chunk_with_choice(&Value::Null, 0, json!({}), Value::String("stop".to_owned()))
1646 }
1647
1648 fn chunk_with_choice(
1650 &self,
1651 upstream: &Value,
1652 index: Option<&Value>,
1653 delta: Value,
1654 finish_reason: Value,
1655 ) -> Result<Value, ChatStreamError> {
1656 let index = normalized_choice_index(index)?;
1657 Ok(self
1658 .ctx
1659 .chunk_with_choice(upstream, index, delta, finish_reason))
1660 }
1661}
1662
1663impl ChatSseTransformer for OpenAiChatStreamTransformer {
1664 fn handle_event(&mut self, event: RawSseEvent) -> Result<Vec<StreamOutput>, ChatStreamError> {
1666 match classify_upstream_event(event, &STREAMING_UPSTREAM_EVENT_LOG)? {
1667 UpstreamEventKind::Done => {
1668 let mut output = Vec::new();
1669 if !self.sent_final_finish {
1670 debug!("synthesizing final streaming finish chunk before DONE");
1671 output.push(StreamOutput::Json(self.finish_chunk()));
1672 self.sent_final_finish = true;
1673 }
1674 output.push(StreamOutput::Done);
1675 Ok(output)
1676 }
1677 UpstreamEventKind::Usage(value) => self.handle_usage_chunk(&value),
1678 UpstreamEventKind::Choice { value, choice } => {
1679 self.handle_choice_chunk(&value, &choice)
1680 }
1681 }
1682 }
1683}
1684
/// Text that opens an emulated tool call in the model's plain-text output.
/// Everything from this marker onward is buffered instead of streamed.
const TOOL_CALL_START_MARKER: &str = "<tool_call>";
1686
1687struct OpenAiToolEmulatedChatStreamTransformer {
1693 ctx: ChunkContext,
1694 tool_context: ToolEmulationContext,
1695 include_usage_requested: bool,
1696 sent_role: bool,
1697 sent_final_finish: bool,
1698 pending_text: String,
1699 tool_buffer: String,
1700 buffering_tool_call: bool,
1701 emitted_tool_calls: bool,
1702 suppress_normal_output: bool,
1703}
1704
impl OpenAiToolEmulatedChatStreamTransformer {
    /// Creates a tool-emulating stream transformer.
    ///
    /// When `suppress_normal_output` is set, plain text and reasoning deltas are
    /// not streamed. The stream also fails unless a `<tool_call>` marker is
    /// seen; the error messages below call this the "corrected" response.
    /// This constructor currently never returns `Err`.
    fn new(
        tool_context: &ToolEmulationContext,
        codec: E2eeCodec,
        proxy_instance_key: ProxyInstanceKey,
        fallback_model: String,
        include_usage_requested: bool,
        suppress_normal_output: bool,
    ) -> Result<Self, ChatStreamError> {
        Ok(Self {
            ctx: ChunkContext::new(codec, proxy_instance_key, fallback_model),
            tool_context: tool_context.clone(),
            include_usage_requested,
            sent_role: false,
            sent_final_finish: false,
            pending_text: String::new(),
            tool_buffer: String::new(),
            buffering_tool_call: false,
            emitted_tool_calls: false,
            suppress_normal_output,
        })
    }

    /// Handles one upstream choice chunk.
    ///
    /// Steps:
    /// - Streams decrypted reasoning, unless suppressed or already finished.
    /// - Routes decrypted content through the tool-call marker detector.
    /// - Finalizes the stream on the first non-null `finish_reason`.
    ///
    /// Decryption runs before the `sent_final_finish` checks. A decryption
    /// failure therefore still errors even after the final chunk was sent; the
    /// plaintext itself is then discarded.
    fn handle_choice_chunk(
        &mut self,
        value: &Value,
        choice: &Value,
    ) -> Result<Vec<StreamOutput>, ChatStreamError> {
        let choice = choice.as_object().ok_or_else(|| {
            ChatStreamError::malformed_event("upstream choice must be a JSON object")
        })?;
        let index = normalized_choice_index(choice.get("index"))?;
        let finish_reason = normalized_finish_reason(choice.get("finish_reason"))?;
        let delta = choice.get("delta").unwrap_or(&Value::Null);
        let content = encrypted_delta_content(delta)?;
        let reasoning_content = encrypted_delta_reasoning_content(delta)?;

        let mut output = Vec::new();

        if let Some(reasoning_content) = reasoning_content
            && let Some(reasoning_content) = self.ctx.decrypt(Some(reasoning_content))?
            && !self.sent_final_finish
            && !self.suppress_normal_output
        {
            output.push(self.reasoning_chunk(value, index, reasoning_content));
        }

        if let Some(content) = content
            && let Some(content) = self.ctx.decrypt(Some(content))?
            && !self.sent_final_finish
        {
            output.extend(self.push_decrypted_content(value, index, &content)?);
        }

        if !finish_reason.is_null() && !self.sent_final_finish {
            output.extend(self.finish_buffered_content(value, index, finish_reason)?);
        }

        Ok(output)
    }

    /// Appends decrypted content and returns the chunks that are safe to
    /// stream now.
    ///
    /// Behaviour by state:
    /// - Once a marker has been seen, all content goes to `tool_buffer`.
    /// - Otherwise text before a newly found marker is flushed, and the marker
    ///   onward starts the tool buffer.
    /// - Without a marker, all but the last `TOOL_CALL_START_MARKER.len() - 1`
    ///   bytes are streamed. The held-back tail lets a marker split across
    ///   chunks still be detected.
    ///
    /// # Errors
    ///
    /// Size-limit errors from the buffer checks.
    fn push_decrypted_content(
        &mut self,
        upstream: &Value,
        index: u64,
        content: &str,
    ) -> Result<Vec<StreamOutput>, ChatStreamError> {
        if self.buffering_tool_call {
            self.tool_buffer.push_str(content);
            self.ensure_tool_buffer_within_limit()?;
            return Ok(Vec::new());
        }

        self.pending_text.push_str(content);
        if let Some(marker_index) = self.pending_text.find(TOOL_CALL_START_MARKER) {
            // Split at the marker: prefix is ordinary text, the rest is the
            // start of a tool-call payload.
            let text = self.pending_text[..marker_index].to_owned();
            self.tool_buffer = self.pending_text[marker_index..].to_owned();
            self.pending_text.clear();
            self.buffering_tool_call = true;
            self.ensure_tool_buffer_within_limit()?;
            if self.suppress_normal_output {
                return Ok(Vec::new());
            }
            return Ok(self.text_chunk_if_not_empty(upstream, index, text));
        }

        if self.suppress_normal_output {
            // Nothing is streamed in suppressed mode, so bound how much
            // marker-less text may accumulate.
            self.ensure_pending_text_within_limit()?;
            return Ok(Vec::new());
        }

        let streamable_len = streamable_pending_text_len(&self.pending_text);
        if streamable_len == 0 {
            return Ok(Vec::new());
        }

        let text = self.pending_text[..streamable_len].to_owned();
        self.pending_text.drain(..streamable_len);
        Ok(vec![
            self.text_field_chunk(upstream, index, "content", text),
        ])
    }

    /// Flushes buffered output and emits the final finish chunk.
    ///
    /// Flushing depends on state:
    /// - A buffered tool call is classified.
    /// - In suppressed mode without a marker, the stream fails.
    /// - Otherwise any held-back text is streamed.
    ///
    /// The finish reason is replaced by `"tool_calls"` when tool calls were
    /// emitted.
    ///
    /// # Errors
    ///
    /// Returns `InvalidToolCall` when a tool call was required but absent, or
    /// when the buffered payload fails validation.
    fn finish_buffered_content(
        &mut self,
        upstream: &Value,
        index: u64,
        finish_reason: Value,
    ) -> Result<Vec<StreamOutput>, ChatStreamError> {
        let mut output = Vec::new();

        if self.buffering_tool_call {
            output.extend(self.buffered_tool_call_chunks(upstream, index)?);
        } else if self.suppress_normal_output {
            let invalid_output = std::mem::take(&mut self.pending_text);
            return Err(ChatStreamError::invalid_tool_call(
                "expected corrected streamed response to include a tool call",
                invalid_output,
            ));
        } else if !self.pending_text.is_empty() {
            let text = std::mem::take(&mut self.pending_text);
            output.push(self.text_field_chunk(upstream, index, "content", text));
        }

        let finish_reason = if self.emitted_tool_calls {
            Value::String("tool_calls".to_owned())
        } else {
            finish_reason
        };
        output.push(StreamOutput::Json(self.ctx.chunk_with_choice(
            upstream,
            index,
            json!({}),
            finish_reason,
        )));
        self.sent_final_finish = true;
        Ok(output)
    }

    /// Classifies the buffered `<tool_call>` payload.
    ///
    /// Returns one chunk per validated tool call (numbered by `tool_index`), or
    /// the buffered text as ordinary content when the classifier says
    /// `NormalText`.
    ///
    /// NOTE(review): the `NormalText` arm streams the text even when
    /// `suppress_normal_output` is set, unlike the marker-less suppressed path.
    /// Confirm that this is intended.
    fn buffered_tool_call_chunks(
        &mut self,
        upstream: &Value,
        index: u64,
    ) -> Result<Vec<StreamOutput>, ChatStreamError> {
        self.ensure_tool_buffer_within_limit()?;
        match self
            .tool_context
            .classify_assistant_output(&self.tool_buffer)
        {
            ToolOutputClassification::ToolCalls(tool_calls) => {
                self.emitted_tool_calls = true;
                Ok(tool_calls
                    .iter()
                    .enumerate()
                    .map(|(tool_index, tool_call)| {
                        self.full_tool_call_chunk(upstream, index, tool_index, tool_call)
                    })
                    .collect())
            }
            ToolOutputClassification::NormalText => {
                let text = std::mem::take(&mut self.tool_buffer);
                self.buffering_tool_call = false;
                Ok(self.text_chunk_if_not_empty(upstream, index, text))
            }
            ToolOutputClassification::InvalidToolCall {
                error,
                invalid_output,
            } => {
                // NOTE(review): this logs the full decrypted payload at error
                // level, so E2EE plaintext ends up in logs. Confirm this is
                // acceptable for the deployment.
                error!(
                    validation_error = %error,
                    payload_bytes = self.tool_buffer.len(),
                    payload = %self.tool_buffer,
                    "buffered streamed tool-call payload failed validation"
                );
                Err(ChatStreamError::invalid_tool_call(
                    format!("tool call parsing failed: {error}"),
                    invalid_output,
                ))
            }
        }
    }

    /// Fails with a malformed-event error once `tool_buffer` exceeds the
    /// configured `tool_call_max_bytes`.
    fn ensure_tool_buffer_within_limit(&self) -> Result<(), ChatStreamError> {
        if self.tool_buffer.len() > self.tool_context.config().tool_call_max_bytes {
            return Err(ChatStreamError::malformed_event(format!(
                "tool call output exceeded max size of {} bytes",
                self.tool_context.config().tool_call_max_bytes
            )));
        }
        Ok(())
    }

    /// In suppressed mode, fails with `InvalidToolCall` once marker-less text
    /// exceeds `tool_call_max_bytes`, carrying the accumulated text.
    fn ensure_pending_text_within_limit(&self) -> Result<(), ChatStreamError> {
        if self.pending_text.len() > self.tool_context.config().tool_call_max_bytes {
            return Err(ChatStreamError::invalid_tool_call(
                format!(
                    "corrected streamed response exceeded the tool call max size of {} bytes before emitting a tool call",
                    self.tool_context.config().tool_call_max_bytes
                ),
                self.pending_text.clone(),
            ));
        }
        Ok(())
    }

    /// Wraps `text` in a `content` chunk, or returns nothing when it is empty.
    fn text_chunk_if_not_empty(
        &mut self,
        upstream: &Value,
        index: u64,
        text: String,
    ) -> Vec<StreamOutput> {
        if text.is_empty() {
            Vec::new()
        } else {
            vec![self.text_field_chunk(upstream, index, "content", text)]
        }
    }

    /// Wraps decrypted reasoning text in a `reasoning_content` chunk.
    fn reasoning_chunk(
        &mut self,
        upstream: &Value,
        index: u64,
        reasoning_content: String,
    ) -> StreamOutput {
        self.text_field_chunk(upstream, index, "reasoning_content", reasoning_content)
    }

    /// Builds a delta chunk with a single text `field`. The first delta of the
    /// stream (of any kind) also carries `role: "assistant"`.
    fn text_field_chunk(
        &mut self,
        upstream: &Value,
        index: u64,
        field: &'static str,
        text: String,
    ) -> StreamOutput {
        let mut delta = serde_json::Map::new();
        self.insert_role_if_needed(&mut delta);
        delta.insert(field.to_owned(), Value::String(text));

        StreamOutput::Json(self.ctx.chunk_with_choice(
            upstream,
            index,
            Value::Object(delta),
            Value::Null,
        ))
    }

    /// Adds `role: "assistant"` to `delta` the first time it is called.
    fn insert_role_if_needed(&mut self, delta: &mut serde_json::Map<String, Value>) {
        if !self.sent_role {
            delta.insert("role".to_owned(), Value::String("assistant".to_owned()));
            self.sent_role = true;
        }
    }

    /// Builds a delta chunk holding one complete tool call.
    ///
    /// The tool call's own `index` field is set to `tool_index`, and the call
    /// is sent in full rather than as incremental argument fragments.
    fn full_tool_call_chunk(
        &mut self,
        upstream: &Value,
        index: u64,
        tool_index: usize,
        tool_call: &ValidatedToolCall,
    ) -> StreamOutput {
        let mut delta = serde_json::Map::new();
        self.insert_role_if_needed(&mut delta);

        let mut tool_call_value = tool_call.to_openai_value();

        if let Some(tool_call_object) = tool_call_value.as_object_mut() {
            tool_call_object.insert("index".to_owned(), json!(tool_index));
        }
        delta.insert("tool_calls".to_owned(), Value::Array(vec![tool_call_value]));

        StreamOutput::Json(self.ctx.chunk_with_choice(
            upstream,
            index,
            Value::Object(delta),
            Value::Null,
        ))
    }

    /// Forwards the `usage` of a choice-less chunk when the client requested
    /// usage, and drops it otherwise.
    ///
    /// # Errors
    ///
    /// Malformed-event error when the chunk has neither choices nor `usage`.
    fn handle_usage_chunk(&self, value: &Value) -> Result<Vec<StreamOutput>, ChatStreamError> {
        let Some(usage) = value.get("usage") else {
            warn!("tool-emulated upstream chunk has no choices and no usage");
            return Err(ChatStreamError::malformed_event(
                "upstream chunk has no choices and no usage",
            ));
        };

        if !self.include_usage_requested {
            return Ok(Vec::new());
        }

        Ok(vec![StreamOutput::Json(self.ctx.usage_chunk(value, usage))])
    }

    /// Handles upstream `[DONE]`.
    ///
    /// If no finish chunk was sent yet, buffered output is finalized with
    /// choice index `0` and `"stop"`. The output always ends with
    /// [`StreamOutput::Done`].
    fn finish_stream(&mut self) -> Result<Vec<StreamOutput>, ChatStreamError> {
        let upstream = &Value::Null;
        let mut output = Vec::new();

        if !self.sent_final_finish {
            output.extend(self.finish_buffered_content(
                upstream,
                0,
                Value::String("stop".to_owned()),
            )?);
        }

        output.push(StreamOutput::Done);
        Ok(output)
    }
}
2029
2030fn streamable_pending_text_len(pending_text: &str) -> usize {
2032 let protected_suffix_len = TOOL_CALL_START_MARKER.len().saturating_sub(1);
2033 if pending_text.len() <= protected_suffix_len {
2034 return 0;
2035 }
2036
2037 let mut split_at = pending_text.len() - protected_suffix_len;
2038 while !pending_text.is_char_boundary(split_at) {
2039 split_at -= 1;
2040 }
2041 split_at
2042}
2043
2044impl ChatSseTransformer for OpenAiToolEmulatedChatStreamTransformer {
2045 fn handle_event(&mut self, event: RawSseEvent) -> Result<Vec<StreamOutput>, ChatStreamError> {
2047 match classify_upstream_event(event, &TOOL_EMULATED_UPSTREAM_EVENT_LOG)? {
2048 UpstreamEventKind::Done => self.finish_stream(),
2049 UpstreamEventKind::Usage(value) => self.handle_usage_chunk(&value),
2050 UpstreamEventKind::Choice { value, choice } => {
2051 self.handle_choice_chunk(&value, &choice)
2052 }
2053 }
2054 }
2055}
2056
2057#[derive(Debug, Clone, PartialEq, Eq)]
2059enum StreamOutput {
2060 Json(Value),
2061 Done,
2062}
2063
2064fn normalized_choice_index(index: Option<&Value>) -> Result<u64, ChatStreamError> {
2066 match index {
2067 Some(Value::Number(number)) => number.as_u64().ok_or_else(|| {
2068 ChatStreamError::malformed_event("upstream choice index must be a non-negative integer")
2069 }),
2070 Some(_) => Err(ChatStreamError::malformed_event(
2071 "upstream choice index must be a non-negative integer",
2072 )),
2073 None => Ok(0),
2074 }
2075}
2076
2077fn normalized_finish_reason(value: Option<&Value>) -> Result<Value, ChatStreamError> {
2079 match value {
2080 Some(Value::Null) | None => Ok(Value::Null),
2081 Some(Value::String(reason)) => Ok(Value::String(reason.clone())),
2082 Some(_) => Err(ChatStreamError::malformed_event(
2083 "upstream finish_reason must be a string or null",
2084 )),
2085 }
2086}
2087
2088fn encrypted_delta_content(delta: &Value) -> Result<Option<&str>, ChatStreamError> {
2090 encrypted_delta_text_field(delta, "content")
2091}
2092
2093fn encrypted_delta_reasoning_content(delta: &Value) -> Result<Option<&str>, ChatStreamError> {
2095 encrypted_delta_text_field(delta, "reasoning_content")
2096}
2097
2098fn encrypted_delta_text_field<'a>(
2100 delta: &'a Value,
2101 field: &'static str,
2102) -> Result<Option<&'a str>, ChatStreamError> {
2103 match delta.get(field) {
2104 Some(Value::Null) => {
2105 debug!(field, "ignoring null upstream delta text field");
2106 Ok(None)
2107 }
2108 Some(Value::String(content)) if content.is_empty() => {
2109 debug!(field, "ignoring empty upstream delta text field");
2110 Ok(None)
2111 }
2112 Some(Value::String(content)) => Ok(Some(content.as_str())),
2113 Some(_) => Err(ChatStreamError::malformed_event(format!(
2114 "upstream delta.{field} must be a string or null"
2115 ))),
2116 None => Ok(None),
2117 }
2118}
2119
2120fn string_field<'a>(value: &'a Value, field: &str) -> Option<&'a str> {
2122 value.get(field).and_then(Value::as_str)
2123}
2124
2125fn integer_field(value: &Value, field: &str) -> Option<i64> {
2127 value.get(field).and_then(Value::as_i64)
2128}
2129
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before the epoch. If
/// the seconds count ever exceeds `i64::MAX`, it saturates at `i64::MAX`; the
/// previous `as i64` cast would have silently wrapped to a negative value.
fn unix_timestamp_now() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| i64::try_from(duration.as_secs()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
2137
2138async fn method_not_allowed(method: Method, uri: Uri) -> ProxyError {
2140 ProxyError::MethodNotAllowed { method, uri }
2141}
2142
2143async fn not_found(uri: Uri) -> ProxyError {
2145 ProxyError::NotFound { uri }
2146}
2147
2148#[derive(Debug, Error)]
2150pub enum ChatStreamError {
2151 #[error("Venice upstream stream failed: {message}")]
2152 UpstreamStream { message: String },
2153 #[error("Venice upstream stream emitted an error event: {message}")]
2154 UpstreamEvent { message: String },
2155 #[error("Venice upstream stream event is malformed: {message}")]
2156 MalformedEvent { message: String },
2157 #[error("streamed tool call failed validation: {validation_error}")]
2158 InvalidToolCall {
2159 validation_error: String,
2160 invalid_output: String,
2161 },
2162 #[error("failed to decrypt Venice E2EE response chunk: {source}")]
2163 Decryption { source: E2eeCodecError },
2164}
2165
2166impl ChatStreamError {
2167 fn upstream_stream(source: reqwest::Error) -> Self {
2169 Self::UpstreamStream {
2170 message: source.to_string(),
2171 }
2172 }
2173
2174 fn upstream_event(message: impl Into<String>) -> Self {
2176 Self::UpstreamEvent {
2177 message: message.into(),
2178 }
2179 }
2180
2181 fn malformed_event(message: impl Into<String>) -> Self {
2183 Self::MalformedEvent {
2184 message: message.into(),
2185 }
2186 }
2187
2188 fn invalid_utf8(source: std::str::Utf8Error) -> Self {
2190 Self::MalformedEvent {
2191 message: format!("upstream SSE bytes are not valid UTF-8: {source}"),
2192 }
2193 }
2194
2195 fn invalid_tool_call(
2197 validation_error: impl Into<String>,
2198 invalid_output: impl Into<String>,
2199 ) -> Self {
2200 Self::InvalidToolCall {
2201 validation_error: validation_error.into(),
2202 invalid_output: invalid_output.into(),
2203 }
2204 }
2205
2206 fn json_event(source: serde_json::Error) -> Self {
2208 Self::MalformedEvent {
2209 message: format!("upstream SSE data is not valid JSON: {source}"),
2210 }
2211 }
2212
2213 fn decryption(source: E2eeCodecError) -> Self {
2215 Self::Decryption { source }
2216 }
2217
2218 fn api_error_type(&self) -> &'static str {
2220 match self {
2221 Self::UpstreamStream { .. }
2222 | Self::UpstreamEvent { .. }
2223 | Self::MalformedEvent { .. }
2224 | Self::InvalidToolCall { .. } => "proxy_upstream_error",
2225 Self::Decryption { .. } => "proxy_e2ee_error",
2226 }
2227 }
2228
2229 fn api_error_code(&self) -> &'static str {
2231 match self {
2232 Self::UpstreamStream { .. } => "upstream_stream_error",
2233 Self::UpstreamEvent { .. } => "upstream_stream_error",
2234 Self::MalformedEvent { .. } => "upstream_malformed_response",
2235 Self::InvalidToolCall { .. } => "tool_call_validation_failed",
2236 Self::Decryption { .. } => "e2ee_response_decryption_failed",
2237 }
2238 }
2239}
2240
/// Top-level error type for the proxy's HTTP handlers.
///
/// The `IntoResponse` impl renders every variant as a JSON error body whose HTTP status,
/// `type`, and `code` come from `ProxyError::status`, `ProxyError::error_type`, and
/// `ProxyError::code`; the code is also mirrored into the `X-Venice-Proxy-Error-Code` header.
#[derive(Debug, Error)]
pub enum ProxyError {
    /// Error from the Venice client; mapped to 502, with type/code delegated to the inner error.
    #[error(transparent)]
    Venice(#[from] VeniceClientError),
    /// Attestation error; 503 when the verifier reports itself unavailable, otherwise 502.
    #[error(transparent)]
    Attestation(#[from] AttestationError),
    /// Session error; a missing session identifier or invalid session header is a client
    /// error (400), any other session failure is 500.
    #[error(transparent)]
    Session(#[from] SessionError),
    /// The incoming chat request was rejected; always 400 / `invalid_request_error`.
    #[error(transparent)]
    ChatRequest(#[from] ChatRequestError),
    /// Failure constructing the chat request; mapped to 502 with type `proxy_e2ee_error`.
    #[error(transparent)]
    ChatConstruction(#[from] ChatConstructionError),
    /// Failure while relaying the upstream stream; mapped to 502.
    #[error(transparent)]
    ChatStream(#[from] ChatStreamError),
    /// The model kept producing invalid tool calls after correction attempts (502).
    /// `max_retries` and `last_validation_error` are returned under `error.details`.
    #[error("The model failed to produce a valid tool call after correction attempts.")]
    ToolCallRetryExhausted {
        max_retries: u32,
        last_validation_error: String,
    },
    /// E2EE chat was requested but no proxy instance key was generated at startup (500).
    #[error(
        "proxy instance key is unavailable; keys.generate_proxy_instance_key_on_startup must be enabled for E2EE chat requests"
    )]
    ProxyInstanceKeyUnavailable,
    /// The session lacks an attested model public key after verification (500).
    #[error("session does not contain an attested model public key after attestation verification")]
    MissingAttestedModelKey,
    /// Returned by the `method_not_allowed` handler (405).
    #[error("method {method} is not supported for {uri}")]
    MethodNotAllowed { method: Method, uri: Uri },
    /// Returned by the `not_found` handler (404).
    #[error("route {uri} was not found")]
    NotFound { uri: Uri },
}
2272
2273impl ProxyError {
2274 fn status(&self) -> StatusCode {
2276 match self {
2277 Self::Venice(_) => StatusCode::BAD_GATEWAY,
2278 Self::Attestation(error) if error.verifier_unavailable() => {
2279 StatusCode::SERVICE_UNAVAILABLE
2280 }
2281 Self::Attestation(_) => StatusCode::BAD_GATEWAY,
2282 Self::Session(
2283 SessionError::MissingSessionIdentifier | SessionError::InvalidHeaderValue { .. },
2284 ) => StatusCode::BAD_REQUEST,
2285 Self::Session(_) => StatusCode::INTERNAL_SERVER_ERROR,
2286 Self::ChatRequest(_) => StatusCode::BAD_REQUEST,
2287 Self::ChatConstruction(_)
2288 | Self::ChatStream(_)
2289 | Self::ToolCallRetryExhausted { .. } => StatusCode::BAD_GATEWAY,
2290 Self::ProxyInstanceKeyUnavailable | Self::MissingAttestedModelKey => {
2291 StatusCode::INTERNAL_SERVER_ERROR
2292 }
2293 Self::MethodNotAllowed { .. } => StatusCode::METHOD_NOT_ALLOWED,
2294 Self::NotFound { .. } => StatusCode::NOT_FOUND,
2295 }
2296 }
2297
2298 fn error_type(&self) -> &'static str {
2300 match self {
2301 Self::Venice(error) => error.api_error_type(),
2302 Self::Attestation(error) => error.api_error_type(),
2303 Self::Session(
2304 SessionError::MissingSessionIdentifier | SessionError::InvalidHeaderValue { .. },
2305 ) => "invalid_request_error",
2306 Self::Session(_) => "proxy_session_error",
2307 Self::ChatRequest(_) => "invalid_request_error",
2308 Self::ChatConstruction(_) => "proxy_e2ee_error",
2309 Self::ChatStream(error) => error.api_error_type(),
2310 Self::ToolCallRetryExhausted { .. } => "proxy_tool_call_error",
2311 Self::ProxyInstanceKeyUnavailable => "proxy_configuration_error",
2312 Self::MissingAttestedModelKey => "proxy_attestation_error",
2313 Self::MethodNotAllowed { .. } | Self::NotFound { .. } => "invalid_request_error",
2314 }
2315 }
2316
2317 fn code(&self) -> &'static str {
2319 match self {
2320 Self::Venice(error) => error.api_error_code(),
2321 Self::Attestation(error) => error.api_error_code(),
2322 Self::Session(SessionError::MissingSessionIdentifier) => "session_identifier_missing",
2323 Self::Session(SessionError::InvalidHeaderValue { .. }) => "invalid_session_header",
2324 Self::Session(_) => "session_error",
2325 Self::ChatRequest(error) => error.api_error_code(),
2326 Self::ChatConstruction(error) => error.api_error_code(),
2327 Self::ChatStream(error) => error.api_error_code(),
2328 Self::ToolCallRetryExhausted { .. } => "invalid_tool_call",
2329 Self::ProxyInstanceKeyUnavailable => "proxy_instance_key_unavailable",
2330 Self::MissingAttestedModelKey => "attestation_failed",
2331 Self::MethodNotAllowed { .. } => "method_not_allowed",
2332 Self::NotFound { .. } => "not_found",
2333 }
2334 }
2335}
2336
2337impl IntoResponse for ProxyError {
2338 fn into_response(self) -> Response {
2340 let status = self.status();
2341 let error_code = self.code();
2342 let error_type = self.error_type();
2343
2344 if status.is_server_error() {
2345 error!(
2346 status = status.as_u16(),
2347 error_code,
2348 error_type,
2349 error = %self,
2350 "proxy request failed"
2351 );
2352 } else {
2353 warn!(
2354 status = status.as_u16(),
2355 error_code,
2356 error_type,
2357 error = %self,
2358 "proxy request rejected"
2359 );
2360 }
2361
2362 let mut response = if let Self::ToolCallRetryExhausted {
2363 max_retries,
2364 last_validation_error,
2365 } = &self
2366 {
2367 let body = json!({
2368 "error": {
2369 "message": self.to_string(),
2370 "type": error_type,
2371 "code": error_code,
2372 "details": {
2373 "max_retries": max_retries,
2374 "last_validation_error": last_validation_error,
2375 },
2376 }
2377 });
2378 (status, Json(body)).into_response()
2379 } else {
2380 let body = ErrorResponse::new(self.to_string(), error_type, error_code);
2381 (status, Json(body)).into_response()
2382 };
2383
2384 apply_error_headers(response.headers_mut(), error_code);
2385 response
2386 }
2387}
2388
/// Proxy metadata emitted as `X-Venice-Proxy-*` response headers.
///
/// Every field is optional; `ProxyMetadataHeaders::apply` writes only the fields that are
/// `Some`, rendering booleans as `true`/`false` and `tool_retries` as a decimal number.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProxyMetadataHeaders {
    /// `X-Venice-Proxy-E2EE` (`for_verified_chat` sets `verified`).
    pub e2ee: Option<String>,
    /// `X-Venice-Proxy-Attestation-Mode`: the configured attestation mode.
    pub attestation_mode: Option<String>,
    /// `X-Venice-Proxy-Attested-Model`: the session's model id.
    pub attested_model: Option<String>,
    /// `X-Venice-Proxy-TEE-Provider` (`for_verified_chat` falls back to `unknown`).
    pub tee_provider: Option<String>,
    /// `X-Venice-Proxy-TDX-Verified` (`for_verified_chat` sets `true` only when config
    /// requires TDX, otherwise leaves it unset).
    pub tdx_verified: Option<bool>,
    /// `X-Venice-Proxy-TDX-Debug`, copied from the session's attestation result.
    pub tdx_debug: Option<bool>,
    /// `X-Venice-Proxy-NVIDIA-Verified` (`for_verified_chat` falls back to `not-present`).
    pub nvidia_verified: Option<String>,
    /// `X-Venice-Proxy-Key-Binding`.
    pub key_binding: Option<bool>,
    /// `X-Venice-Proxy-Session-Id`: the agent session id.
    pub session_id: Option<String>,
    /// `X-Venice-Proxy-Session-Scope`.
    pub session_scope: Option<String>,
    /// `X-Venice-Proxy-Tool-Mode`: the configured tool mode.
    pub tool_mode: Option<String>,
    /// `X-Venice-Proxy-Tool-Retries` (presumably the number of tool-call correction
    /// retries used — confirm at the call site that sets it).
    pub tool_retries: Option<u32>,
}
2408
2409impl ProxyMetadataHeaders {
2410 pub fn from_config(config: &ProxyConfig) -> Self {
2413 Self {
2414 attestation_mode: Some(config.attestation.mode.as_str().to_owned()),
2415 tool_mode: Some(config.tools.mode.as_str().to_owned()),
2416 ..Self::default()
2417 }
2418 }
2419
2420 pub fn for_verified_chat(config: &ProxyConfig, session: &SessionContext) -> Self {
2422 let tee_provider = session
2423 .attestation_tee_provider
2424 .clone()
2425 .unwrap_or_else(|| "unknown".to_owned());
2426 let tdx_debug = session.attestation_tdx_debug;
2427 let nvidia_verified = session
2428 .attestation_nvidia_verified
2429 .clone()
2430 .unwrap_or_else(|| "not-present".to_owned());
2431
2432 Self {
2433 e2ee: Some("verified".to_owned()),
2434 attestation_mode: Some(config.attestation.mode.as_str().to_owned()),
2435 attested_model: Some(session.model_id.clone()),
2436 tee_provider: Some(tee_provider),
2437 tdx_verified: config.attestation.require_tdx.then_some(true),
2438 tdx_debug,
2439 nvidia_verified: Some(nvidia_verified),
2440 key_binding: Some(true),
2441 session_id: Some(session.agent_session_id.clone()),
2442 session_scope: Some(session.scope.as_str().to_owned()),
2443 tool_mode: Some(config.tools.mode.as_str().to_owned()),
2444 tool_retries: None,
2445 }
2446 }
2447
2448 pub fn apply(&self, headers: &mut HeaderMap) {
2450 insert_optional_header(headers, HEADER_PROXY_E2EE, self.e2ee.as_deref());
2451 insert_optional_header(
2452 headers,
2453 HEADER_PROXY_ATTESTATION_MODE,
2454 self.attestation_mode.as_deref(),
2455 );
2456 insert_optional_header(
2457 headers,
2458 HEADER_PROXY_ATTESTED_MODEL,
2459 self.attested_model.as_deref(),
2460 );
2461 insert_optional_header(
2462 headers,
2463 HEADER_PROXY_TEE_PROVIDER,
2464 self.tee_provider.as_deref(),
2465 );
2466 insert_optional_bool_header(headers, HEADER_PROXY_TDX_VERIFIED, self.tdx_verified);
2467 insert_optional_bool_header(headers, HEADER_PROXY_TDX_DEBUG, self.tdx_debug);
2468 insert_optional_header(
2469 headers,
2470 HEADER_PROXY_NVIDIA_VERIFIED,
2471 self.nvidia_verified.as_deref(),
2472 );
2473 insert_optional_bool_header(headers, HEADER_PROXY_KEY_BINDING, self.key_binding);
2474 insert_optional_header(headers, HEADER_PROXY_SESSION_ID, self.session_id.as_deref());
2475 insert_optional_header(
2476 headers,
2477 HEADER_PROXY_SESSION_SCOPE,
2478 self.session_scope.as_deref(),
2479 );
2480 insert_optional_header(headers, HEADER_PROXY_TOOL_MODE, self.tool_mode.as_deref());
2481 if let Some(tool_retries) = self.tool_retries {
2482 insert_header(
2483 headers,
2484 HEADER_PROXY_TOOL_RETRIES,
2485 &tool_retries.to_string(),
2486 );
2487 }
2488 }
2489}
2490
2491pub fn apply_error_headers(headers: &mut HeaderMap, error_code: &str) {
2493 insert_header(headers, HEADER_PROXY_ERROR_CODE, error_code);
2494}
2495
2496fn insert_optional_header(headers: &mut HeaderMap, name: &'static str, value: Option<&str>) {
2498 if let Some(value) = value {
2499 insert_header(headers, name, value);
2500 }
2501}
2502
2503fn insert_optional_bool_header(headers: &mut HeaderMap, name: &'static str, value: Option<bool>) {
2505 if let Some(value) = value {
2506 insert_header(headers, name, if value { "true" } else { "false" });
2507 }
2508}
2509
2510fn insert_header(headers: &mut HeaderMap, name: &'static str, value: &str) {
2512 let Ok(name) = HeaderName::from_bytes(name.as_bytes()) else {
2513 return;
2514 };
2515 let Ok(value) = HeaderValue::from_str(value) else {
2516 return;
2517 };
2518 headers.insert(name, value);
2519}
2520
2521#[cfg(test)]
2522mod tests {
2523 use super::*;
2524 use std::{
2525 collections::{HashMap, VecDeque},
2526 sync::{Arc, Mutex},
2527 time::Duration,
2528 };
2529
2530 use axum::{
2531 body::Body,
2532 extract::Query,
2533 http::Request,
2534 routing::{get, post},
2535 };
2536 use serde_json::json;
2537
2538 use crate::config::NvidiaRequirement;
2539 use tower::ServiceExt;
2540
2541 fn test_app() -> Router {
2542 router_with_venice_client(ProxyConfig::default(), test_venice_client())
2543 }
2544
2545 fn test_venice_client() -> VeniceClient {
2546 test_venice_client_for_base_url("http://127.0.0.1:1/api/v1")
2547 }
2548
2549 fn test_venice_client_for_base_url(base_url: impl AsRef<str>) -> VeniceClient {
2550 VeniceClient::new(base_url.as_ref(), "test-api-key", Duration::from_secs(1))
2551 .expect("test Venice client should build")
2552 }
2553
2554 fn chat_config_with_basic_test_attestation() -> ProxyConfig {
2555 let mut config = ProxyConfig::default();
2556 config.attestation.require_tdx = false;
2557 config.attestation.require_nvidia = NvidiaRequirement::Never;
2558 config
2559 }
2560
2561 #[test]
2562 fn app_state_initializes_key_and_session_managers_from_config() {
2563 let state = AppState::from_parts(ProxyConfig::default(), test_venice_client());
2564
2565 let key = state
2566 .proxy_instance_key()
2567 .expect("default config should generate startup key");
2568 assert_eq!(key.public_key_hex().len(), 130);
2569 assert!(state.session_manager().is_empty());
2570 assert_eq!(
2571 state.attestation_verifier().policy(),
2572 &ProxyConfig::default().attestation
2573 );
2574
2575 let mut config = ProxyConfig::default();
2576 config.keys.generate_proxy_instance_key_on_startup = false;
2577 let state = AppState::from_parts(config, test_venice_client());
2578 assert!(state.proxy_instance_key().is_none());
2579 }
2580
2581 async fn error_body(response: Response) -> ErrorResponse {
2582 let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
2583 .await
2584 .expect("response body should buffer");
2585 serde_json::from_slice(&bytes).expect("response should be OpenAI-style error JSON")
2586 }
2587
2588 #[tokio::test]
2589 async fn chat_route_ignores_upstream_role_only_chunk_before_encrypted_content() {
2590 let response = streaming_chat_response(
2591 "chat-route-role-only",
2592 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
2593 vec![
2594 MockStreamFrame::Role,
2595 MockStreamFrame::Text("Hello"),
2596 MockStreamFrame::Finish("stop"),
2597 MockStreamFrame::Done,
2598 ],
2599 )
2600 .await;
2601
2602 assert_eq!(response.status(), StatusCode::OK);
2603 let body = response_body(response).await;
2604 let data = sse_data(&body);
2605 assert_eq!(data.len(), 3);
2606 let first: Value = serde_json::from_str(data[0]).expect("first chunk should be JSON");
2607 assert_eq!(first["choices"][0]["delta"]["role"], "assistant");
2608 assert_eq!(first["choices"][0]["delta"]["content"], "Hello");
2609 assert_eq!(data[2], "[DONE]");
2610 }
2611
2612 #[tokio::test]
2613 async fn chat_route_streams_decrypted_normal_assistant_text() {
2614 let response = streaming_chat_response(
2615 "chat-route-test",
2616 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
2617 vec![
2618 MockStreamFrame::NullContent,
2619 MockStreamFrame::EmptyContent,
2620 MockStreamFrame::Text("Hello"),
2621 MockStreamFrame::Finish("stop"),
2622 MockStreamFrame::Done,
2623 ],
2624 )
2625 .await;
2626
2627 assert_eq!(response.status(), StatusCode::OK);
2628 assert_eq!(
2629 response.headers().get(HEADER_PROXY_E2EE).unwrap(),
2630 "verified"
2631 );
2632 assert_eq!(
2633 response.headers().get(HEADER_PROXY_ATTESTED_MODEL).unwrap(),
2634 "e2ee-test"
2635 );
2636
2637 let body = response_body(response).await;
2638 let data = sse_data(&body);
2639 assert_eq!(data.len(), 3);
2640
2641 let first: Value = serde_json::from_str(data[0]).expect("first chunk should be JSON");
2642 assert_eq!(first["object"], "chat.completion.chunk");
2643 assert_eq!(first["model"], "e2ee-test");
2644 assert_eq!(first["choices"][0]["delta"]["role"], "assistant");
2645 assert_eq!(first["choices"][0]["delta"]["content"], "Hello");
2646 assert!(first["choices"][0]["finish_reason"].is_null());
2647
2648 let final_chunk: Value = serde_json::from_str(data[1]).expect("final chunk should be JSON");
2649 assert_eq!(final_chunk["choices"][0]["delta"], json!({}));
2650 assert_eq!(final_chunk["choices"][0]["finish_reason"], "stop");
2651 assert_eq!(data[2], "[DONE]");
2652 }
2653
2654 #[tokio::test]
2655 async fn chat_route_streams_decrypted_reasoning_content() {
2656 let response = streaming_chat_response(
2657 "chat-route-reasoning-stream",
2658 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true,"reasoning":{"effort":"high"}}"#,
2659 vec![
2660 MockStreamFrame::Reasoning("Thinking"),
2661 MockStreamFrame::Text("Answer"),
2662 MockStreamFrame::Finish("stop"),
2663 MockStreamFrame::Done,
2664 ],
2665 )
2666 .await;
2667
2668 assert_eq!(response.status(), StatusCode::OK);
2669 let body = response_body(response).await;
2670 let data = sse_data(&body);
2671 assert_eq!(data.len(), 4);
2672 let reasoning: Value =
2673 serde_json::from_str(data[0]).expect("reasoning chunk should be JSON");
2674 let answer: Value = serde_json::from_str(data[1]).expect("answer chunk should be JSON");
2675
2676 assert_eq!(reasoning["choices"][0]["delta"]["role"], "assistant");
2677 assert_eq!(
2678 reasoning["choices"][0]["delta"]["reasoning_content"],
2679 "Thinking"
2680 );
2681 assert!(answer["choices"][0]["delta"].get("role").is_none());
2682 assert_eq!(answer["choices"][0]["delta"]["content"], "Answer");
2683 assert_eq!(data.last().copied(), Some("[DONE]"));
2684 }
2685
2686 #[tokio::test]
2687 async fn chat_route_streams_multiple_decrypted_content_chunks() {
2688 let response = streaming_chat_response(
2689 "chat-route-multiple-chunks",
2690 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
2691 vec![
2692 MockStreamFrame::Text("Hello"),
2693 MockStreamFrame::Text(" world"),
2694 MockStreamFrame::Finish("stop"),
2695 MockStreamFrame::Done,
2696 ],
2697 )
2698 .await;
2699
2700 assert_eq!(response.status(), StatusCode::OK);
2701 let body = response_body(response).await;
2702 let data = sse_data(&body);
2703 let first: Value = serde_json::from_str(data[0]).expect("first chunk should be JSON");
2704 let second: Value = serde_json::from_str(data[1]).expect("second chunk should be JSON");
2705
2706 assert_eq!(first["choices"][0]["delta"]["role"], "assistant");
2707 assert_eq!(first["choices"][0]["delta"]["content"], "Hello");
2708 assert!(second["choices"][0]["delta"].get("role").is_none());
2709 assert_eq!(second["choices"][0]["delta"]["content"], " world");
2710 assert_eq!(data.last().copied(), Some("[DONE]"));
2711 }
2712
2713 #[tokio::test]
2714 async fn chat_route_passes_through_usage_chunk_when_requested_and_upstream_provides_it() {
2715 let response = streaming_chat_response(
2716 "chat-route-usage",
2717 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true,"stream_options":{"include_usage":true}}"#,
2718 vec![
2719 MockStreamFrame::Text("Hello"),
2720 MockStreamFrame::Finish("stop"),
2721 MockStreamFrame::Usage,
2722 MockStreamFrame::Done,
2723 ],
2724 )
2725 .await;
2726
2727 assert_eq!(response.status(), StatusCode::OK);
2728 let body = response_body(response).await;
2729 let data = sse_data(&body);
2730 assert_eq!(data.len(), 4);
2731 let usage_chunk: Value = serde_json::from_str(data[2]).expect("usage chunk should be JSON");
2732 assert_eq!(usage_chunk["choices"], json!([]));
2733 assert_eq!(usage_chunk["usage"]["total_tokens"], 3);
2734 assert_eq!(data[3], "[DONE]");
2735 }
2736
2737 #[tokio::test]
2738 async fn chat_route_returns_buffered_non_streaming_completion() {
2739 let response = chat_response(
2740 "chat-route-non-streaming-success",
2741 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
2742 vec![
2743 MockStreamFrame::NullContent,
2744 MockStreamFrame::EmptyContent,
2745 MockStreamFrame::Text("Hello"),
2746 MockStreamFrame::Text(" world"),
2747 MockStreamFrame::Finish("stop"),
2748 MockStreamFrame::Done,
2749 ],
2750 )
2751 .await;
2752
2753 assert_eq!(response.status(), StatusCode::OK);
2754 assert_eq!(
2755 response.headers().get(HEADER_PROXY_E2EE).unwrap(),
2756 "verified"
2757 );
2758 let body = json_body(response).await;
2759 assert_eq!(body["object"], "chat.completion");
2760 assert_eq!(body["id"], "chatcmpl-upstream-test");
2761 assert_eq!(body["created"], 1_717_171_717);
2762 assert_eq!(body["model"], "e2ee-test");
2763 assert_eq!(body["choices"][0]["index"], 0);
2764 assert_eq!(body["choices"][0]["message"]["role"], "assistant");
2765 assert_eq!(body["choices"][0]["message"]["content"], "Hello world");
2766 assert_eq!(body["choices"][0]["finish_reason"], "stop");
2767 assert!(body["usage"].is_null());
2768 }
2769
2770 #[tokio::test]
2771 async fn chat_route_returns_buffered_reasoning_content() {
2772 let response = chat_response(
2773 "chat-route-reasoning-non-streaming",
2774 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false,"reasoning_effort":"medium"}"#,
2775 vec![
2776 MockStreamFrame::Reasoning("Think "),
2777 MockStreamFrame::Reasoning("first."),
2778 MockStreamFrame::Text("Answer"),
2779 MockStreamFrame::Finish("stop"),
2780 MockStreamFrame::Done,
2781 ],
2782 )
2783 .await;
2784
2785 assert_eq!(response.status(), StatusCode::OK);
2786 let body = json_body(response).await;
2787 assert_eq!(
2788 body["choices"][0]["message"]["reasoning_content"],
2789 "Think first."
2790 );
2791 assert_eq!(body["choices"][0]["message"]["content"], "Answer");
2792 }
2793
2794 #[tokio::test]
2795 async fn chat_route_treats_omitted_stream_as_buffered_non_streaming() {
2796 let response = chat_response(
2797 "chat-route-omitted-stream",
2798 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}]}"#,
2799 vec![MockStreamFrame::Text("Hello"), MockStreamFrame::Done],
2800 )
2801 .await;
2802
2803 assert_eq!(response.status(), StatusCode::OK);
2804 let body = json_body(response).await;
2805 assert_eq!(body["object"], "chat.completion");
2806 assert_eq!(body["choices"][0]["message"]["content"], "Hello");
2807 assert_eq!(body["choices"][0]["finish_reason"], "stop");
2808 }
2809
2810 #[tokio::test]
2811 async fn chat_route_streams_incremental_tool_call_chunks() {
2812 let response = streaming_chat_response(
2813 "chat-route-tool-stream",
2814 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
2815 vec![
2816 MockStreamFrame::Text("<tool_call>\n{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}\n</tool_call>"),
2817 MockStreamFrame::Finish("stop"),
2818 MockStreamFrame::Done,
2819 ],
2820 )
2821 .await;
2822
2823 assert_eq!(response.status(), StatusCode::OK);
2824 let body = response_body(response).await;
2825 let chunks = sse_json_chunks(&body);
2826
2827 assert_eq!(chunks[0]["choices"][0]["delta"]["role"], "assistant");
2828
2829 let tool_calls = streamed_tool_call_deltas(&chunks);
2830 assert!(!tool_calls.is_empty());
2831 let first = tool_calls[0];
2832 assert_eq!(first["index"], 0);
2833 assert!(first["id"].as_str().unwrap().starts_with("call_"));
2834 assert_eq!(first["type"], "function");
2835 assert_eq!(first["function"]["name"], "search_web");
2836 for later in &tool_calls[1..] {
2837 assert!(later.get("id").is_none());
2838 assert!(later.get("type").is_none());
2839 assert!(later["function"].get("name").is_none());
2840 }
2841 assert_eq!(
2842 streamed_tool_call_arguments(&chunks, 0),
2843 r#"{"query":"example"}"#
2844 );
2845
2846 let final_chunk = chunks.last().expect("stream should have chunks");
2847 assert_eq!(final_chunk["choices"][0]["delta"], json!({}));
2848 assert_eq!(final_chunk["choices"][0]["finish_reason"], "tool_calls");
2849 }
2850
2851 #[tokio::test]
2852 async fn chat_route_streams_text_then_incremental_tool_call() {
2853 let response = streaming_chat_response(
2854 "chat-route-tool-stream-mixed-text",
2855 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
2856 vec![
2857 MockStreamFrame::NullContent,
2858 MockStreamFrame::EmptyContent,
2859 MockStreamFrame::Text("I'll check that. "),
2860 MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}"),
2861 MockStreamFrame::Text("</tool_call>"),
2862 MockStreamFrame::Finish("stop"),
2863 MockStreamFrame::Done,
2864 ],
2865 )
2866 .await;
2867
2868 assert_eq!(response.status(), StatusCode::OK);
2869 let body = response_body(response).await;
2870 let chunks = sse_json_chunks(&body);
2871
2872 assert_eq!(chunks[0]["choices"][0]["delta"]["role"], "assistant");
2873 assert_eq!(streamed_content(&chunks), "I'll check that. ");
2874
2875 let tool_calls = streamed_tool_call_deltas(&chunks);
2876 assert!(!tool_calls.is_empty());
2877 assert_eq!(tool_calls[0]["function"]["name"], "search_web");
2878 assert_eq!(
2879 streamed_tool_call_arguments(&chunks, 0),
2880 r#"{"query":"example"}"#
2881 );
2882
2883 let final_chunk = chunks.last().expect("stream should have chunks");
2884 assert_eq!(final_chunk["choices"][0]["finish_reason"], "tool_calls");
2885 }
2886
2887 #[tokio::test]
2888 async fn chat_route_fails_closed_on_unterminated_streamed_tool_call() {
2889 let response = streaming_chat_response(
2892 "chat-route-tool-stream-missing-close",
2893 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
2894 vec![
2895 MockStreamFrame::Text("I'll check that. "),
2896 MockStreamFrame::Text("<tool_call>{\"name\":"),
2897 MockStreamFrame::Finish("stop"),
2898 MockStreamFrame::Done,
2899 ],
2900 )
2901 .await;
2902
2903 assert_stream_body_fails(response).await;
2904 }
2905
2906 #[tokio::test]
2907 async fn chat_route_streams_hermes_format_tool_call_from_glm_model() {
2908 let response = streaming_chat_response(
2911 "chat-route-tool-stream-glm-hermes",
2912 r#"{"model":"e2ee-glm-5-1","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
2913 vec![
2914 MockStreamFrame::Text("<tool_call>\n{\"name\":\"search_web\",\"arguments\":"),
2915 MockStreamFrame::Text("{\"query\":\"example\"}}\n</tool_call>"),
2916 MockStreamFrame::Finish("stop"),
2917 MockStreamFrame::Done,
2918 ],
2919 )
2920 .await;
2921
2922 assert_eq!(response.status(), StatusCode::OK);
2923 let body = response_body(response).await;
2924 let chunks = sse_json_chunks(&body);
2925
2926 let tool_calls = streamed_tool_call_deltas(&chunks);
2927 assert!(!tool_calls.is_empty());
2928 assert_eq!(tool_calls[0]["function"]["name"], "search_web");
2929 assert_eq!(
2930 streamed_tool_call_arguments(&chunks, 0),
2931 r#"{"query":"example"}"#
2932 );
2933
2934 let final_chunk = chunks.last().expect("stream should have chunks");
2935 assert_eq!(final_chunk["choices"][0]["finish_reason"], "tool_calls");
2936 }
2937
2938 #[tokio::test]
2939 async fn chat_route_recovers_streamed_tool_call_with_truncated_closing_marker() {
2940 let response = streaming_chat_response(
2943 "chat-route-tool-stream-truncated-close",
2944 r#"{"model":"e2ee-glm-4-7-flash-p","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
2945 vec![
2946 MockStreamFrame::Text("<tool_call>\n{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}\n"),
2947 MockStreamFrame::Finish("stop"),
2948 MockStreamFrame::Done,
2949 ],
2950 )
2951 .await;
2952
2953 assert_eq!(response.status(), StatusCode::OK);
2954 let body = response_body(response).await;
2955 let chunks = sse_json_chunks(&body);
2956
2957 let tool_calls = streamed_tool_call_deltas(&chunks);
2958 assert!(!tool_calls.is_empty());
2959 assert_eq!(tool_calls[0]["function"]["name"], "search_web");
2960 assert_eq!(
2961 streamed_tool_call_arguments(&chunks, 0),
2962 r#"{"query":"example"}"#
2963 );
2964
2965 let final_chunk = chunks.last().expect("stream should have chunks");
2966 assert_eq!(final_chunk["choices"][0]["finish_reason"], "tool_calls");
2967 }
2968
2969 #[tokio::test]
2970 async fn chat_route_streams_multiple_tool_calls_split_across_chunks() {
2971 let response = streaming_chat_response(
2972 "chat-route-tool-stream-multiple-calls",
2973 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
2974 vec![
2975 MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"first\"}}"),
2976 MockStreamFrame::Text("</tool_call><tool_call>{\"name\":\"search_web\",\"arguments\":"),
2977 MockStreamFrame::Text("{\"query\":\"second\"}}</tool_call>"),
2978 MockStreamFrame::Finish("stop"),
2979 MockStreamFrame::Done,
2980 ],
2981 )
2982 .await;
2983
2984 assert_eq!(response.status(), StatusCode::OK);
2985 let body = response_body(response).await;
2986 let chunks = sse_json_chunks(&body);
2987
2988 assert_eq!(chunks[0]["choices"][0]["delta"]["role"], "assistant");
2989 let tool_calls = streamed_tool_call_deltas(&chunks);
2990 let first = tool_calls
2991 .iter()
2992 .find(|tool_call| tool_call["index"] == 0 && tool_call.get("id").is_some())
2993 .expect("first call should have an id-bearing fragment");
2994 let second = tool_calls
2995 .iter()
2996 .find(|tool_call| tool_call["index"] == 1 && tool_call.get("id").is_some())
2997 .expect("second call should have an id-bearing fragment");
2998 assert_eq!(first["function"]["name"], "search_web");
2999 assert_eq!(second["function"]["name"], "search_web");
3000 assert_ne!(first["id"], second["id"]);
3001 assert_eq!(
3002 streamed_tool_call_arguments(&chunks, 0),
3003 r#"{"query":"first"}"#
3004 );
3005 assert_eq!(
3006 streamed_tool_call_arguments(&chunks, 1),
3007 r#"{"query":"second"}"#
3008 );
3009
3010 let final_chunk = chunks.last().expect("stream should have chunks");
3011 assert_eq!(final_chunk["choices"][0]["delta"], json!({}));
3012 assert_eq!(final_chunk["choices"][0]["finish_reason"], "tool_calls");
3013 }
3014
3015 #[tokio::test]
3016 async fn chat_route_tool_stream_passes_through_usage_chunk_when_requested() {
3017 let response = streaming_chat_response(
3018 "chat-route-tool-stream-usage",
3019 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"stream_options":{"include_usage":true},"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
3020 vec![
3021 MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}</tool_call>"),
3022 MockStreamFrame::Finish("stop"),
3023 MockStreamFrame::Usage,
3024 MockStreamFrame::Done,
3025 ],
3026 )
3027 .await;
3028
3029 assert_eq!(response.status(), StatusCode::OK);
3030 let body = response_body(response).await;
3031 let chunks = sse_json_chunks(&body);
3032
3033 let usage_chunk = chunks.last().expect("stream should have chunks");
3035 assert_eq!(usage_chunk["choices"], json!([]));
3036 assert_eq!(usage_chunk["usage"]["total_tokens"], 3);
3037 let finish_chunk = &chunks[chunks.len() - 2];
3038 assert_eq!(finish_chunk["choices"][0]["finish_reason"], "tool_calls");
3039 }
3040
3041 #[tokio::test]
3042 async fn chat_route_fails_closed_when_streamed_tool_call_exceeds_max_bytes() {
3043 let model_public_key = ProxyInstanceKey::generate().public_key_hex().to_owned();
3044 let base_url = spawn_streaming_venice_server(
3045 model_public_key,
3046 true,
3047 vec![
3048 MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"this argument body is much longer than the configured cap\"}}</tool_call>"),
3049 MockStreamFrame::Finish("stop"),
3050 MockStreamFrame::Done,
3051 ],
3052 )
3053 .await;
3054 let mut config = chat_config_with_basic_test_attestation();
3055 config.tools.tool_call_max_bytes = 16;
3056
3057 let response = request_chat_with_config(
3058 config,
3059 "chat-route-tool-stream-max-bytes",
3060 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
3061 base_url,
3062 )
3063 .await;
3064
3065 assert_stream_body_fails(response).await;
3066 }
3067
3068 #[tokio::test]
3069 async fn chat_route_streams_all_tool_calls_when_parallel_tool_calls_false() {
3070 let response = streaming_chat_response(
3073 "chat-route-tool-stream-parallel-false",
3074 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"parallel_tool_calls":false,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
3075 vec![
3076 MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"first\"}}</tool_call>"),
3077 MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"second\"}}</tool_call>"),
3078 MockStreamFrame::Finish("stop"),
3079 MockStreamFrame::Done,
3080 ],
3081 )
3082 .await;
3083
3084 assert_eq!(response.status(), StatusCode::OK);
3085 let body = response_body(response).await;
3086 let chunks = sse_json_chunks(&body);
3087
3088 assert_eq!(
3089 streamed_tool_call_arguments(&chunks, 0),
3090 r#"{"query":"first"}"#
3091 );
3092 assert_eq!(
3093 streamed_tool_call_arguments(&chunks, 1),
3094 r#"{"query":"second"}"#
3095 );
3096
3097 let final_chunk = chunks.last().expect("stream should have chunks");
3098 assert_eq!(final_chunk["choices"][0]["finish_reason"], "tool_calls");
3099 }
3100
3101 #[tokio::test]
3102 async fn chat_route_returns_non_streaming_tool_call_body_from_mixed_text() {
3103 let response = chat_response(
3104 "chat-route-tool-non-stream-mixed-text",
3105 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":false,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"]}}}]}"#,
3106 vec![
3107 MockStreamFrame::Text("I'll check that. <tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}</tool_call>"),
3108 MockStreamFrame::Done,
3109 ],
3110 )
3111 .await;
3112
3113 assert_eq!(response.status(), StatusCode::OK);
3114 let body = json_body(response).await;
3115 assert_eq!(body["choices"][0]["finish_reason"], "tool_calls");
3116 let tool_call = &body["choices"][0]["message"]["tool_calls"][0];
3117 assert_eq!(tool_call["function"]["name"], "search_web");
3118 assert_eq!(tool_call["function"]["arguments"], r#"{"query":"example"}"#);
3119 }
3120
3121 #[tokio::test]
3122 async fn chat_route_returns_non_streaming_tool_call_body() {
3123 let response = chat_response(
3124 "chat-route-tool-non-stream",
3125 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":false,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"]}}}]}"#,
3126 vec![
3127 MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}</tool_call>"),
3128 MockStreamFrame::Done,
3129 ],
3130 )
3131 .await;
3132
3133 assert_eq!(response.status(), StatusCode::OK);
3134 let body = json_body(response).await;
3135 assert_eq!(body["object"], "chat.completion");
3136 assert!(body["choices"][0]["message"]["content"].is_null());
3137 assert_eq!(body["choices"][0]["finish_reason"], "tool_calls");
3138 let tool_call = &body["choices"][0]["message"]["tool_calls"][0];
3139 assert!(tool_call["id"].as_str().unwrap().starts_with("call_"));
3140 assert_eq!(tool_call["type"], "function");
3141 assert_eq!(tool_call["function"]["name"], "search_web");
3142 assert_eq!(tool_call["function"]["arguments"], r#"{"query":"example"}"#);
3143 }
3144
3145 #[tokio::test]
3146 async fn chat_route_returns_non_streaming_multiple_tool_calls() {
3147 let response = chat_response(
3148 "chat-route-tool-non-stream-multiple-calls",
3149 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":false,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"]}}}]}"#,
3150 vec![
3151 MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"first\"}}</tool_call>\n<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"second\"}}</tool_call>"),
3152 MockStreamFrame::Done,
3153 ],
3154 )
3155 .await;
3156
3157 assert_eq!(response.status(), StatusCode::OK);
3158 let body = json_body(response).await;
3159 assert_eq!(body["choices"][0]["finish_reason"], "tool_calls");
3160 assert!(body["choices"][0]["message"]["content"].is_null());
3161 let tool_calls = body["choices"][0]["message"]["tool_calls"]
3162 .as_array()
3163 .expect("tool_calls should be an array");
3164 assert_eq!(tool_calls.len(), 2);
3165 assert_eq!(tool_calls[0]["function"]["name"], "search_web");
3166 assert_eq!(
3167 tool_calls[0]["function"]["arguments"],
3168 r#"{"query":"first"}"#
3169 );
3170 assert_eq!(
3171 tool_calls[1]["function"]["arguments"],
3172 r#"{"query":"second"}"#
3173 );
3174 assert_ne!(tool_calls[0]["id"], tool_calls[1]["id"]);
3175 }
3176
3177 #[tokio::test]
3178 async fn chat_route_tool_mode_leaves_normal_text_unaffected() {
3179 let response = streaming_chat_response(
3180 "chat-route-tool-normal-text",
3181 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object"}}}]}"#,
3182 vec![
3183 MockStreamFrame::Text("Hello without tools"),
3184 MockStreamFrame::Finish("stop"),
3185 MockStreamFrame::Done,
3186 ],
3187 )
3188 .await;
3189
3190 assert_eq!(response.status(), StatusCode::OK);
3191 let body = response_body(response).await;
3192 let chunks = sse_json_chunks(&body);
3193 assert_eq!(chunks[0]["choices"][0]["delta"]["role"], "assistant");
3194 assert_eq!(streamed_content(&chunks), "Hello without tools");
3195 assert!(streamed_tool_call_deltas(&chunks).is_empty());
3196 }
3197
3198 #[tokio::test]
3199 async fn chat_route_treats_marker_like_non_protocol_text_as_normal_text() {
3200 let response = streaming_chat_response(
3201 "chat-route-tool-marker-like-text",
3202 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object"}}}]}"#,
3203 vec![
3204 MockStreamFrame::Text("<tool_cal>{not actually a marker}"),
3205 MockStreamFrame::Finish("stop"),
3206 MockStreamFrame::Done,
3207 ],
3208 )
3209 .await;
3210
3211 assert_eq!(response.status(), StatusCode::OK);
3212 let body = response_body(response).await;
3213 let chunks = sse_json_chunks(&body);
3214 assert_eq!(
3215 streamed_content(&chunks),
3216 "<tool_cal>{not actually a marker}"
3217 );
3218 assert!(streamed_tool_call_deltas(&chunks).is_empty());
3219 }
3220
3221 #[tokio::test]
3222 async fn chat_route_retries_invalid_tool_call_and_returns_success() {
3223 let response = chat_response_sequence(
3224 "chat-route-tool-retry-success",
3225 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":false,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"]}}}]}"#,
3226 vec![
3227 vec![
3228 MockStreamFrame::Text("<tool_call>{\"name\":\"unknown\",\"arguments\":{\"query\":\"example\"}}</tool_call>"),
3229 MockStreamFrame::Done,
3230 ],
3231 vec![
3232 MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}</tool_call>"),
3233 MockStreamFrame::Done,
3234 ],
3235 ],
3236 )
3237 .await;
3238
3239 assert_eq!(response.status(), StatusCode::OK);
3240 assert_eq!(
3241 response.headers().get(HEADER_PROXY_TOOL_RETRIES).unwrap(),
3242 "1"
3243 );
3244 let body = json_body(response).await;
3245 assert_eq!(body["choices"][0]["finish_reason"], "tool_calls");
3246 assert_eq!(
3247 body["choices"][0]["message"]["tool_calls"][0]["function"]["name"],
3248 "search_web"
3249 );
3250 }
3251
3252 #[tokio::test]
3253 async fn chat_route_returns_retry_failure_error_shape() {
3254 let response = chat_response(
3255 "chat-route-tool-retry-failure",
3256 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":false,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"]}}}]}"#,
3257 vec![
3258 MockStreamFrame::Text("<tool_call>{\"name\":\"unknown\",\"arguments\":{}}</tool_call>"),
3259 MockStreamFrame::Done,
3260 ],
3261 )
3262 .await;
3263
3264 assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
3265 assert_eq!(
3266 response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3267 "invalid_tool_call"
3268 );
3269 let body = json_body(response).await;
3270 assert_eq!(body["error"]["type"], "proxy_tool_call_error");
3271 assert_eq!(body["error"]["code"], "invalid_tool_call");
3272 assert_eq!(body["error"]["details"]["max_retries"], 2);
3273 assert!(
3274 body["error"]["details"]["last_validation_error"]
3275 .as_str()
3276 .unwrap()
3277 .contains("unknown tool name")
3278 );
3279 }
3280
3281 #[tokio::test]
3282 async fn chat_route_non_streaming_fails_closed_on_upstream_error_response() {
3283 let response = chat_response_with_upstream_status(
3284 "chat-route-non-streaming-upstream-error",
3285 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
3286 StatusCode::INTERNAL_SERVER_ERROR,
3287 )
3288 .await;
3289
3290 assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
3291 assert_eq!(
3292 response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3293 "upstream_status_error"
3294 );
3295 let body = error_body(response).await;
3296 assert_eq!(body.error.kind, "proxy_upstream_error");
3297 assert_eq!(body.error.code, "upstream_status_error");
3298 }
3299
3300 #[tokio::test]
3301 async fn chat_route_non_streaming_fails_closed_on_malformed_upstream_payload() {
3302 let response = chat_response(
3303 "chat-route-non-streaming-malformed",
3304 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
3305 vec![MockStreamFrame::Raw("data: {\"choices\":\"bad\"}\n\n")],
3306 )
3307 .await;
3308
3309 assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
3310 assert_eq!(
3311 response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3312 "upstream_malformed_response"
3313 );
3314 let body = error_body(response).await;
3315 assert_eq!(body.error.kind, "proxy_upstream_error");
3316 assert_eq!(body.error.code, "upstream_malformed_response");
3317 }
3318
3319 #[tokio::test]
3320 async fn chat_route_non_streaming_fails_closed_on_missing_encrypted_content() {
3321 let response = chat_response(
3322 "chat-route-non-streaming-missing-content",
3323 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
3324 vec![MockStreamFrame::Finish("stop"), MockStreamFrame::Done],
3325 )
3326 .await;
3327
3328 assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
3329 assert_eq!(
3330 response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3331 "e2ee_response_decryption_failed"
3332 );
3333 let body = error_body(response).await;
3334 assert_eq!(body.error.kind, "proxy_e2ee_error");
3335 assert_eq!(body.error.code, "e2ee_response_decryption_failed");
3336 }
3337
3338 #[tokio::test]
3339 async fn chat_route_non_streaming_fails_closed_on_decryption_failure() {
3340 let response = chat_response(
3341 "chat-route-non-streaming-decryption-failure",
3342 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
3343 vec![MockStreamFrame::TextForWrongRecipient(" secret"), MockStreamFrame::Done],
3344 )
3345 .await;
3346
3347 assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
3348 assert_eq!(
3349 response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3350 "e2ee_response_decryption_failed"
3351 );
3352 let body = error_body(response).await;
3353 assert_eq!(body.error.kind, "proxy_e2ee_error");
3354 assert_eq!(body.error.code, "e2ee_response_decryption_failed");
3355 }
3356
3357 #[tokio::test]
3358 async fn chat_route_non_streaming_passes_through_usage_when_available() {
3359 let response = chat_response(
3360 "chat-route-non-streaming-usage",
3361 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
3362 vec![
3363 MockStreamFrame::Text("Hello"),
3364 MockStreamFrame::Finish("stop"),
3365 MockStreamFrame::Usage,
3366 MockStreamFrame::Done,
3367 ],
3368 )
3369 .await;
3370
3371 assert_eq!(response.status(), StatusCode::OK);
3372 let body = json_body(response).await;
3373 assert_eq!(body["choices"][0]["message"]["content"], "Hello");
3374 assert_eq!(body["usage"]["prompt_tokens"], 1);
3375 assert_eq!(body["usage"]["completion_tokens"], 2);
3376 assert_eq!(body["usage"]["total_tokens"], 3);
3377 }
3378
3379 #[tokio::test]
3380 async fn chat_route_fails_closed_on_upstream_stream_error_event() {
3381 let response = streaming_chat_response(
3382 "chat-route-upstream-error",
3383 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
3384 vec![MockStreamFrame::Error("model failed")],
3385 )
3386 .await;
3387
3388 assert_stream_body_fails(response).await;
3389 }
3390
3391 #[tokio::test]
3392 async fn chat_route_fails_closed_on_malformed_upstream_event() {
3393 let response = streaming_chat_response(
3394 "chat-route-malformed-event",
3395 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
3396 vec![MockStreamFrame::Raw("data: {\"choices\":\n\n")],
3397 )
3398 .await;
3399
3400 assert_stream_body_fails(response).await;
3401 }
3402
3403 #[tokio::test]
3404 async fn chat_route_fails_closed_on_decryption_failure_mid_stream() {
3405 let response = streaming_chat_response(
3406 "chat-route-decryption-failure",
3407 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
3408 vec![
3409 MockStreamFrame::Text("Hello"),
3410 MockStreamFrame::TextForWrongRecipient(" secret"),
3411 MockStreamFrame::Done,
3412 ],
3413 )
3414 .await;
3415
3416 assert_stream_body_fails(response).await;
3417 }
3418
3419 #[tokio::test]
3420 async fn chat_route_synthesizes_final_finish_chunk_before_done_when_needed() {
3421 let response = streaming_chat_response(
3422 "chat-route-final-done",
3423 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
3424 vec![MockStreamFrame::Text("Hello"), MockStreamFrame::Done],
3425 )
3426 .await;
3427
3428 assert_eq!(response.status(), StatusCode::OK);
3429 let body = response_body(response).await;
3430 let data = sse_data(&body);
3431 assert_eq!(data.len(), 3);
3432 let final_chunk: Value = serde_json::from_str(data[1]).expect("final chunk should be JSON");
3433 assert_eq!(final_chunk["choices"][0]["delta"], json!({}));
3434 assert_eq!(final_chunk["choices"][0]["finish_reason"], "stop");
3435 assert_eq!(data[2], "[DONE]");
3436 }
3437
3438 #[tokio::test]
3439 async fn chat_route_attestation_failure_prevents_request_construction() {
3440 let model_public_key = ProxyInstanceKey::generate().public_key_hex().to_owned();
3441 let base_url = spawn_attestation_server(model_public_key, false).await;
3442 let app = router_with_venice_client(
3443 chat_config_with_basic_test_attestation(),
3444 test_venice_client_for_base_url(base_url),
3445 );
3446
3447 let response = app
3448 .oneshot(
3449 Request::builder()
3450 .method(Method::POST)
3451 .uri("/v1/chat/completions")
3452 .header("content-type", "application/json")
3453 .header(HEADER_PROXY_SESSION_ID, "chat-route-attestation-failure")
3454 .body(Body::from(
3455 r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
3456 ))
3457 .expect("request should build"),
3458 )
3459 .await
3460 .expect("request should complete");
3461
3462 assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
3463 assert_eq!(
3464 response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3465 "attestation_upstream_not_verified"
3466 );
3467 let body = error_body(response).await;
3468 assert_eq!(body.error.kind, "proxy_attestation_error");
3469 assert_eq!(body.error.code, "attestation_upstream_not_verified");
3470 }
3471
3472 #[tokio::test]
3473 async fn unknown_route_returns_openai_style_not_found() {
3474 let response = test_app()
3475 .oneshot(
3476 Request::builder()
3477 .uri("/v1/unknown")
3478 .body(Body::empty())
3479 .expect("request should build"),
3480 )
3481 .await
3482 .expect("request should complete");
3483
3484 assert_eq!(response.status(), StatusCode::NOT_FOUND);
3485 assert_eq!(
3486 response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3487 "not_found"
3488 );
3489 let body = error_body(response).await;
3490 assert_eq!(body.error.kind, "invalid_request_error");
3491 assert_eq!(body.error.code, "not_found");
3492 }
3493
3494 #[tokio::test]
3495 async fn unsupported_method_returns_openai_style_method_error() {
3496 let response = test_app()
3497 .oneshot(
3498 Request::builder()
3499 .method(Method::POST)
3500 .uri("/v1/models")
3501 .body(Body::empty())
3502 .expect("request should build"),
3503 )
3504 .await
3505 .expect("request should complete");
3506
3507 assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED);
3508 assert_eq!(
3509 response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3510 "method_not_allowed"
3511 );
3512 let body = error_body(response).await;
3513 assert_eq!(body.error.kind, "invalid_request_error");
3514 assert_eq!(body.error.code, "method_not_allowed");
3515 }
3516
3517 #[tokio::test]
3518 async fn malformed_chat_json_uses_axum_extractor_rejection() {
3519 let response = test_app()
3520 .oneshot(
3521 Request::builder()
3522 .method(Method::POST)
3523 .uri("/v1/chat/completions")
3524 .header("content-type", "application/json")
3525 .body(Body::from("{"))
3526 .expect("request should build"),
3527 )
3528 .await
3529 .expect("request should complete");
3530
3531 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
3532 assert!(response.headers().get(HEADER_PROXY_ERROR_CODE).is_none());
3533 }
3534
3535 #[tokio::test]
3536 async fn non_object_chat_json_returns_structured_invalid_request() {
3537 let response = test_app()
3538 .oneshot(
3539 Request::builder()
3540 .method(Method::POST)
3541 .uri("/v1/chat/completions")
3542 .header("content-type", "application/json")
3543 .body(Body::from("[]"))
3544 .expect("request should build"),
3545 )
3546 .await
3547 .expect("request should complete");
3548
3549 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
3550 assert_eq!(
3551 response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3552 "invalid_request"
3553 );
3554 let body = error_body(response).await;
3555 assert_eq!(body.error.kind, "invalid_request_error");
3556 assert_eq!(body.error.code, "invalid_request");
3557 }
3558
/// One scripted piece of the mock Venice upstream's SSE response; see
/// `render_mock_sse` for the exact wire text each variant produces.
#[derive(Debug, Clone)]
enum MockStreamFrame {
    /// Chunk whose delta carries only `role: "assistant"`.
    Role,
    /// Chunk whose delta `content` is JSON `null`.
    NullContent,
    /// Chunk whose delta `content` is an empty, unencrypted string.
    EmptyContent,
    /// Chunk whose `content` is this text encrypted to the client key, hex-encoded.
    Text(&'static str),
    /// Chunk whose `reasoning_content` is this text encrypted to the client key.
    Reasoning(&'static str),
    /// Chunk whose `content` is encrypted to a freshly generated, unrelated key;
    /// used to exercise decryption failures.
    TextForWrongRecipient(&'static str),
    /// Chunk with an empty delta and this `finish_reason`.
    Finish(&'static str),
    /// Chunk with no choices and fixed usage (1 prompt, 2 completion, 3 total).
    Usage,
    /// The `data: [DONE]` terminator.
    Done,
    /// An `event: error` frame whose data is `{"message": <text>}`.
    Error(&'static str),
    /// Text appended to the stream verbatim.
    Raw(&'static str),
}
3573
3574 async fn streaming_chat_response(
3575 session_id: &'static str,
3576 request_body: &'static str,
3577 frames: Vec<MockStreamFrame>,
3578 ) -> Response {
3579 chat_response(session_id, request_body, frames).await
3580 }
3581
3582 async fn chat_response(
3583 session_id: &'static str,
3584 request_body: &'static str,
3585 frames: Vec<MockStreamFrame>,
3586 ) -> Response {
3587 let model_public_key = ProxyInstanceKey::generate().public_key_hex().to_owned();
3588 let base_url = spawn_streaming_venice_server(model_public_key, true, frames).await;
3589 request_chat(session_id, request_body, base_url).await
3590 }
3591
3592 async fn chat_response_sequence(
3593 session_id: &'static str,
3594 request_body: &'static str,
3595 attempts: Vec<Vec<MockStreamFrame>>,
3596 ) -> Response {
3597 let model_public_key = ProxyInstanceKey::generate().public_key_hex().to_owned();
3598 let base_url =
3599 spawn_streaming_venice_server_sequence(model_public_key, true, attempts).await;
3600 request_chat(session_id, request_body, base_url).await
3601 }
3602
3603 async fn chat_response_with_upstream_status(
3604 session_id: &'static str,
3605 request_body: &'static str,
3606 upstream_status: StatusCode,
3607 ) -> Response {
3608 let model_public_key = ProxyInstanceKey::generate().public_key_hex().to_owned();
3609 let base_url =
3610 spawn_venice_server_with_chat_status(model_public_key, upstream_status).await;
3611 request_chat(session_id, request_body, base_url).await
3612 }
3613
3614 async fn request_chat(
3615 session_id: &'static str,
3616 request_body: &'static str,
3617 base_url: String,
3618 ) -> Response {
3619 request_chat_with_config(
3620 chat_config_with_basic_test_attestation(),
3621 session_id,
3622 request_body,
3623 base_url,
3624 )
3625 .await
3626 }
3627
3628 async fn request_chat_with_config(
3629 config: ProxyConfig,
3630 session_id: &'static str,
3631 request_body: &'static str,
3632 base_url: String,
3633 ) -> Response {
3634 let app = router_with_venice_client(config, test_venice_client_for_base_url(base_url));
3635
3636 app.oneshot(
3637 Request::builder()
3638 .method(Method::POST)
3639 .uri("/v1/chat/completions")
3640 .header("content-type", "application/json")
3641 .header(HEADER_PROXY_SESSION_ID, session_id)
3642 .body(Body::from(request_body))
3643 .expect("request should build"),
3644 )
3645 .await
3646 .expect("request should complete")
3647 }
3648
3649 async fn json_body(response: Response) -> Value {
3650 let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
3651 .await
3652 .expect("response body should buffer");
3653 serde_json::from_slice(&bytes).expect("response should be JSON")
3654 }
3655
3656 async fn response_body(response: Response) -> String {
3657 let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
3658 .await
3659 .expect("response body should buffer");
3660 String::from_utf8(bytes.to_vec()).expect("response body should be UTF-8")
3661 }
3662
3663 async fn assert_stream_body_fails(response: Response) {
3664 assert_eq!(response.status(), StatusCode::OK);
3665 let result = axum::body::to_bytes(response.into_body(), usize::MAX).await;
3666 assert!(
3667 result.is_err(),
3668 "stream body should fail closed instead of completing successfully"
3669 );
3670 }
3671
/// Returns the payload of every `data:` line in an SSE body, in order.
///
/// As the SSE spec specifies, a single space after the colon is optional and
/// is stripped when present. Any further leading spaces stay in the value.
/// Non-`data` lines (`event:`, comments, blank separators) are ignored.
/// `str::lines` already handles both `\n` and `\r\n` terminators.
fn sse_data(body: &str) -> Vec<&str> {
    body.lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .map(|value| value.strip_prefix(' ').unwrap_or(value))
        .collect()
}
3677
3678 fn sse_json_chunks(body: &str) -> Vec<Value> {
3680 let data = sse_data(body);
3681 assert_eq!(data.last().copied(), Some("[DONE]"));
3682 data[..data.len() - 1]
3683 .iter()
3684 .map(|chunk| serde_json::from_str(chunk).expect("SSE chunk should be JSON"))
3685 .collect()
3686 }
3687
3688 fn streamed_content(chunks: &[Value]) -> String {
3690 chunks
3691 .iter()
3692 .filter_map(|chunk| chunk["choices"][0]["delta"]["content"].as_str())
3693 .collect()
3694 }
3695
3696 fn streamed_tool_call_deltas(chunks: &[Value]) -> Vec<&Value> {
3698 chunks
3699 .iter()
3700 .filter_map(|chunk| chunk["choices"][0]["delta"]["tool_calls"].as_array())
3701 .flatten()
3702 .collect()
3703 }
3704
3705 fn streamed_tool_call_arguments(chunks: &[Value], index: u64) -> String {
3707 streamed_tool_call_deltas(chunks)
3708 .iter()
3709 .filter(|tool_call| tool_call["index"] == json!(index))
3710 .filter_map(|tool_call| tool_call["function"]["arguments"].as_str())
3711 .collect()
3712 }
3713
3714 async fn spawn_streaming_venice_server(
3715 model_public_key: String,
3716 verified: bool,
3717 frames: Vec<MockStreamFrame>,
3718 ) -> String {
3719 spawn_streaming_venice_server_sequence(model_public_key, verified, vec![frames]).await
3720 }
3721
3722 async fn spawn_streaming_venice_server_sequence(
3723 model_public_key: String,
3724 verified: bool,
3725 attempts: Vec<Vec<MockStreamFrame>>,
3726 ) -> String {
3727 let chat_attempts = Arc::new(Mutex::new(VecDeque::from(attempts)));
3728 let attestation_key = model_public_key.clone();
3729 let app = Router::new()
3730 .route(
3731 "/api/v1/tee/attestation",
3732 get(move |Query(query): Query<HashMap<String, String>>| {
3733 let model_public_key = attestation_key.clone();
3734 async move {
3735 Json(json!({
3736 "api_version": "aci/1",
3737 "attestation": {
3738 "tee_type": "tdx",
3739 "evidence": {}
3740 },
3741 "verified": verified,
3742 "nonce": query.get("nonce").cloned().unwrap_or_default(),
3743 "model": query.get("model").cloned().unwrap_or_default(),
3744 "tee_provider": "phala",
3745 "signing_public_key": model_public_key,
3746 }))
3747 }
3748 }),
3749 )
3750 .route(
3751 "/api/v1/chat/completions",
3752 post(move |headers: HeaderMap, Json(body): Json<Value>| {
3753 let chat_attempts = chat_attempts.clone();
3754 async move {
3755 let Some(client_public_key) = headers
3756 .get(crate::venice::HEADER_VENICE_TEE_CLIENT_PUB_KEY)
3757 .and_then(|value| value.to_str().ok())
3758 else {
3759 return (
3760 StatusCode::BAD_REQUEST,
3761 [("content-type", "text/plain")],
3762 "missing client key".to_owned(),
3763 );
3764 };
3765 if body.get("stream").and_then(Value::as_bool) != Some(true) {
3766 return (
3767 StatusCode::BAD_REQUEST,
3768 [("content-type", "text/plain")],
3769 "upstream request must stream".to_owned(),
3770 );
3771 }
3772 let messages = body.get("messages").and_then(Value::as_array);
3773 if messages.is_none_or(|messages| {
3774 messages.is_empty()
3775 || !messages.iter().all(|message| {
3776 message.get("role").and_then(Value::as_str).is_some()
3777 && message
3778 .get("content")
3779 .and_then(Value::as_str)
3780 .is_some_and(|content| {
3781 !content.is_empty()
3782 && content
3783 .chars()
3784 .all(|ch| ch.is_ascii_hexdigit())
3785 })
3786 })
3787 }) {
3788 return (
3789 StatusCode::BAD_REQUEST,
3790 [("content-type", "text/plain")],
3791 "messages must be encrypted message objects".to_owned(),
3792 );
3793 }
3794
3795 let frames = {
3796 let mut attempts = chat_attempts
3797 .lock()
3798 .expect("mock chat attempts mutex should not be poisoned");
3799 if attempts.len() > 1 {
3800 attempts.pop_front().expect("attempts length checked above")
3801 } else {
3802 attempts.front().cloned().unwrap_or_default()
3803 }
3804 };
3805
3806 (
3807 StatusCode::OK,
3808 [("content-type", "text/event-stream")],
3809 render_mock_sse(&frames, client_public_key),
3810 )
3811 }
3812 }),
3813 );
3814 let listener = TcpListener::bind(("127.0.0.1", 0))
3815 .await
3816 .expect("mock Venice listener should bind");
3817 let addr = listener
3818 .local_addr()
3819 .expect("mock Venice listener should have local address");
3820
3821 tokio::spawn(async move {
3822 axum::serve(listener, app)
3823 .await
3824 .expect("mock Venice server should run");
3825 });
3826
3827 format!("http://{addr}/api/v1")
3828 }
3829
3830 async fn spawn_venice_server_with_chat_status(
3831 model_public_key: String,
3832 upstream_status: StatusCode,
3833 ) -> String {
3834 let attestation_key = model_public_key.clone();
3835 let app = Router::new()
3836 .route(
3837 "/api/v1/tee/attestation",
3838 get(move |Query(query): Query<HashMap<String, String>>| {
3839 let model_public_key = attestation_key.clone();
3840 async move {
3841 Json(json!({
3842 "api_version": "aci/1",
3843 "attestation": {
3844 "tee_type": "tdx",
3845 "evidence": {}
3846 },
3847 "verified": true,
3848 "nonce": query.get("nonce").cloned().unwrap_or_default(),
3849 "model": query.get("model").cloned().unwrap_or_default(),
3850 "tee_provider": "phala",
3851 "signing_public_key": model_public_key,
3852 }))
3853 }
3854 }),
3855 )
3856 .route(
3857 "/api/v1/chat/completions",
3858 post(move || async move { upstream_status }),
3859 );
3860 let listener = TcpListener::bind(("127.0.0.1", 0))
3861 .await
3862 .expect("mock Venice listener should bind");
3863 let addr = listener
3864 .local_addr()
3865 .expect("mock Venice listener should have local address");
3866
3867 tokio::spawn(async move {
3868 axum::serve(listener, app)
3869 .await
3870 .expect("mock Venice server should run");
3871 });
3872
3873 format!("http://{addr}/api/v1")
3874 }
3875
3876 fn render_mock_sse(frames: &[MockStreamFrame], client_public_key: &str) -> String {
3877 let codec = E2eeCodec::default();
3878 let mut output = String::new();
3879 for frame in frames {
3880 match frame {
3881 MockStreamFrame::Role => {
3882 output.push_str(&format!("data: {}\n\n", upstream_role_chunk()));
3883 }
3884 MockStreamFrame::NullContent => {
3885 output.push_str(&format!("data: {}\n\n", upstream_null_content_chunk()));
3886 }
3887 MockStreamFrame::EmptyContent => {
3888 output.push_str(&format!(
3889 "data: {}\n\n",
3890 upstream_content_chunk(String::new())
3891 ));
3892 }
3893 MockStreamFrame::Text(content) => {
3894 let encrypted = codec
3895 .encrypt_content(content, client_public_key)
3896 .expect("mock content should encrypt")
3897 .into_hex();
3898 output.push_str(&format!("data: {}\n\n", upstream_content_chunk(encrypted)));
3899 }
3900 MockStreamFrame::Reasoning(content) => {
3901 let encrypted = codec
3902 .encrypt_content(content, client_public_key)
3903 .expect("mock reasoning content should encrypt")
3904 .into_hex();
3905 output.push_str(&format!(
3906 "data: {}\n\n",
3907 upstream_reasoning_content_chunk(encrypted)
3908 ));
3909 }
3910 MockStreamFrame::TextForWrongRecipient(content) => {
3911 let wrong_key = ProxyInstanceKey::generate();
3912 let encrypted = codec
3913 .encrypt_content(content, wrong_key.public_key_hex())
3914 .expect("mock content should encrypt")
3915 .into_hex();
3916 output.push_str(&format!("data: {}\n\n", upstream_content_chunk(encrypted)));
3917 }
3918 MockStreamFrame::Finish(reason) => {
3919 output.push_str(&format!("data: {}\n\n", upstream_finish_chunk(reason)));
3920 }
3921 MockStreamFrame::Usage => {
3922 output.push_str(&format!("data: {}\n\n", upstream_usage_chunk()));
3923 }
3924 MockStreamFrame::Done => output.push_str("data: [DONE]\n\n"),
3925 MockStreamFrame::Error(message) => {
3926 output.push_str(&format!(
3927 "event: error\ndata: {}\n\n",
3928 json!({ "message": message })
3929 ));
3930 }
3931 MockStreamFrame::Raw(raw) => output.push_str(raw),
3932 }
3933 }
3934 output
3935 }
3936
3937 fn upstream_role_chunk() -> Value {
3938 json!({
3939 "id": "chatcmpl-upstream-test",
3940 "object": "chat.completion.chunk",
3941 "created": 1_717_171_717,
3942 "model": "e2ee-test",
3943 "choices": [{
3944 "index": 0,
3945 "delta": { "role": "assistant" },
3946 "finish_reason": null,
3947 }],
3948 })
3949 }
3950
3951 fn upstream_content_chunk(encrypted_content: String) -> Value {
3952 json!({
3953 "id": "chatcmpl-upstream-test",
3954 "object": "chat.completion.chunk",
3955 "created": 1_717_171_717,
3956 "model": "e2ee-test",
3957 "choices": [{
3958 "index": 0,
3959 "delta": { "content": encrypted_content },
3960 "finish_reason": null,
3961 }],
3962 })
3963 }
3964
3965 fn upstream_reasoning_content_chunk(encrypted_content: String) -> Value {
3966 json!({
3967 "id": "chatcmpl-upstream-test",
3968 "object": "chat.completion.chunk",
3969 "created": 1_717_171_717,
3970 "model": "e2ee-test",
3971 "choices": [{
3972 "index": 0,
3973 "delta": { "reasoning_content": encrypted_content },
3974 "finish_reason": null,
3975 }],
3976 })
3977 }
3978
3979 fn upstream_null_content_chunk() -> Value {
3980 json!({
3981 "id": "chatcmpl-upstream-test",
3982 "object": "chat.completion.chunk",
3983 "created": 1_717_171_717,
3984 "model": "e2ee-test",
3985 "choices": [{
3986 "index": 0,
3987 "delta": { "content": Value::Null },
3988 "finish_reason": null,
3989 }],
3990 })
3991 }
3992
3993 fn upstream_finish_chunk(reason: &str) -> Value {
3994 json!({
3995 "id": "chatcmpl-upstream-test",
3996 "object": "chat.completion.chunk",
3997 "created": 1_717_171_717,
3998 "model": "e2ee-test",
3999 "choices": [{
4000 "index": 0,
4001 "delta": {},
4002 "finish_reason": reason,
4003 }],
4004 })
4005 }
4006
4007 fn upstream_usage_chunk() -> Value {
4008 json!({
4009 "id": "chatcmpl-upstream-test",
4010 "object": "chat.completion.chunk",
4011 "created": 1_717_171_717,
4012 "model": "e2ee-test",
4013 "choices": [],
4014 "usage": {
4015 "prompt_tokens": 1,
4016 "completion_tokens": 2,
4017 "total_tokens": 3,
4018 },
4019 })
4020 }
4021
4022 async fn spawn_attestation_server(model_public_key: String, verified: bool) -> String {
4023 let app = Router::new().route(
4024 "/api/v1/tee/attestation",
4025 get(move |Query(query): Query<HashMap<String, String>>| {
4026 let model_public_key = model_public_key.clone();
4027 async move {
4028 Json(json!({
4029 "api_version": "aci/1",
4030 "attestation": {
4031 "tee_type": "tdx",
4032 "evidence": {}
4033 },
4034 "verified": verified,
4035 "nonce": query.get("nonce").cloned().unwrap_or_default(),
4036 "model": query.get("model").cloned().unwrap_or_default(),
4037 "tee_provider": "phala",
4038 "signing_public_key": model_public_key,
4039 }))
4040 }
4041 }),
4042 );
4043 let listener = TcpListener::bind(("127.0.0.1", 0))
4044 .await
4045 .expect("mock attestation listener should bind");
4046 let addr = listener
4047 .local_addr()
4048 .expect("mock attestation listener should have local address");
4049
4050 tokio::spawn(async move {
4051 axum::serve(listener, app)
4052 .await
4053 .expect("mock attestation server should run");
4054 });
4055
4056 format!("http://{addr}/api/v1")
4057 }
4058
/// With the default config, metadata headers report attestation mode and tool
/// mode, and omit the E2EE and key-binding headers.
#[test]
fn metadata_header_helper_only_emits_safe_config_headers_by_default() {
    let default_config = ProxyConfig::default();
    let metadata = ProxyMetadataHeaders::from_config(&default_config);
    let mut headers = HeaderMap::new();
    metadata.apply(&mut headers);

    let attestation_mode = headers.get(HEADER_PROXY_ATTESTATION_MODE).unwrap();
    assert_eq!(attestation_mode, "independent");
    let tool_mode = headers.get(HEADER_PROXY_TOOL_MODE).unwrap();
    assert_eq!(tool_mode, "emulated");

    for absent in [HEADER_PROXY_E2EE, HEADER_PROXY_KEY_BINDING] {
        assert!(headers.get(absent).is_none());
    }
}
4074 }
4075}