Skip to main content

venice_e2ee_proxy/
http.rs

1//! HTTP server, route wiring, shared headers, and route errors.
2//!
3//! Routes include Venice-backed model listing, encrypted chat request
4//! construction, response transformation, and OpenAI-compatible errors/headers.
5
6use std::{
7    io,
8    sync::Arc,
9    time::{SystemTime, UNIX_EPOCH},
10};
11
12use axum::{
13    Json, Router,
14    extract::State,
15    http::{HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Uri},
16    response::{
17        IntoResponse, Response,
18        sse::{Event, Sse},
19    },
20    routing::{get, post},
21};
22use serde_json::{Value, json};
23use thiserror::Error;
24use tokio::net::TcpListener;
25use tracing::{debug, error, info, warn};
26
27use crate::{
28    attestation::{AttestationError, AttestationVerifier},
29    config::ProxyConfig,
30    e2ee::{E2eeCodec, E2eeCodecError},
31    keys::ProxyInstanceKey,
32    openai::{
33        ErrorResponse,
34        chat::{
35            ChatCompletionRequest, ChatConstructionError, ChatRequestError, NormalizedChatMessage,
36        },
37    },
38    sessions::{AttestedModelState, SessionContext, SessionError, SessionManager, SessionRequest},
39    tools::{ToolEmulationContext, ToolOutputClassification, ValidatedToolCall},
40    venice::{VeniceClient, VeniceClientError},
41};
42
43pub const HEADER_PROXY_E2EE: &str = "X-Venice-Proxy-E2EE";
44pub const HEADER_PROXY_ATTESTATION_MODE: &str = "X-Venice-Proxy-Attestation-Mode";
45pub const HEADER_PROXY_ATTESTED_MODEL: &str = "X-Venice-Proxy-Attested-Model";
46pub const HEADER_PROXY_TEE_PROVIDER: &str = "X-Venice-Proxy-TEE-Provider";
47pub const HEADER_PROXY_TDX_VERIFIED: &str = "X-Venice-Proxy-TDX-Verified";
48pub const HEADER_PROXY_TDX_DEBUG: &str = "X-Venice-Proxy-TDX-Debug";
49pub const HEADER_PROXY_NVIDIA_VERIFIED: &str = "X-Venice-Proxy-NVIDIA-Verified";
50pub const HEADER_PROXY_KEY_BINDING: &str = "X-Venice-Proxy-Key-Binding";
51pub const HEADER_PROXY_SESSION_ID: &str = "X-Venice-Proxy-Session-Id";
52pub const HEADER_PROXY_SESSION_SCOPE: &str = "X-Venice-Proxy-Session-Scope";
53pub const HEADER_PROXY_TOOL_MODE: &str = "X-Venice-Proxy-Tool-Mode";
54pub const HEADER_PROXY_TOOL_RETRIES: &str = "X-Venice-Proxy-Tool-Retries";
55pub const HEADER_PROXY_ERROR_CODE: &str = "X-Venice-Proxy-Error-Code";
56
57/// Shared HTTP application state used by route handlers.
58#[derive(Debug, Clone)]
59pub struct AppState {
60    config: Arc<ProxyConfig>,
61    venice_client: VeniceClient,
62    proxy_instance_key: Option<ProxyInstanceKey>,
63    session_manager: SessionManager,
64    attestation_verifier: AttestationVerifier,
65}
66
67impl AppState {
68    /// Builds application state from configuration and a Venice client created from that config.
69    pub fn new(config: ProxyConfig) -> Result<Self, VeniceClientError> {
70        let venice_client = VeniceClient::from_config(&config)?;
71        Ok(Self::from_parts(config, venice_client))
72    }
73
74    /// Builds application state from configuration and an already-created Venice client.
75    pub fn from_parts(config: ProxyConfig, venice_client: VeniceClient) -> Self {
76        let proxy_instance_key = ProxyInstanceKey::generate_from_config(&config.keys);
77        let session_manager = SessionManager::new(config.session.clone());
78        let attestation_verifier = AttestationVerifier::from_config(&config, venice_client.clone());
79
80        Self {
81            config: Arc::new(config),
82            venice_client,
83            proxy_instance_key,
84            session_manager,
85            attestation_verifier,
86        }
87    }
88
89    /// Returns the proxy configuration shared by handlers.
90    pub fn config(&self) -> &ProxyConfig {
91        &self.config
92    }
93
94    /// Returns the Venice upstream client shared by handlers.
95    pub fn venice_client(&self) -> &VeniceClient {
96        &self.venice_client
97    }
98
99    /// Returns the startup proxy instance key when key generation is enabled.
100    pub fn proxy_instance_key(&self) -> Option<&ProxyInstanceKey> {
101        self.proxy_instance_key.as_ref()
102    }
103
104    /// Returns the in-memory session manager shared by handlers.
105    pub fn session_manager(&self) -> &SessionManager {
106        &self.session_manager
107    }
108
109    /// Returns the attestation verifier shared by handlers.
110    pub fn attestation_verifier(&self) -> &AttestationVerifier {
111        &self.attestation_verifier
112    }
113}
114
115/// Builds the HTTP router using the configured Venice API key environment
116/// variable.
117pub fn router(config: ProxyConfig) -> Result<Router, VeniceClientError> {
118    Ok(router_from_state(AppState::new(config)?))
119}
120
121/// Builds the HTTP router with an already-constructed Venice client.
122///
123/// This keeps route tests deterministic without mutating process-wide
124/// environment variables.
125pub fn router_with_venice_client(config: ProxyConfig, venice_client: VeniceClient) -> Router {
126    router_from_state(AppState::from_parts(config, venice_client))
127}
128
129/// Builds route wiring from already-initialized application state.
130fn router_from_state(state: AppState) -> Router {
131    Router::new()
132        .route("/v1/models", get(list_models).fallback(method_not_allowed))
133        .route(
134            "/v1/chat/completions",
135            post(create_chat_completion).fallback(method_not_allowed),
136        )
137        .fallback(not_found)
138        .with_state(state)
139}
140
141/// Serves an already-built router on an already-bound listener.
142pub async fn serve(listener: TcpListener, router: Router) -> io::Result<()> {
143    axum::serve(listener, router).await
144}
145
146/// Handles `GET /v1/models` by proxying Venice's supported model list.
147async fn list_models(State(state): State<AppState>) -> Result<Response, ProxyError> {
148    info!(route = "/v1/models", "listing Venice models");
149    let models = state.venice_client().list_models().await?;
150    let mut response = Json(models).into_response();
151    ProxyMetadataHeaders::from_config(state.config()).apply(response.headers_mut());
152    info!(route = "/v1/models", "Venice models response proxied");
153    Ok(response)
154}
155
156/// Handles `POST /v1/chat/completions` by encrypting the request and transforming Venice's response.
157async fn create_chat_completion(
158    State(state): State<AppState>,
159    headers: HeaderMap,
160    Json(body): Json<Value>,
161) -> Result<Response, ProxyError> {
162    let request = ChatCompletionRequest::parse(&body)?;
163    let proxy_instance_key = state
164        .proxy_instance_key()
165        .ok_or(ProxyError::ProxyInstanceKeyUnavailable)?;
166
167    let session_resolution = state
168        .session_manager()
169        .get_or_create(SessionRequest::new(&request.model, &headers).with_body(&body))?;
170    let session_created = session_resolution.created;
171    let session_replaced_expired = session_resolution.replaced_expired;
172    let session_scope = session_resolution.session.scope;
173    let session = ensure_attested_session(&state, session_resolution.session).await?;
174    let model_public_key = session
175        .attested_model_public_key
176        .as_deref()
177        .ok_or(ProxyError::MissingAttestedModelKey)?;
178
179    let codec =
180        E2eeCodec::from_config(&state.config().e2ee).map_err(ChatConstructionError::E2ee)?;
181    let tool_context = ToolEmulationContext::from_request(&state.config().tools, &request)?;
182    let metadata = ProxyMetadataHeaders::for_verified_chat(state.config(), &session);
183
184    info!(
185        route = "/v1/chat/completions",
186        model = %request.model,
187        stream = request.stream,
188        message_count = request.messages.len(),
189        tool_count = request.tools.len(),
190        tool_mode = tool_context.is_some(),
191        session_created,
192        session_replaced_expired = ?session_replaced_expired,
193        session_scope = %session_scope,
194        "chat completion request accepted"
195    );
196
197    if let Some(tool_context) = tool_context {
198        info!(model = %request.model, "using tool-emulated chat completion");
199        return openai_tool_emulated_chat_response(
200            &state,
201            &request,
202            &tool_context,
203            codec,
204            proxy_instance_key.clone(),
205            model_public_key,
206            metadata,
207        )
208        .await;
209    }
210
211    let prepared = request.to_venice_e2ee_request(&codec, model_public_key)?;
212    info!(
213        model = %request.model,
214        client_stream = prepared.client_stream,
215        "forwarding encrypted chat completion to Venice"
216    );
217
218    let upstream = state
219        .venice_client()
220        .create_chat_completion_stream(
221            &prepared.upstream,
222            proxy_instance_key.public_key_hex(),
223            model_public_key,
224        )
225        .await?;
226
227    if prepared.client_stream {
228        info!(model = %request.model, "streaming chat completion response to client");
229        let include_usage_requested = request.stream_options.include_usage.unwrap_or(false);
230        let transformer = OpenAiChatStreamTransformer::new(
231            codec,
232            proxy_instance_key.clone(),
233            request.model.clone(),
234            include_usage_requested,
235        );
236        Ok(chat_sse_response(
237            upstream,
238            transformer,
239            request.model,
240            include_usage_requested,
241            &CHAT_SSE_LOG,
242            metadata,
243        ))
244    } else {
245        info!(model = %request.model, "buffering chat completion response for client");
246        openai_chat_buffered_response(
247            upstream,
248            codec,
249            proxy_instance_key.clone(),
250            request.model,
251            metadata,
252        )
253        .await
254    }
255}
256
257/// Returns a session with cached attestation state, fetching and storing it when absent.
258async fn ensure_attested_session(
259    state: &AppState,
260    session: SessionContext,
261) -> Result<SessionContext, ProxyError> {
262    if session.attested_model_public_key.is_some() {
263        info!(model = %session.model_id, session_scope = %session.scope, "using cached model attestation");
264        return Ok(session);
265    }
266
267    info!(model = %session.model_id, session_scope = %session.scope, "fetching model attestation");
268    let attestation = state
269        .attestation_verifier()
270        .verify_model_attestation(&session.model_id)
271        .await?;
272
273    info!(
274        model = %attestation.model_id,
275        tee_provider = attestation.tee_provider.as_deref().unwrap_or("unknown"),
276        tdx_verified = attestation.tdx.verified,
277        nvidia_verified = attestation.nvidia.verified.as_header_value(),
278        "model attestation verified"
279    );
280
281    let state_update = AttestedModelState {
282        model_public_key: attestation.model_public_key,
283        tee_provider: attestation.tee_provider,
284        tdx_debug: attestation.tdx.debug.or(attestation.debug),
285        nvidia_verified: attestation.nvidia.verified.as_header_value().to_owned(),
286        verified_at: attestation.verified_at,
287    };
288
289    Ok(state
290        .session_manager()
291        .set_attested_model_state(&session.session_key, state_update)?)
292}
293
294/// Buffers an upstream SSE chat response and returns one OpenAI chat completion JSON response.
295async fn openai_chat_buffered_response(
296    upstream: reqwest::Response,
297    codec: E2eeCodec,
298    proxy_instance_key: ProxyInstanceKey,
299    fallback_model: String,
300    metadata: ProxyMetadataHeaders,
301) -> Result<Response, ProxyError> {
302    let completion =
303        buffer_openai_chat_completion(upstream, codec, proxy_instance_key, fallback_model).await?;
304    let mut response = Json(completion).into_response();
305    metadata.apply(response.headers_mut());
306    Ok(response)
307}
308
309/// Runs a chat request through tool-emulation response handling for streaming or buffered clients.
310async fn openai_tool_emulated_chat_response(
311    state: &AppState,
312    request: &ChatCompletionRequest,
313    tool_context: &ToolEmulationContext,
314    codec: E2eeCodec,
315    proxy_instance_key: ProxyInstanceKey,
316    model_public_key: &str,
317    metadata: ProxyMetadataHeaders,
318) -> Result<Response, ProxyError> {
319    info!(
320        model = %request.model,
321        max_retries = tool_context.max_retries(),
322        "starting tool-emulated chat completion"
323    );
324    if request.stream {
325        let upstream = tool_emulated_upstream_stream(
326            state,
327            request,
328            tool_context,
329            &codec,
330            &proxy_instance_key,
331            model_public_key,
332            None,
333        )
334        .await?;
335
336        return Ok(tool_emulated_chat_sse_response_with_retries(
337            ToolEmulatedChatSseRetryState {
338                state: state.clone(),
339                request: request.clone(),
340                tool_context: tool_context.clone(),
341                codec,
342                proxy_instance_key,
343                model_public_key: model_public_key.to_owned(),
344            },
345            upstream,
346            metadata,
347        ));
348    }
349
350    let mut retries = 0;
351    let mut correction: Option<(String, String)> = None;
352
353    loop {
354        let upstream = tool_emulated_upstream_stream(
355            state,
356            request,
357            tool_context,
358            &codec,
359            &proxy_instance_key,
360            model_public_key,
361            correction.as_ref(),
362        )
363        .await?;
364
365        let completion = match tokio::time::timeout(
366            tool_context.marker_timeout(),
367            buffer_openai_chat_completion(
368                upstream,
369                codec.clone(),
370                proxy_instance_key.clone(),
371                request.model.clone(),
372            ),
373        )
374        .await
375        {
376            Ok(completion) => completion?,
377            Err(_) => {
378                let validation_error = format!(
379                    "tool-emulated completion did not finish within {}",
380                    humantime::format_duration(tool_context.config().tool_call_marker_timeout)
381                );
382                if retries >= tool_context.max_retries() {
383                    return Err(ProxyError::ToolCallRetryExhausted {
384                        max_retries: tool_context.max_retries(),
385                        last_validation_error: validation_error,
386                    });
387                }
388                warn!(
389                    model = %request.model,
390                    retry = retries + 1,
391                    max_retries = tool_context.max_retries(),
392                    "tool call marker timed out; retrying with correction"
393                );
394                retries += 1;
395                correction = Some((validation_error, String::new()));
396                continue;
397            }
398        };
399        let assistant_content = completion
400            .get("choices")
401            .and_then(Value::as_array)
402            .and_then(|choices| choices.first())
403            .and_then(|choice| choice.get("message"))
404            .and_then(|message| message.get("content"))
405            .and_then(Value::as_str)
406            .unwrap_or_default();
407
408        let mut metadata = metadata.clone();
409        if retries > 0 {
410            metadata.tool_retries = Some(retries);
411        }
412
413        match tool_context.classify_assistant_output(assistant_content) {
414            ToolOutputClassification::NormalText => {
415                info!(model = %request.model, retries, "tool emulation produced normal text");
416                let mut response = Json(completion).into_response();
417                metadata.apply(response.headers_mut());
418                return Ok(response);
419            }
420            ToolOutputClassification::ToolCalls(tool_calls) => {
421                info!(
422                    model = %request.model,
423                    tool_calls = tool_calls.len(),
424                    retries,
425                    "tool emulation produced tool calls"
426                );
427                let body = openai_tool_call_completion(completion, tool_calls);
428                let mut response = Json(body).into_response();
429                metadata.apply(response.headers_mut());
430                return Ok(response);
431            }
432            ToolOutputClassification::InvalidToolCall {
433                error,
434                invalid_output,
435            } => {
436                if retries >= tool_context.max_retries() {
437                    warn!(
438                        model = %request.model,
439                        max_retries = tool_context.max_retries(),
440                        validation_error = %error,
441                        "tool call validation failed and retries were exhausted"
442                    );
443                    return Err(ProxyError::ToolCallRetryExhausted {
444                        max_retries: tool_context.max_retries(),
445                        last_validation_error: error.to_string(),
446                    });
447                }
448                warn!(
449                    model = %request.model,
450                    retry = retries + 1,
451                    max_retries = tool_context.max_retries(),
452                    validation_error = %error,
453                    "tool call validation failed; retrying with correction"
454                );
455                retries += 1;
456                correction = Some((error.to_string(), invalid_output));
457            }
458        }
459    }
460}
461
462/// Builds and sends the encrypted tool-emulated upstream request, optionally
463/// appending a correction message from `(validation_error, invalid_output)`.
464async fn tool_emulated_upstream_stream(
465    state: &AppState,
466    request: &ChatCompletionRequest,
467    tool_context: &ToolEmulationContext,
468    codec: &E2eeCodec,
469    proxy_instance_key: &ProxyInstanceKey,
470    model_public_key: &str,
471    correction: Option<&(String, String)>,
472) -> Result<reqwest::Response, ProxyError> {
473    let messages = tool_emulated_messages(request, tool_context, correction);
474    let mut tool_request = request.clone();
475    tool_request.messages = messages;
476
477    let prepared = tool_request.to_venice_e2ee_request(codec, model_public_key)?;
478
479    Ok(state
480        .venice_client()
481        .create_chat_completion_stream(
482            &prepared.upstream,
483            proxy_instance_key.public_key_hex(),
484            model_public_key,
485        )
486        .await?)
487}
488
489/// Builds the message list for a tool-emulated upstream request, including any correction prompt.
490fn tool_emulated_messages(
491    request: &ChatCompletionRequest,
492    tool_context: &ToolEmulationContext,
493    correction: Option<&(String, String)>,
494) -> Vec<NormalizedChatMessage> {
495    let mut messages = request.messages.clone();
496    let mut tool_system_content = tool_context.controller_message().content;
497
498    if let Some((validation_error, invalid_output)) = correction {
499        tool_system_content.push_str("\n\n");
500        tool_system_content.push_str(
501            &tool_context
502                .correction_message(validation_error, invalid_output)
503                .content,
504        );
505    }
506
507    append_to_system_message(&mut messages, tool_system_content);
508    messages
509}
510
511/// Appends controller content to an existing system message or inserts a new system message.
512fn append_to_system_message(messages: &mut Vec<NormalizedChatMessage>, content: String) {
513    if let Some(system_message) = messages.iter_mut().find(|message| message.role == "system") {
514        system_message.content.push_str("\n\n");
515        system_message.content.push_str(&content);
516    } else {
517        messages.insert(0, NormalizedChatMessage::new("system", content));
518    }
519}
520
521/// Rewrites a buffered text completion into an OpenAI completion containing tool calls.
522fn openai_tool_call_completion(completion: Value, tool_calls: Vec<ValidatedToolCall>) -> Value {
523    let choice = completion
524        .get("choices")
525        .and_then(Value::as_array)
526        .and_then(|choices| choices.first())
527        .cloned()
528        .unwrap_or(Value::Null);
529    let index = choice.get("index").and_then(Value::as_u64).unwrap_or(0);
530    let tool_call_values: Vec<Value> = tool_calls
531        .iter()
532        .map(ValidatedToolCall::to_openai_value)
533        .collect();
534    let reasoning_content = choice
535        .get("message")
536        .and_then(|message| message.get("reasoning_content"))
537        .and_then(Value::as_str);
538    let mut message = serde_json::Map::new();
539    message.insert("role".to_owned(), Value::String("assistant".to_owned()));
540    message.insert("content".to_owned(), Value::Null);
541    if let Some(reasoning_content) = reasoning_content {
542        message.insert(
543            "reasoning_content".to_owned(),
544            Value::String(reasoning_content.to_owned()),
545        );
546    }
547    message.insert("tool_calls".to_owned(), Value::Array(tool_call_values));
548
549    json!({
550        "id": string_field(&completion, "id").unwrap_or("chatcmpl-local"),
551        "object": "chat.completion",
552        "created": integer_field(&completion, "created").unwrap_or_else(unix_timestamp_now),
553        "model": string_field(&completion, "model").unwrap_or("unknown"),
554        "choices": [{
555            "index": index,
556            "message": Value::Object(message),
557            "finish_reason": "tool_calls",
558        }],
559        "usage": completion.get("usage").cloned().unwrap_or(Value::Null),
560    })
561}
562
563/// Buffers upstream SSE events and returns one decrypted OpenAI completion object.
564async fn buffer_openai_chat_completion(
565    mut upstream: reqwest::Response,
566    codec: E2eeCodec,
567    proxy_instance_key: ProxyInstanceKey,
568    fallback_model: String,
569) -> Result<Value, ChatStreamError> {
570    info!(model = %fallback_model, "buffering upstream chat stream");
571    let mut parser = SseEventParser::default();
572    let mut transformer =
573        OpenAiChatCompletionBuffer::new(codec, proxy_instance_key, fallback_model.clone());
574    let mut upstream_done = false;
575    let mut chunk_count = 0_u64;
576    let mut event_count = 0_u64;
577
578    while let Some(chunk) = upstream
579        .chunk()
580        .await
581        .map_err(ChatStreamError::upstream_stream)?
582    {
583        chunk_count += 1;
584        let chunk = std::str::from_utf8(&chunk).map_err(ChatStreamError::invalid_utf8)?;
585        let events = parser.push(chunk)?;
586        event_count += events.len() as u64;
587        debug!(
588            model = %fallback_model,
589            chunk_count,
590            parsed_events = events.len(),
591            total_events = event_count,
592            "parsed buffered upstream SSE chunk"
593        );
594
595        for event in events {
596            if transformer.handle_event(event)? {
597                upstream_done = true;
598                break;
599            }
600        }
601
602        if upstream_done {
603            break;
604        }
605    }
606
607    if !upstream_done {
608        warn!(
609            model = %fallback_model,
610            chunk_count,
611            event_count,
612            "buffered upstream stream ended before DONE"
613        );
614        parser.finish()?;
615        return Err(ChatStreamError::malformed_event(
616            "upstream stream ended before data: [DONE]",
617        ));
618    }
619
620    let completion = transformer.into_response();
621    info!(
622        model = %fallback_model,
623        chunk_count,
624        event_count,
625        "buffered upstream chat stream transformed"
626    );
627    Ok(completion)
628}
629
630/// Per-path log messages for the shared chat SSE pump, preserving the
631/// historical "streaming" vs "tool-emulated" wording.
632struct ChatSseLogMessages {
633    start: &'static str,
634    parsed_chunk: &'static str,
635    transformed_event: &'static str,
636    completed: &'static str,
637    ended_early: &'static str,
638}
639
640const CHAT_SSE_LOG: ChatSseLogMessages = ChatSseLogMessages {
641    start: "starting upstream chat SSE transformation",
642    parsed_chunk: "parsed streaming upstream SSE chunk",
643    transformed_event: "transformed streaming upstream SSE event",
644    completed: "completed upstream chat SSE transformation",
645    ended_early: "streaming upstream stream ended before DONE",
646};
647
648const TOOL_EMULATED_CHAT_SSE_LOG: ChatSseLogMessages = ChatSseLogMessages {
649    start: "starting tool-emulated upstream chat SSE transformation",
650    parsed_chunk: "parsed tool-emulated upstream SSE chunk",
651    transformed_event: "transformed tool-emulated upstream SSE event",
652    completed: "completed tool-emulated upstream chat SSE transformation",
653    ended_early: "tool-emulated upstream stream ended before DONE",
654};
655
656/// Transforms parsed upstream SSE events into client-facing stream outputs.
657trait ChatSseTransformer {
658    /// Converts one parsed upstream event into zero or more client-facing stream outputs.
659    fn handle_event(&mut self, event: RawSseEvent) -> Result<Vec<StreamOutput>, ChatStreamError>;
660}
661
662/// Builds a client-facing SSE response from an upstream response and transformer.
663fn chat_sse_response<T>(
664    upstream: reqwest::Response,
665    transformer: T,
666    fallback_model: String,
667    include_usage_requested: bool,
668    log: &'static ChatSseLogMessages,
669    metadata: ProxyMetadataHeaders,
670) -> Response
671where
672    T: ChatSseTransformer + Send + 'static,
673{
674    let stream = chat_sse_event_stream(
675        upstream,
676        transformer,
677        fallback_model,
678        include_usage_requested,
679        log,
680    );
681    let mut response = Sse::new(stream).into_response();
682    metadata.apply(response.headers_mut());
683    response
684}
685
686/// State needed to retry one tool-emulated chat SSE response.
687struct ToolEmulatedChatSseRetryState {
688    state: AppState,
689    request: ChatCompletionRequest,
690    tool_context: ToolEmulationContext,
691    codec: E2eeCodec,
692    proxy_instance_key: ProxyInstanceKey,
693    model_public_key: String,
694}
695
696impl ToolEmulatedChatSseRetryState {
697    fn include_usage_requested(&self) -> bool {
698        self.request.stream_options.include_usage.unwrap_or(false)
699    }
700
701    async fn upstream(
702        &self,
703        initial_upstream: &mut Option<reqwest::Response>,
704        correction: Option<&(String, String)>,
705    ) -> Result<reqwest::Response, ProxyError> {
706        if let Some(upstream) = initial_upstream.take() {
707            return Ok(upstream);
708        }
709
710        tool_emulated_upstream_stream(
711            &self.state,
712            &self.request,
713            &self.tool_context,
714            &self.codec,
715            &self.proxy_instance_key,
716            &self.model_public_key,
717            correction,
718        )
719        .await
720    }
721
722    fn transformer(
723        &self,
724        suppress_normal_output: bool,
725    ) -> Result<OpenAiToolEmulatedChatStreamTransformer, ChatStreamError> {
726        OpenAiToolEmulatedChatStreamTransformer::new(
727            &self.tool_context,
728            self.codec.clone(),
729            self.proxy_instance_key.clone(),
730            self.request.model.clone(),
731            self.include_usage_requested(),
732            suppress_normal_output,
733        )
734    }
735
736    fn log_attempt_start(&self, retries: u32, retrying: bool) {
737        info!(
738            model = %self.request.model,
739            include_usage_requested = self.include_usage_requested(),
740            retry = retries,
741            retrying,
742            "{}",
743            TOOL_EMULATED_CHAT_SSE_LOG.start
744        );
745    }
746}
747
748/// Builds a tool-emulated SSE response that can retry malformed buffered tool calls.
749fn tool_emulated_chat_sse_response_with_retries(
750    retry_state: ToolEmulatedChatSseRetryState,
751    initial_upstream: reqwest::Response,
752    metadata: ProxyMetadataHeaders,
753) -> Response {
754    let stream = tool_emulated_chat_sse_event_stream_with_retries(retry_state, initial_upstream);
755    let mut response = Sse::new(stream).into_response();
756    metadata.apply(response.headers_mut());
757    response
758}
759
760/// Streams upstream SSE chunks through a transformer into OpenAI-compatible SSE events.
761fn chat_sse_event_stream<T>(
762    mut upstream: reqwest::Response,
763    mut transformer: T,
764    fallback_model: String,
765    include_usage_requested: bool,
766    log: &'static ChatSseLogMessages,
767) -> impl futures_core::Stream<Item = Result<Event, axum::BoxError>>
768where
769    T: ChatSseTransformer + Send + 'static,
770{
771    async_stream::try_stream! {
772        info!(
773            model = %fallback_model,
774            include_usage_requested,
775            "{}", log.start
776        );
777        let mut parser = SseEventParser::default();
778        let mut stats = ChatSseStreamStats::default();
779
780        'stream: loop {
781            let events = next_sse_events(
782                &mut upstream,
783                &mut parser,
784                &fallback_model,
785                log,
786                &mut stats,
787            )
788            .await
789            .map_err(box_chat_stream_error)?;
790
791            for event in events {
792                let outputs = transformer.handle_event(event).map_err(box_chat_stream_error)?;
793                record_transformed_outputs(&fallback_model, log, outputs.len(), &mut stats);
794
795                for output in outputs {
796                    let (event, done) = stream_output_event(output);
797                    if done {
798                        log_sse_completed(&fallback_model, log, &stats);
799                        yield event;
800                        break 'stream;
801                    }
802                    yield event;
803                }
804            }
805        }
806    }
807}
808
809/// Streams tool-emulated upstream SSE chunks, retrying malformed buffered tool calls with corrections.
810fn tool_emulated_chat_sse_event_stream_with_retries(
811    retry_state: ToolEmulatedChatSseRetryState,
812    initial_upstream: reqwest::Response,
813) -> impl futures_core::Stream<Item = Result<Event, axum::BoxError>> {
814    async_stream::try_stream! {
815        let mut retries = 0_u32;
816        let mut correction: Option<(String, String)> = None;
817        let mut initial_upstream = Some(initial_upstream);
818
819        'attempts: loop {
820            let retrying = correction.is_some();
821            let mut upstream = retry_state
822                .upstream(&mut initial_upstream, correction.as_ref())
823                .await
824                .map_err(box_proxy_error)?;
825
826            retry_state.log_attempt_start(retries, retrying);
827
828            let mut transformer = retry_state
829                .transformer(retrying)
830                .map_err(box_chat_stream_error)?;
831            let mut parser = SseEventParser::default();
832            let mut stats = ChatSseStreamStats::default();
833
834            loop {
835                let events = next_sse_events(
836                    &mut upstream,
837                    &mut parser,
838                    &retry_state.request.model,
839                    &TOOL_EMULATED_CHAT_SSE_LOG,
840                    &mut stats,
841                )
842                .await
843                .map_err(box_chat_stream_error)?;
844
845                for event in events {
846                    let outputs = match tool_stream_event_outputs(
847                        &retry_state,
848                        &mut transformer,
849                        event,
850                        &mut retries,
851                        &mut correction,
852                    ) {
853                        Ok(ToolStreamEventOutputs::Outputs(outputs)) => outputs,
854                        Ok(ToolStreamEventOutputs::Retry) => continue 'attempts,
855                        Err(error) => Err::<Vec<StreamOutput>, axum::BoxError>(error)?,
856                    };
857
858                    record_transformed_outputs(
859                        &retry_state.request.model,
860                        &TOOL_EMULATED_CHAT_SSE_LOG,
861                        outputs.len(),
862                        &mut stats,
863                    );
864
865                    for output in outputs {
866                        let (event, done) = stream_output_event(output);
867                        if done {
868                            log_sse_completed(
869                                &retry_state.request.model,
870                                &TOOL_EMULATED_CHAT_SSE_LOG,
871                                &stats,
872                            );
873                            yield event;
874                            break 'attempts;
875                        }
876                        yield event;
877                    }
878                }
879            }
880        }
881    }
882}
883
884/// Result of handling one tool-emulated SSE event in the retry-aware stream path.
885enum ToolStreamEventOutputs {
886    Outputs(Vec<StreamOutput>),
887    Retry,
888}
889
890/// Handles one retry-aware tool-emulated SSE event.
891fn tool_stream_event_outputs(
892    retry_state: &ToolEmulatedChatSseRetryState,
893    transformer: &mut OpenAiToolEmulatedChatStreamTransformer,
894    event: RawSseEvent,
895    retries: &mut u32,
896    correction: &mut Option<(String, String)>,
897) -> Result<ToolStreamEventOutputs, axum::BoxError> {
898    match transformer.handle_event(event) {
899        Ok(outputs) => Ok(ToolStreamEventOutputs::Outputs(outputs)),
900        Err(ChatStreamError::InvalidToolCall {
901            validation_error,
902            invalid_output,
903        }) if *retries < retry_state.tool_context.max_retries() => {
904            warn!(
905                model = %retry_state.request.model,
906                retry = *retries + 1,
907                max_retries = retry_state.tool_context.max_retries(),
908                validation_error = %validation_error,
909                "streamed tool call validation failed; retrying with correction"
910            );
911            *retries += 1;
912            *correction = Some((validation_error, invalid_output));
913            Ok(ToolStreamEventOutputs::Retry)
914        }
915        Err(ChatStreamError::InvalidToolCall {
916            validation_error, ..
917        }) => {
918            error!(
919                model = %retry_state.request.model,
920                max_retries = retry_state.tool_context.max_retries(),
921                validation_error = %validation_error,
922                "streamed tool call validation failed and retries were exhausted"
923            );
924            Err(box_proxy_error(ProxyError::ToolCallRetryExhausted {
925                max_retries: retry_state.tool_context.max_retries(),
926                last_validation_error: validation_error,
927            }))
928        }
929        Err(error) => Err(box_chat_stream_error(error)),
930    }
931}
932
933/// Counters tracked while transforming one upstream SSE attempt.
934#[derive(Debug, Default)]
935struct ChatSseStreamStats {
936    chunk_count: u64,
937    event_count: u64,
938    output_count: u64,
939}
940
941/// Reads and parses the next upstream SSE chunk for a streaming transformation attempt.
942async fn next_sse_events(
943    upstream: &mut reqwest::Response,
944    parser: &mut SseEventParser,
945    fallback_model: &str,
946    log: &'static ChatSseLogMessages,
947    stats: &mut ChatSseStreamStats,
948) -> Result<Vec<RawSseEvent>, ChatStreamError> {
949    let Some(chunk) = upstream
950        .chunk()
951        .await
952        .map_err(ChatStreamError::upstream_stream)?
953    else {
954        warn!(
955            model = %fallback_model,
956            chunk_count = stats.chunk_count,
957            event_count = stats.event_count,
958            output_count = stats.output_count,
959            "{}",
960            log.ended_early
961        );
962        parser.finish()?;
963        return Err(ChatStreamError::malformed_event(
964            "upstream stream ended before data: [DONE]",
965        ));
966    };
967
968    stats.chunk_count += 1;
969    let chunk = std::str::from_utf8(&chunk).map_err(ChatStreamError::invalid_utf8)?;
970    let events = parser.push(chunk)?;
971    stats.event_count += events.len() as u64;
972    debug!(
973        model = %fallback_model,
974        chunk_count = stats.chunk_count,
975        parsed_events = events.len(),
976        total_events = stats.event_count,
977        "{}",
978        log.parsed_chunk
979    );
980    Ok(events)
981}
982
983/// Records and logs transformed outputs for one parsed upstream event.
984fn record_transformed_outputs(
985    fallback_model: &str,
986    log: &'static ChatSseLogMessages,
987    output_count: usize,
988    stats: &mut ChatSseStreamStats,
989) {
990    stats.output_count += output_count as u64;
991    debug!(
992        model = %fallback_model,
993        emitted_outputs = output_count,
994        total_outputs = stats.output_count,
995        "{}",
996        log.transformed_event
997    );
998}
999
1000/// Converts a transformer output into a client-facing SSE event and whether it finishes the stream.
1001fn stream_output_event(output: StreamOutput) -> (Event, bool) {
1002    match output {
1003        StreamOutput::Json(value) => (Event::default().data(value.to_string()), false),
1004        StreamOutput::Done => (Event::default().data("[DONE]"), true),
1005    }
1006}
1007
1008/// Logs completion of a client-facing SSE transformation.
1009fn log_sse_completed(
1010    fallback_model: &str,
1011    log: &'static ChatSseLogMessages,
1012    stats: &ChatSseStreamStats,
1013) {
1014    info!(
1015        model = %fallback_model,
1016        chunk_count = stats.chunk_count,
1017        event_count = stats.event_count,
1018        output_count = stats.output_count,
1019        "{}",
1020        log.completed
1021    );
1022}
1023
1024/// Converts a proxy error into an Axum boxed stream error after logging it.
1025fn box_proxy_error(error: ProxyError) -> axum::BoxError {
1026    error!(error = %error, "chat stream failed");
1027    Box::new(error)
1028}
1029
1030/// Converts a chat-stream error into an Axum boxed stream error after logging it.
1031fn box_chat_stream_error(error: ChatStreamError) -> axum::BoxError {
1032    error!(error = %error, "chat stream transformation failed");
1033    Box::new(error)
1034}
1035
1036/// Incremental parser for upstream Server-Sent Event text chunks.
1037#[derive(Debug, Default)]
1038struct SseEventParser {
1039    buffer: String,
1040}
1041
1042impl SseEventParser {
1043    /// Adds one UTF-8 chunk and returns all complete SSE events parsed from the buffer.
1044    fn push(&mut self, chunk: &str) -> Result<Vec<RawSseEvent>, ChatStreamError> {
1045        self.buffer.push_str(chunk);
1046        let mut events = Vec::new();
1047
1048        while let Some((boundary_start, boundary_len)) = sse_event_boundary(&self.buffer) {
1049            let raw = self.buffer[..boundary_start].to_owned();
1050            self.buffer.drain(..boundary_start + boundary_len);
1051            if let Some(event) = parse_sse_event(&raw)? {
1052                events.push(event);
1053            }
1054        }
1055
1056        debug!(
1057            chunk_bytes = chunk.len(),
1058            buffered_bytes = self.buffer.len(),
1059            parsed_events = events.len(),
1060            "SSE parser processed upstream chunk"
1061        );
1062        Ok(events)
1063    }
1064
1065    /// Validates that no incomplete SSE event remains after the upstream stream ends.
1066    fn finish(&self) -> Result<(), ChatStreamError> {
1067        if self.buffer.trim().is_empty() {
1068            Ok(())
1069        } else {
1070            warn!(
1071                buffered_bytes = self.buffer.len(),
1072                "upstream SSE stream ended with incomplete event"
1073            );
1074            Err(ChatStreamError::malformed_event(
1075                "upstream stream ended with an incomplete SSE event",
1076            ))
1077        }
1078    }
1079}
1080
1081/// Parsed upstream SSE event containing optional event type and joined data lines.
1082#[derive(Debug, Clone, PartialEq, Eq)]
1083struct RawSseEvent {
1084    event: Option<String>,
1085    data: String,
1086}
1087
1088/// Per-path log messages for [`classify_upstream_event`], preserving the
1089/// historical "buffered" / "streaming" / "tool-emulated" wording. Optional
1090/// entries are skipped for paths that historically did not emit them.
1091struct UpstreamEventLogMessages {
1092    event: &'static str,
1093    sse_error: &'static str,
1094    done: &'static str,
1095    parsing: Option<&'static str>,
1096    json_error: &'static str,
1097    missing_choices: &'static str,
1098    parsed: Option<&'static str>,
1099    unexpected_choice_count: &'static str,
1100}
1101
1102const BUFFERED_UPSTREAM_EVENT_LOG: UpstreamEventLogMessages = UpstreamEventLogMessages {
1103    event: "buffering upstream SSE event",
1104    sse_error: "upstream SSE error event while buffering response",
1105    done: "received upstream DONE while buffering response",
1106    parsing: Some("parsing buffered upstream chat JSON chunk"),
1107    json_error: "upstream JSON error chunk while buffering response",
1108    missing_choices: "buffered upstream chat chunk is missing choices array",
1109    parsed: Some("parsed buffered upstream chat chunk"),
1110    unexpected_choice_count: "unexpected buffered upstream choice count",
1111};
1112
1113const STREAMING_UPSTREAM_EVENT_LOG: UpstreamEventLogMessages = UpstreamEventLogMessages {
1114    event: "transforming streaming upstream SSE event",
1115    sse_error: "upstream SSE error event while streaming response",
1116    done: "received upstream DONE while streaming response",
1117    parsing: Some("parsing streaming upstream chat JSON chunk"),
1118    json_error: "upstream JSON error chunk while streaming response",
1119    missing_choices: "streaming upstream chat chunk is missing choices array",
1120    parsed: Some("parsed streaming upstream chat chunk"),
1121    unexpected_choice_count: "unexpected streaming upstream choice count",
1122};
1123
1124const TOOL_EMULATED_UPSTREAM_EVENT_LOG: UpstreamEventLogMessages = UpstreamEventLogMessages {
1125    event: "transforming tool-emulated streaming upstream SSE event",
1126    sse_error: "upstream SSE error event while streaming tool-emulated response",
1127    done: "received upstream DONE while streaming tool-emulated response",
1128    parsing: None,
1129    json_error: "upstream JSON error chunk while streaming tool-emulated response",
1130    missing_choices: "tool-emulated upstream chat chunk is missing choices array",
1131    parsed: None,
1132    unexpected_choice_count: "unexpected tool-emulated upstream choice count",
1133};
1134
1135/// A validated upstream SSE event, classified for downstream handling.
1136enum UpstreamEventKind {
1137    /// The upstream `data: [DONE]` sentinel.
1138    Done,
1139    /// A chunk with an empty `choices` array (usage-only chunk).
1140    Usage(Value),
1141    /// A chunk with exactly one choice.
1142    Choice { value: Value, choice: Value },
1143}
1144
1145/// Shared prologue for upstream SSE event handling: rejects error events and
1146/// error payloads, detects `[DONE]`, parses the JSON chunk, and validates the
1147/// `choices` array.
1148fn classify_upstream_event(
1149    event: RawSseEvent,
1150    log: &UpstreamEventLogMessages,
1151) -> Result<UpstreamEventKind, ChatStreamError> {
1152    let event_type = event.event.as_deref().unwrap_or("message");
1153    let is_done = event.data.trim() == "[DONE]";
1154    debug!(event_type, is_done, "{}", log.event);
1155
1156    if event.event.as_deref() == Some("error") {
1157        warn!("{}", log.sse_error);
1158        return Err(ChatStreamError::upstream_event(event.data));
1159    }
1160
1161    if is_done {
1162        info!("{}", log.done);
1163        return Ok(UpstreamEventKind::Done);
1164    }
1165
1166    if let Some(parsing) = log.parsing {
1167        debug!("{}", parsing);
1168    }
1169    let value: Value = serde_json::from_str(&event.data).map_err(ChatStreamError::json_event)?;
1170    if let Some(error) = value.get("error") {
1171        warn!("{}", log.json_error);
1172        return Err(ChatStreamError::upstream_event(error.to_string()));
1173    }
1174
1175    let Some(choices) = value.get("choices").and_then(Value::as_array) else {
1176        warn!("{}", log.missing_choices);
1177        return Err(ChatStreamError::malformed_event(
1178            "upstream chat chunk is missing choices array",
1179        ));
1180    };
1181    if let Some(parsed) = log.parsed {
1182        debug!(choice_count = choices.len(), "{}", parsed);
1183    }
1184
1185    if choices.is_empty() {
1186        return Ok(UpstreamEventKind::Usage(value));
1187    }
1188    if choices.len() != 1 {
1189        warn!(
1190            choice_count = choices.len(),
1191            "{}", log.unexpected_choice_count
1192        );
1193        return Err(ChatStreamError::malformed_event(format!(
1194            "expected exactly one upstream choice, got {}",
1195            choices.len(),
1196        )));
1197    }
1198
1199    let choice = choices[0].clone();
1200    Ok(UpstreamEventKind::Choice { value, choice })
1201}
1202
1203/// Shared chunk-transformation state: the E2EE codec/key used to decrypt
1204/// upstream content plus the fallback identity fields for emitted chunks.
1205struct ChunkContext {
1206    codec: E2eeCodec,
1207    proxy_instance_key: ProxyInstanceKey,
1208    fallback_id: String,
1209    fallback_created: i64,
1210    fallback_model: String,
1211}
1212
1213impl ChunkContext {
1214    /// Creates chunk transformation context with fallback identity fields for emitted chunks.
1215    fn new(codec: E2eeCodec, proxy_instance_key: ProxyInstanceKey, fallback_model: String) -> Self {
1216        Self {
1217            codec,
1218            proxy_instance_key,
1219            fallback_id: format!("chatcmpl-local-{}", uuid::Uuid::new_v4()),
1220            fallback_created: unix_timestamp_now(),
1221            fallback_model,
1222        }
1223    }
1224
1225    /// Decrypts optional upstream encrypted content using the proxy instance key.
1226    fn decrypt(&self, content: Option<&str>) -> Result<Option<String>, ChatStreamError> {
1227        self.codec
1228            .decrypt_response_content(content, self.proxy_instance_key.private_key())
1229            .map_err(ChatStreamError::decryption)
1230    }
1231
1232    /// Builds an OpenAI chat-completion chunk with one choice and fallback metadata.
1233    fn chunk_with_choice(
1234        &self,
1235        upstream: &Value,
1236        index: u64,
1237        delta: Value,
1238        finish_reason: Value,
1239    ) -> Value {
1240        json!({
1241            "id": string_field(upstream, "id").unwrap_or(&self.fallback_id),
1242            "object": string_field(upstream, "object").unwrap_or("chat.completion.chunk"),
1243            "created": integer_field(upstream, "created").unwrap_or(self.fallback_created),
1244            "model": string_field(upstream, "model").unwrap_or(&self.fallback_model),
1245            "choices": [{
1246                "index": index,
1247                "delta": delta,
1248                "finish_reason": finish_reason,
1249            }],
1250        })
1251    }
1252
1253    /// Builds an OpenAI usage-only streaming chunk from upstream metadata and usage fields.
1254    fn usage_chunk(&self, upstream: &Value, usage: &Value) -> Value {
1255        json!({
1256            "id": string_field(upstream, "id").unwrap_or(&self.fallback_id),
1257            "object": string_field(upstream, "object").unwrap_or("chat.completion.chunk"),
1258            "created": integer_field(upstream, "created").unwrap_or(self.fallback_created),
1259            "model": string_field(upstream, "model").unwrap_or(&self.fallback_model),
1260            "choices": [],
1261            "usage": usage,
1262        })
1263    }
1264}
1265
1266/// Accumulates decrypted upstream SSE chunks into a non-streaming OpenAI completion.
1267struct OpenAiChatCompletionBuffer {
1268    ctx: ChunkContext,
1269    id: Option<String>,
1270    created: Option<i64>,
1271    model: Option<String>,
1272    choice_index: Option<u64>,
1273    saw_encrypted_response_field: bool,
1274    content: String,
1275    reasoning_content: String,
1276    finish_reason: Option<Value>,
1277    usage: Option<Value>,
1278}
1279
1280impl OpenAiChatCompletionBuffer {
1281    /// Creates an empty buffered completion transformer.
1282    fn new(codec: E2eeCodec, proxy_instance_key: ProxyInstanceKey, fallback_model: String) -> Self {
1283        Self {
1284            ctx: ChunkContext::new(codec, proxy_instance_key, fallback_model),
1285            id: None,
1286            created: None,
1287            model: None,
1288            choice_index: None,
1289            saw_encrypted_response_field: false,
1290            content: String::new(),
1291            reasoning_content: String::new(),
1292            finish_reason: None,
1293            usage: None,
1294        }
1295    }
1296
1297    /// Applies one upstream SSE event and returns whether the stream reached `[DONE]`.
1298    fn handle_event(&mut self, event: RawSseEvent) -> Result<bool, ChatStreamError> {
1299        match classify_upstream_event(event, &BUFFERED_UPSTREAM_EVENT_LOG)? {
1300            UpstreamEventKind::Done => {
1301                if !self.saw_encrypted_response_field {
1302                    self.ctx.decrypt(None)?;
1303                }
1304                if self.finish_reason.is_none() {
1305                    self.finish_reason = Some(Value::String("stop".to_owned()));
1306                }
1307                Ok(true)
1308            }
1309            UpstreamEventKind::Usage(value) => {
1310                self.record_metadata(&value);
1311                self.handle_usage_chunk(&value).map(|()| false)
1312            }
1313            UpstreamEventKind::Choice { value, choice } => {
1314                self.record_metadata(&value);
1315                self.handle_choice_chunk(&choice)?;
1316                Ok(false)
1317            }
1318        }
1319    }
1320
1321    /// Stores a usage-only upstream chunk for the final buffered response.
1322    fn handle_usage_chunk(&mut self, value: &Value) -> Result<(), ChatStreamError> {
1323        let Some(usage) = value.get("usage") else {
1324            warn!("buffered upstream chunk has no choices and no usage");
1325            return Err(ChatStreamError::malformed_event(
1326                "upstream chunk has no choices and no usage",
1327            ));
1328        };
1329
1330        info!("buffered upstream usage chunk");
1331        self.usage = Some(usage.clone());
1332        Ok(())
1333    }
1334
1335    /// Decrypts and appends one upstream choice chunk into the buffered completion.
1336    fn handle_choice_chunk(&mut self, choice: &Value) -> Result<(), ChatStreamError> {
1337        let choice = choice.as_object().ok_or_else(|| {
1338            ChatStreamError::malformed_event("upstream choice must be a JSON object")
1339        })?;
1340        let index = normalized_choice_index(choice.get("index"))?;
1341        match self.choice_index {
1342            Some(existing) if existing != index => {
1343                return Err(ChatStreamError::malformed_event(
1344                    "upstream choice index changed while buffering a completion",
1345                ));
1346            }
1347            None => self.choice_index = Some(index),
1348            Some(_) => {}
1349        }
1350
1351        let finish_reason = normalized_finish_reason(choice.get("finish_reason"))?;
1352        let delta = choice.get("delta").unwrap_or(&Value::Null);
1353        let content = encrypted_delta_content(delta)?;
1354        let reasoning_content = encrypted_delta_reasoning_content(delta)?;
1355        debug!(
1356            choice_index = index,
1357            has_encrypted_content = content.is_some(),
1358            has_encrypted_reasoning_content = reasoning_content.is_some(),
1359            has_finish_reason = !finish_reason.is_null(),
1360            "transforming buffered upstream choice chunk"
1361        );
1362
1363        if let Some(content) = content {
1364            let decrypted = self.ctx.decrypt(Some(content))?;
1365            self.saw_encrypted_response_field = true;
1366            debug!(
1367                choice_index = index,
1368                has_decrypted_content = decrypted.is_some(),
1369                "decrypted buffered upstream content chunk"
1370            );
1371            if let Some(content) = decrypted {
1372                self.content.push_str(&content);
1373            }
1374        }
1375
1376        if let Some(reasoning_content) = reasoning_content {
1377            let decrypted = self.ctx.decrypt(Some(reasoning_content))?;
1378            self.saw_encrypted_response_field = true;
1379            debug!(
1380                choice_index = index,
1381                has_decrypted_reasoning_content = decrypted.is_some(),
1382                "decrypted buffered upstream reasoning content chunk"
1383            );
1384            if let Some(reasoning_content) = decrypted {
1385                self.reasoning_content.push_str(&reasoning_content);
1386            }
1387        }
1388
1389        if !finish_reason.is_null() {
1390            self.finish_reason = Some(finish_reason);
1391        }
1392
1393        Ok(())
1394    }
1395
1396    /// Records the first available upstream id, timestamp, and model metadata.
1397    fn record_metadata(&mut self, value: &Value) {
1398        if self.id.is_none()
1399            && let Some(id) = string_field(value, "id")
1400        {
1401            self.id = Some(id.to_owned());
1402        }
1403        if self.created.is_none()
1404            && let Some(created) = integer_field(value, "created")
1405        {
1406            self.created = Some(created);
1407        }
1408        if self.model.is_none()
1409            && let Some(model) = string_field(value, "model")
1410        {
1411            self.model = Some(model.to_owned());
1412        }
1413    }
1414
1415    /// Consumes the buffer and returns an OpenAI chat-completion response object.
1416    fn into_response(self) -> Value {
1417        let mut message = serde_json::Map::new();
1418        message.insert("role".to_owned(), Value::String("assistant".to_owned()));
1419        if !self.reasoning_content.is_empty() {
1420            message.insert(
1421                "reasoning_content".to_owned(),
1422                Value::String(self.reasoning_content),
1423            );
1424        }
1425        message.insert("content".to_owned(), Value::String(self.content));
1426
1427        json!({
1428            "id": self.id.unwrap_or(self.ctx.fallback_id),
1429            "object": "chat.completion",
1430            "created": self.created.unwrap_or(self.ctx.fallback_created),
1431            "model": self.model.unwrap_or(self.ctx.fallback_model),
1432            "choices": [{
1433                "index": self.choice_index.unwrap_or(0),
1434                "message": Value::Object(message),
1435                "finish_reason": self.finish_reason.unwrap_or_else(|| Value::String("stop".to_owned())),
1436            }],
1437            "usage": self.usage.unwrap_or(Value::Null),
1438        })
1439    }
1440}
1441
1442/// Finds the earliest complete SSE event delimiter in a parser buffer.
1443fn sse_event_boundary(buffer: &str) -> Option<(usize, usize)> {
1444    ["\r\n\r\n", "\n\n", "\r\r"]
1445        .into_iter()
1446        .filter_map(|delimiter| buffer.find(delimiter).map(|index| (index, delimiter.len())))
1447        .min_by_key(|(index, _)| *index)
1448}
1449
1450/// Parses one raw SSE event into its event type and data payload.
1451fn parse_sse_event(raw: &str) -> Result<Option<RawSseEvent>, ChatStreamError> {
1452    let mut event = None;
1453    let mut data_lines = Vec::new();
1454    let mut saw_non_comment_field = false;
1455
1456    for line in raw.lines() {
1457        let line = line.strip_suffix('\r').unwrap_or(line);
1458        if line.is_empty() || line.starts_with(':') {
1459            continue;
1460        }
1461
1462        saw_non_comment_field = true;
1463        let (field, value) = line.split_once(':').unwrap_or((line, ""));
1464        let value = value.strip_prefix(' ').unwrap_or(value);
1465        match field {
1466            "event" => event = Some(value.to_owned()),
1467            "data" => data_lines.push(value.to_owned()),
1468            "id" | "retry" => {}
1469            other => {
1470                warn!(field = other, "unsupported upstream SSE field");
1471                return Err(ChatStreamError::malformed_event(format!(
1472                    "unsupported upstream SSE field {other:?}",
1473                )));
1474            }
1475        }
1476    }
1477
1478    if data_lines.is_empty() {
1479        return if saw_non_comment_field {
1480            warn!("upstream SSE event did not contain a data field");
1481            Err(ChatStreamError::malformed_event(
1482                "upstream SSE event did not contain a data field",
1483            ))
1484        } else {
1485            debug!("ignored upstream SSE comment or heartbeat event");
1486            Ok(None)
1487        };
1488    }
1489
1490    debug!(
1491        event_type = event.as_deref().unwrap_or("message"),
1492        data_line_count = data_lines.len(),
1493        "parsed upstream SSE event"
1494    );
1495
1496    Ok(Some(RawSseEvent {
1497        event,
1498        data: data_lines.join("\n"),
1499    }))
1500}
1501
1502/// Transforms decrypted upstream SSE chunks into normal OpenAI streaming chat chunks.
1503struct OpenAiChatStreamTransformer {
1504    ctx: ChunkContext,
1505    include_usage_requested: bool,
1506    sent_role: bool,
1507    sent_final_finish: bool,
1508}
1509
1510impl OpenAiChatStreamTransformer {
1511    /// Creates a streaming transformer for encrypted non-tool chat responses.
1512    fn new(
1513        codec: E2eeCodec,
1514        proxy_instance_key: ProxyInstanceKey,
1515        fallback_model: String,
1516        include_usage_requested: bool,
1517    ) -> Self {
1518        Self {
1519            ctx: ChunkContext::new(codec, proxy_instance_key, fallback_model),
1520            include_usage_requested,
1521            sent_role: false,
1522            sent_final_finish: false,
1523        }
1524    }
1525
1526    /// Converts one upstream choice chunk into decrypted client-facing stream chunks.
1527    fn handle_choice_chunk(
1528        &mut self,
1529        value: &Value,
1530        choice: &Value,
1531    ) -> Result<Vec<StreamOutput>, ChatStreamError> {
1532        let choice = choice.as_object().ok_or_else(|| {
1533            ChatStreamError::malformed_event("upstream choice must be a JSON object")
1534        })?;
1535        let finish_reason = normalized_finish_reason(choice.get("finish_reason"))?;
1536        let delta = choice.get("delta").unwrap_or(&Value::Null);
1537        let content = encrypted_delta_content(delta)?;
1538        let reasoning_content = encrypted_delta_reasoning_content(delta)?;
1539        debug!(
1540            has_encrypted_content = content.is_some(),
1541            has_encrypted_reasoning_content = reasoning_content.is_some(),
1542            has_finish_reason = !finish_reason.is_null(),
1543            "transforming streaming upstream choice chunk"
1544        );
1545
1546        let mut output = Vec::new();
1547
1548        if content.is_none() && reasoning_content.is_none() {
1549            if !finish_reason.is_null() {
1550                output.push(StreamOutput::Json(self.chunk_with_choice(
1551                    value,
1552                    choice.get("index"),
1553                    json!({}),
1554                    finish_reason,
1555                )?));
1556                self.sent_final_finish = true;
1557            }
1558            return Ok(output);
1559        }
1560
1561        let decrypted_content = match content {
1562            Some(content) => self.ctx.decrypt(Some(content))?,
1563            None => None,
1564        };
1565        let decrypted_reasoning_content = match reasoning_content {
1566            Some(reasoning_content) => self.ctx.decrypt(Some(reasoning_content))?,
1567            None => None,
1568        };
1569        debug!(
1570            has_decrypted_content = decrypted_content.is_some(),
1571            has_decrypted_reasoning_content = decrypted_reasoning_content.is_some(),
1572            "decrypted streaming upstream content chunk"
1573        );
1574
1575        if decrypted_content.is_some() || decrypted_reasoning_content.is_some() {
1576            let mut delta = serde_json::Map::new();
1577
1578            if !self.sent_role {
1579                delta.insert("role".to_owned(), Value::String("assistant".to_owned()));
1580                self.sent_role = true;
1581            }
1582
1583            if let Some(reasoning_content) = decrypted_reasoning_content {
1584                delta.insert(
1585                    "reasoning_content".to_owned(),
1586                    Value::String(reasoning_content),
1587                );
1588            }
1589
1590            if let Some(content) = decrypted_content {
1591                delta.insert("content".to_owned(), Value::String(content));
1592            }
1593
1594            let final_finish = !finish_reason.is_null();
1595            let content_finish_reason = if final_finish {
1596                Value::Null
1597            } else {
1598                finish_reason.clone()
1599            };
1600            output.push(StreamOutput::Json(self.chunk_with_choice(
1601                value,
1602                choice.get("index"),
1603                Value::Object(delta),
1604                content_finish_reason,
1605            )?));
1606            if final_finish {
1607                output.push(StreamOutput::Json(self.chunk_with_choice(
1608                    value,
1609                    choice.get("index"),
1610                    json!({}),
1611                    finish_reason,
1612                )?));
1613                self.sent_final_finish = true;
1614            }
1615            return Ok(output);
1616        }
1617
1618        Ok(output)
1619    }
1620
1621    /// Converts an upstream usage-only chunk into a client-facing usage event when requested.
1622    fn handle_usage_chunk(&self, value: &Value) -> Result<Vec<StreamOutput>, ChatStreamError> {
1623        let Some(usage) = value.get("usage") else {
1624            warn!("streaming upstream chunk has no choices and no usage");
1625            return Err(ChatStreamError::malformed_event(
1626                "upstream chunk has no choices and no usage",
1627            ));
1628        };
1629
1630        // If a client requests include_usage but Venice omits a usage event,
1631        // this streaming path omits usage rather than synthesizing unverifiable
1632        // token counts.
1633        if !self.include_usage_requested {
1634            debug!("streaming upstream usage chunk ignored because client did not request usage");
1635            return Ok(Vec::new());
1636        }
1637
1638        info!("streaming upstream usage chunk forwarded");
1639        Ok(vec![StreamOutput::Json(self.ctx.usage_chunk(value, usage))])
1640    }
1641
1642    /// Builds a fallback final stop chunk for streams that reach `[DONE]` without one.
1643    fn finish_chunk(&self) -> Value {
1644        self.ctx
1645            .chunk_with_choice(&Value::Null, 0, json!({}), Value::String("stop".to_owned()))
1646    }
1647
1648    /// Builds a stream chunk after validating the optional upstream choice index.
1649    fn chunk_with_choice(
1650        &self,
1651        upstream: &Value,
1652        index: Option<&Value>,
1653        delta: Value,
1654        finish_reason: Value,
1655    ) -> Result<Value, ChatStreamError> {
1656        let index = normalized_choice_index(index)?;
1657        Ok(self
1658            .ctx
1659            .chunk_with_choice(upstream, index, delta, finish_reason))
1660    }
1661}
1662
1663impl ChatSseTransformer for OpenAiChatStreamTransformer {
1664    /// Converts one parsed upstream SSE event into normal streaming chat outputs.
1665    fn handle_event(&mut self, event: RawSseEvent) -> Result<Vec<StreamOutput>, ChatStreamError> {
1666        match classify_upstream_event(event, &STREAMING_UPSTREAM_EVENT_LOG)? {
1667            UpstreamEventKind::Done => {
1668                let mut output = Vec::new();
1669                if !self.sent_final_finish {
1670                    debug!("synthesizing final streaming finish chunk before DONE");
1671                    output.push(StreamOutput::Json(self.finish_chunk()));
1672                    self.sent_final_finish = true;
1673                }
1674                output.push(StreamOutput::Done);
1675                Ok(output)
1676            }
1677            UpstreamEventKind::Usage(value) => self.handle_usage_chunk(&value),
1678            UpstreamEventKind::Choice { value, choice } => {
1679                self.handle_choice_chunk(&value, &choice)
1680            }
1681        }
1682    }
1683}
1684
1685const TOOL_CALL_START_MARKER: &str = "<tool_call>";
1686
1687/// Streaming transformer for tool-emulated responses.
1688///
1689/// Plain assistant text is streamed until the first tool-call marker appears.
1690/// From that marker onward, output is buffered and parsed/validated as a full
1691/// tool-call block before OpenAI `delta.tool_calls` chunks are emitted.
1692struct OpenAiToolEmulatedChatStreamTransformer {
1693    ctx: ChunkContext,
1694    tool_context: ToolEmulationContext,
1695    include_usage_requested: bool,
1696    sent_role: bool,
1697    sent_final_finish: bool,
1698    pending_text: String,
1699    tool_buffer: String,
1700    buffering_tool_call: bool,
1701    emitted_tool_calls: bool,
1702    suppress_normal_output: bool,
1703}
1704
1705impl OpenAiToolEmulatedChatStreamTransformer {
1706    /// Creates a streaming transformer for encrypted tool-emulated responses.
1707    fn new(
1708        tool_context: &ToolEmulationContext,
1709        codec: E2eeCodec,
1710        proxy_instance_key: ProxyInstanceKey,
1711        fallback_model: String,
1712        include_usage_requested: bool,
1713        suppress_normal_output: bool,
1714    ) -> Result<Self, ChatStreamError> {
1715        Ok(Self {
1716            ctx: ChunkContext::new(codec, proxy_instance_key, fallback_model),
1717            tool_context: tool_context.clone(),
1718            include_usage_requested,
1719            sent_role: false,
1720            sent_final_finish: false,
1721            pending_text: String::new(),
1722            tool_buffer: String::new(),
1723            buffering_tool_call: false,
1724            emitted_tool_calls: false,
1725            suppress_normal_output,
1726        })
1727    }
1728
1729    /// Converts one upstream choice chunk into normal text, reasoning, or tool-call stream chunks.
1730    fn handle_choice_chunk(
1731        &mut self,
1732        value: &Value,
1733        choice: &Value,
1734    ) -> Result<Vec<StreamOutput>, ChatStreamError> {
1735        let choice = choice.as_object().ok_or_else(|| {
1736            ChatStreamError::malformed_event("upstream choice must be a JSON object")
1737        })?;
1738        let index = normalized_choice_index(choice.get("index"))?;
1739        let finish_reason = normalized_finish_reason(choice.get("finish_reason"))?;
1740        let delta = choice.get("delta").unwrap_or(&Value::Null);
1741        let content = encrypted_delta_content(delta)?;
1742        let reasoning_content = encrypted_delta_reasoning_content(delta)?;
1743
1744        let mut output = Vec::new();
1745
1746        if let Some(reasoning_content) = reasoning_content
1747            && let Some(reasoning_content) = self.ctx.decrypt(Some(reasoning_content))?
1748            && !self.sent_final_finish
1749            && !self.suppress_normal_output
1750        {
1751            output.push(self.reasoning_chunk(value, index, reasoning_content));
1752        }
1753
1754        if let Some(content) = content
1755            && let Some(content) = self.ctx.decrypt(Some(content))?
1756            && !self.sent_final_finish
1757        {
1758            output.extend(self.push_decrypted_content(value, index, &content)?);
1759        }
1760
1761        if !finish_reason.is_null() && !self.sent_final_finish {
1762            output.extend(self.finish_buffered_content(value, index, finish_reason)?);
1763        }
1764
1765        Ok(output)
1766    }
1767
1768    /// Buffers decrypted text until it is safe to stream or parse as tool-call output.
1769    fn push_decrypted_content(
1770        &mut self,
1771        upstream: &Value,
1772        index: u64,
1773        content: &str,
1774    ) -> Result<Vec<StreamOutput>, ChatStreamError> {
1775        if self.buffering_tool_call {
1776            self.tool_buffer.push_str(content);
1777            self.ensure_tool_buffer_within_limit()?;
1778            return Ok(Vec::new());
1779        }
1780
1781        self.pending_text.push_str(content);
1782        if let Some(marker_index) = self.pending_text.find(TOOL_CALL_START_MARKER) {
1783            let text = self.pending_text[..marker_index].to_owned();
1784            self.tool_buffer = self.pending_text[marker_index..].to_owned();
1785            self.pending_text.clear();
1786            self.buffering_tool_call = true;
1787            self.ensure_tool_buffer_within_limit()?;
1788            if self.suppress_normal_output {
1789                return Ok(Vec::new());
1790            }
1791            return Ok(self.text_chunk_if_not_empty(upstream, index, text));
1792        }
1793
1794        if self.suppress_normal_output {
1795            self.ensure_pending_text_within_limit()?;
1796            return Ok(Vec::new());
1797        }
1798
1799        let streamable_len = streamable_pending_text_len(&self.pending_text);
1800        if streamable_len == 0 {
1801            return Ok(Vec::new());
1802        }
1803
1804        let text = self.pending_text[..streamable_len].to_owned();
1805        self.pending_text.drain(..streamable_len);
1806        Ok(vec![
1807            self.text_field_chunk(upstream, index, "content", text),
1808        ])
1809    }
1810
1811    /// Flushes pending text or buffered tool-call output and emits the final finish chunk.
1812    fn finish_buffered_content(
1813        &mut self,
1814        upstream: &Value,
1815        index: u64,
1816        finish_reason: Value,
1817    ) -> Result<Vec<StreamOutput>, ChatStreamError> {
1818        let mut output = Vec::new();
1819
1820        if self.buffering_tool_call {
1821            output.extend(self.buffered_tool_call_chunks(upstream, index)?);
1822        } else if self.suppress_normal_output {
1823            let invalid_output = std::mem::take(&mut self.pending_text);
1824            return Err(ChatStreamError::invalid_tool_call(
1825                "expected corrected streamed response to include a tool call",
1826                invalid_output,
1827            ));
1828        } else if !self.pending_text.is_empty() {
1829            let text = std::mem::take(&mut self.pending_text);
1830            output.push(self.text_field_chunk(upstream, index, "content", text));
1831        }
1832
1833        let finish_reason = if self.emitted_tool_calls {
1834            Value::String("tool_calls".to_owned())
1835        } else {
1836            finish_reason
1837        };
1838        output.push(StreamOutput::Json(self.ctx.chunk_with_choice(
1839            upstream,
1840            index,
1841            json!({}),
1842            finish_reason,
1843        )));
1844        self.sent_final_finish = true;
1845        Ok(output)
1846    }
1847
1848    /// Parses buffered tool-call text and returns OpenAI streaming tool-call chunks.
1849    fn buffered_tool_call_chunks(
1850        &mut self,
1851        upstream: &Value,
1852        index: u64,
1853    ) -> Result<Vec<StreamOutput>, ChatStreamError> {
1854        self.ensure_tool_buffer_within_limit()?;
1855        match self
1856            .tool_context
1857            .classify_assistant_output(&self.tool_buffer)
1858        {
1859            ToolOutputClassification::ToolCalls(tool_calls) => {
1860                self.emitted_tool_calls = true;
1861                Ok(tool_calls
1862                    .iter()
1863                    .enumerate()
1864                    .map(|(tool_index, tool_call)| {
1865                        self.full_tool_call_chunk(upstream, index, tool_index, tool_call)
1866                    })
1867                    .collect())
1868            }
1869            ToolOutputClassification::NormalText => {
1870                let text = std::mem::take(&mut self.tool_buffer);
1871                self.buffering_tool_call = false;
1872                Ok(self.text_chunk_if_not_empty(upstream, index, text))
1873            }
1874            ToolOutputClassification::InvalidToolCall {
1875                error,
1876                invalid_output,
1877            } => {
1878                error!(
1879                    validation_error = %error,
1880                    payload_bytes = self.tool_buffer.len(),
1881                    payload = %self.tool_buffer,
1882                    "buffered streamed tool-call payload failed validation"
1883                );
1884                Err(ChatStreamError::invalid_tool_call(
1885                    format!("tool call parsing failed: {error}"),
1886                    invalid_output,
1887                ))
1888            }
1889        }
1890    }
1891
1892    /// Validates that buffered tool-call text remains within the configured byte limit.
1893    fn ensure_tool_buffer_within_limit(&self) -> Result<(), ChatStreamError> {
1894        if self.tool_buffer.len() > self.tool_context.config().tool_call_max_bytes {
1895            return Err(ChatStreamError::malformed_event(format!(
1896                "tool call output exceeded max size of {} bytes",
1897                self.tool_context.config().tool_call_max_bytes
1898            )));
1899        }
1900        Ok(())
1901    }
1902
1903    /// Validates suppressed retry text so a retry that never emits a tool marker cannot grow forever.
1904    fn ensure_pending_text_within_limit(&self) -> Result<(), ChatStreamError> {
1905        if self.pending_text.len() > self.tool_context.config().tool_call_max_bytes {
1906            return Err(ChatStreamError::invalid_tool_call(
1907                format!(
1908                    "corrected streamed response exceeded the tool call max size of {} bytes before emitting a tool call",
1909                    self.tool_context.config().tool_call_max_bytes
1910                ),
1911                self.pending_text.clone(),
1912            ));
1913        }
1914        Ok(())
1915    }
1916
1917    /// Builds zero or one streaming content chunks from decrypted assistant text.
1918    fn text_chunk_if_not_empty(
1919        &mut self,
1920        upstream: &Value,
1921        index: u64,
1922        text: String,
1923    ) -> Vec<StreamOutput> {
1924        if text.is_empty() {
1925            Vec::new()
1926        } else {
1927            vec![self.text_field_chunk(upstream, index, "content", text)]
1928        }
1929    }
1930
1931    /// Builds a streaming reasoning-content chunk from decrypted assistant text.
1932    fn reasoning_chunk(
1933        &mut self,
1934        upstream: &Value,
1935        index: u64,
1936        reasoning_content: String,
1937    ) -> StreamOutput {
1938        self.text_field_chunk(upstream, index, "reasoning_content", reasoning_content)
1939    }
1940
1941    /// Builds a streaming delta containing one assistant text field.
1942    fn text_field_chunk(
1943        &mut self,
1944        upstream: &Value,
1945        index: u64,
1946        field: &'static str,
1947        text: String,
1948    ) -> StreamOutput {
1949        let mut delta = serde_json::Map::new();
1950        self.insert_role_if_needed(&mut delta);
1951        delta.insert(field.to_owned(), Value::String(text));
1952
1953        StreamOutput::Json(self.ctx.chunk_with_choice(
1954            upstream,
1955            index,
1956            Value::Object(delta),
1957            Value::Null,
1958        ))
1959    }
1960
1961    /// Inserts the assistant role into the first emitted streaming delta.
1962    fn insert_role_if_needed(&mut self, delta: &mut serde_json::Map<String, Value>) {
1963        if !self.sent_role {
1964            delta.insert("role".to_owned(), Value::String("assistant".to_owned()));
1965            self.sent_role = true;
1966        }
1967    }
1968
1969    /// Builds a streaming delta containing one complete validated tool call.
1970    fn full_tool_call_chunk(
1971        &mut self,
1972        upstream: &Value,
1973        index: u64,
1974        tool_index: usize,
1975        tool_call: &ValidatedToolCall,
1976    ) -> StreamOutput {
1977        let mut delta = serde_json::Map::new();
1978        self.insert_role_if_needed(&mut delta);
1979
1980        let mut tool_call_value = tool_call.to_openai_value();
1981
1982        if let Some(tool_call_object) = tool_call_value.as_object_mut() {
1983            tool_call_object.insert("index".to_owned(), json!(tool_index));
1984        }
1985        delta.insert("tool_calls".to_owned(), Value::Array(vec![tool_call_value]));
1986
1987        StreamOutput::Json(self.ctx.chunk_with_choice(
1988            upstream,
1989            index,
1990            Value::Object(delta),
1991            Value::Null,
1992        ))
1993    }
1994
1995    /// Converts a tool-emulated usage-only chunk into a client-facing usage event when requested.
1996    fn handle_usage_chunk(&self, value: &Value) -> Result<Vec<StreamOutput>, ChatStreamError> {
1997        let Some(usage) = value.get("usage") else {
1998            warn!("tool-emulated upstream chunk has no choices and no usage");
1999            return Err(ChatStreamError::malformed_event(
2000                "upstream chunk has no choices and no usage",
2001            ));
2002        };
2003
2004        // OpenAI include_usage permits a usage-only event after the finish chunk.
2005        if !self.include_usage_requested {
2006            return Ok(Vec::new());
2007        }
2008
2009        Ok(vec![StreamOutput::Json(self.ctx.usage_chunk(value, usage))])
2010    }
2011
2012    /// Finishes the stream by flushing buffered output and adding the `[DONE]` sentinel.
2013    fn finish_stream(&mut self) -> Result<Vec<StreamOutput>, ChatStreamError> {
2014        let upstream = &Value::Null;
2015        let mut output = Vec::new();
2016
2017        if !self.sent_final_finish {
2018            output.extend(self.finish_buffered_content(
2019                upstream,
2020                0,
2021                Value::String("stop".to_owned()),
2022            )?);
2023        }
2024
2025        output.push(StreamOutput::Done);
2026        Ok(output)
2027    }
2028}
2029
2030/// Returns how many bytes of pending text can be streamed without splitting a possible tool marker.
2031fn streamable_pending_text_len(pending_text: &str) -> usize {
2032    let protected_suffix_len = TOOL_CALL_START_MARKER.len().saturating_sub(1);
2033    if pending_text.len() <= protected_suffix_len {
2034        return 0;
2035    }
2036
2037    let mut split_at = pending_text.len() - protected_suffix_len;
2038    while !pending_text.is_char_boundary(split_at) {
2039        split_at -= 1;
2040    }
2041    split_at
2042}
2043
2044impl ChatSseTransformer for OpenAiToolEmulatedChatStreamTransformer {
2045    /// Converts one parsed upstream SSE event into tool-emulated streaming outputs.
2046    fn handle_event(&mut self, event: RawSseEvent) -> Result<Vec<StreamOutput>, ChatStreamError> {
2047        match classify_upstream_event(event, &TOOL_EMULATED_UPSTREAM_EVENT_LOG)? {
2048            UpstreamEventKind::Done => self.finish_stream(),
2049            UpstreamEventKind::Usage(value) => self.handle_usage_chunk(&value),
2050            UpstreamEventKind::Choice { value, choice } => {
2051                self.handle_choice_chunk(&value, &choice)
2052            }
2053        }
2054    }
2055}
2056
2057/// Client-facing output emitted by chat SSE transformers.
2058#[derive(Debug, Clone, PartialEq, Eq)]
2059enum StreamOutput {
2060    Json(Value),
2061    Done,
2062}
2063
2064/// Reads an optional upstream choice index as a non-negative integer, defaulting to zero.
2065fn normalized_choice_index(index: Option<&Value>) -> Result<u64, ChatStreamError> {
2066    match index {
2067        Some(Value::Number(number)) => number.as_u64().ok_or_else(|| {
2068            ChatStreamError::malformed_event("upstream choice index must be a non-negative integer")
2069        }),
2070        Some(_) => Err(ChatStreamError::malformed_event(
2071            "upstream choice index must be a non-negative integer",
2072        )),
2073        None => Ok(0),
2074    }
2075}
2076
2077/// Reads an upstream finish reason as a string or null value.
2078fn normalized_finish_reason(value: Option<&Value>) -> Result<Value, ChatStreamError> {
2079    match value {
2080        Some(Value::Null) | None => Ok(Value::Null),
2081        Some(Value::String(reason)) => Ok(Value::String(reason.clone())),
2082        Some(_) => Err(ChatStreamError::malformed_event(
2083            "upstream finish_reason must be a string or null",
2084        )),
2085    }
2086}
2087
2088/// Reads the encrypted `delta.content` field from an upstream chunk.
2089fn encrypted_delta_content(delta: &Value) -> Result<Option<&str>, ChatStreamError> {
2090    encrypted_delta_text_field(delta, "content")
2091}
2092
2093/// Reads the encrypted `delta.reasoning_content` field from an upstream chunk.
2094fn encrypted_delta_reasoning_content(delta: &Value) -> Result<Option<&str>, ChatStreamError> {
2095    encrypted_delta_text_field(delta, "reasoning_content")
2096}
2097
2098/// Reads an optional encrypted text field from an upstream delta object.
2099fn encrypted_delta_text_field<'a>(
2100    delta: &'a Value,
2101    field: &'static str,
2102) -> Result<Option<&'a str>, ChatStreamError> {
2103    match delta.get(field) {
2104        Some(Value::Null) => {
2105            debug!(field, "ignoring null upstream delta text field");
2106            Ok(None)
2107        }
2108        Some(Value::String(content)) if content.is_empty() => {
2109            debug!(field, "ignoring empty upstream delta text field");
2110            Ok(None)
2111        }
2112        Some(Value::String(content)) => Ok(Some(content.as_str())),
2113        Some(_) => Err(ChatStreamError::malformed_event(format!(
2114            "upstream delta.{field} must be a string or null"
2115        ))),
2116        None => Ok(None),
2117    }
2118}
2119
2120/// Reads an optional string field from a JSON object.
2121fn string_field<'a>(value: &'a Value, field: &str) -> Option<&'a str> {
2122    value.get(field).and_then(Value::as_str)
2123}
2124
2125/// Reads an optional signed integer field from a JSON object.
2126fn integer_field(value: &Value, field: &str) -> Option<i64> {
2127    value.get(field).and_then(Value::as_i64)
2128}
2129
2130/// Returns the current Unix timestamp in seconds, or zero if the system clock is before the epoch.
2131fn unix_timestamp_now() -> i64 {
2132    SystemTime::now()
2133        .duration_since(UNIX_EPOCH)
2134        .map(|duration| duration.as_secs() as i64)
2135        .unwrap_or(0)
2136}
2137
2138/// Builds a method-not-allowed proxy error for an unsupported route method.
2139async fn method_not_allowed(method: Method, uri: Uri) -> ProxyError {
2140    ProxyError::MethodNotAllowed { method, uri }
2141}
2142
2143/// Builds a not-found proxy error for unmatched routes.
2144async fn not_found(uri: Uri) -> ProxyError {
2145    ProxyError::NotFound { uri }
2146}
2147
2148/// Errors returned while parsing, decrypting, or transforming upstream chat streams.
2149#[derive(Debug, Error)]
2150pub enum ChatStreamError {
2151    #[error("Venice upstream stream failed: {message}")]
2152    UpstreamStream { message: String },
2153    #[error("Venice upstream stream emitted an error event: {message}")]
2154    UpstreamEvent { message: String },
2155    #[error("Venice upstream stream event is malformed: {message}")]
2156    MalformedEvent { message: String },
2157    #[error("streamed tool call failed validation: {validation_error}")]
2158    InvalidToolCall {
2159        validation_error: String,
2160        invalid_output: String,
2161    },
2162    #[error("failed to decrypt Venice E2EE response chunk: {source}")]
2163    Decryption { source: E2eeCodecError },
2164}
2165
2166impl ChatStreamError {
2167    /// Converts an upstream chunk-read failure into a stream error.
2168    fn upstream_stream(source: reqwest::Error) -> Self {
2169        Self::UpstreamStream {
2170            message: source.to_string(),
2171        }
2172    }
2173
2174    /// Creates an error for an upstream SSE or JSON error event.
2175    fn upstream_event(message: impl Into<String>) -> Self {
2176        Self::UpstreamEvent {
2177            message: message.into(),
2178        }
2179    }
2180
2181    /// Creates an error for malformed upstream stream content.
2182    fn malformed_event(message: impl Into<String>) -> Self {
2183        Self::MalformedEvent {
2184            message: message.into(),
2185        }
2186    }
2187
2188    /// Converts invalid UTF-8 upstream bytes into a malformed-event error.
2189    fn invalid_utf8(source: std::str::Utf8Error) -> Self {
2190        Self::MalformedEvent {
2191            message: format!("upstream SSE bytes are not valid UTF-8: {source}"),
2192        }
2193    }
2194
2195    /// Creates a structured error for retryable streamed tool-call validation failures.
2196    fn invalid_tool_call(
2197        validation_error: impl Into<String>,
2198        invalid_output: impl Into<String>,
2199    ) -> Self {
2200        Self::InvalidToolCall {
2201            validation_error: validation_error.into(),
2202            invalid_output: invalid_output.into(),
2203        }
2204    }
2205
2206    /// Converts invalid upstream JSON SSE data into a malformed-event error.
2207    fn json_event(source: serde_json::Error) -> Self {
2208        Self::MalformedEvent {
2209            message: format!("upstream SSE data is not valid JSON: {source}"),
2210        }
2211    }
2212
2213    /// Converts E2EE decryption failure into a chat-stream error.
2214    fn decryption(source: E2eeCodecError) -> Self {
2215        Self::Decryption { source }
2216    }
2217
2218    /// Returns the OpenAI-compatible error type exposed for this stream error.
2219    fn api_error_type(&self) -> &'static str {
2220        match self {
2221            Self::UpstreamStream { .. }
2222            | Self::UpstreamEvent { .. }
2223            | Self::MalformedEvent { .. }
2224            | Self::InvalidToolCall { .. } => "proxy_upstream_error",
2225            Self::Decryption { .. } => "proxy_e2ee_error",
2226        }
2227    }
2228
2229    /// Returns the proxy error code exposed for this stream error.
2230    fn api_error_code(&self) -> &'static str {
2231        match self {
2232            Self::UpstreamStream { .. } => "upstream_stream_error",
2233            Self::UpstreamEvent { .. } => "upstream_stream_error",
2234            Self::MalformedEvent { .. } => "upstream_malformed_response",
2235            Self::InvalidToolCall { .. } => "tool_call_validation_failed",
2236            Self::Decryption { .. } => "e2ee_response_decryption_failed",
2237        }
2238    }
2239}
2240
2241/// Errors returned by HTTP handlers and rendered as OpenAI-compatible JSON errors.
2242#[derive(Debug, Error)]
2243pub enum ProxyError {
2244    #[error(transparent)]
2245    Venice(#[from] VeniceClientError),
2246    #[error(transparent)]
2247    Attestation(#[from] AttestationError),
2248    #[error(transparent)]
2249    Session(#[from] SessionError),
2250    #[error(transparent)]
2251    ChatRequest(#[from] ChatRequestError),
2252    #[error(transparent)]
2253    ChatConstruction(#[from] ChatConstructionError),
2254    #[error(transparent)]
2255    ChatStream(#[from] ChatStreamError),
2256    #[error("The model failed to produce a valid tool call after correction attempts.")]
2257    ToolCallRetryExhausted {
2258        max_retries: u32,
2259        last_validation_error: String,
2260    },
2261    #[error(
2262        "proxy instance key is unavailable; keys.generate_proxy_instance_key_on_startup must be enabled for E2EE chat requests"
2263    )]
2264    ProxyInstanceKeyUnavailable,
2265    #[error("session does not contain an attested model public key after attestation verification")]
2266    MissingAttestedModelKey,
2267    #[error("method {method} is not supported for {uri}")]
2268    MethodNotAllowed { method: Method, uri: Uri },
2269    #[error("route {uri} was not found")]
2270    NotFound { uri: Uri },
2271}
2272
2273impl ProxyError {
2274    /// Returns the HTTP status code for this proxy error.
2275    fn status(&self) -> StatusCode {
2276        match self {
2277            Self::Venice(_) => StatusCode::BAD_GATEWAY,
2278            Self::Attestation(error) if error.verifier_unavailable() => {
2279                StatusCode::SERVICE_UNAVAILABLE
2280            }
2281            Self::Attestation(_) => StatusCode::BAD_GATEWAY,
2282            Self::Session(
2283                SessionError::MissingSessionIdentifier | SessionError::InvalidHeaderValue { .. },
2284            ) => StatusCode::BAD_REQUEST,
2285            Self::Session(_) => StatusCode::INTERNAL_SERVER_ERROR,
2286            Self::ChatRequest(_) => StatusCode::BAD_REQUEST,
2287            Self::ChatConstruction(_)
2288            | Self::ChatStream(_)
2289            | Self::ToolCallRetryExhausted { .. } => StatusCode::BAD_GATEWAY,
2290            Self::ProxyInstanceKeyUnavailable | Self::MissingAttestedModelKey => {
2291                StatusCode::INTERNAL_SERVER_ERROR
2292            }
2293            Self::MethodNotAllowed { .. } => StatusCode::METHOD_NOT_ALLOWED,
2294            Self::NotFound { .. } => StatusCode::NOT_FOUND,
2295        }
2296    }
2297
2298    /// Returns the OpenAI-compatible error type for this proxy error.
2299    fn error_type(&self) -> &'static str {
2300        match self {
2301            Self::Venice(error) => error.api_error_type(),
2302            Self::Attestation(error) => error.api_error_type(),
2303            Self::Session(
2304                SessionError::MissingSessionIdentifier | SessionError::InvalidHeaderValue { .. },
2305            ) => "invalid_request_error",
2306            Self::Session(_) => "proxy_session_error",
2307            Self::ChatRequest(_) => "invalid_request_error",
2308            Self::ChatConstruction(_) => "proxy_e2ee_error",
2309            Self::ChatStream(error) => error.api_error_type(),
2310            Self::ToolCallRetryExhausted { .. } => "proxy_tool_call_error",
2311            Self::ProxyInstanceKeyUnavailable => "proxy_configuration_error",
2312            Self::MissingAttestedModelKey => "proxy_attestation_error",
2313            Self::MethodNotAllowed { .. } | Self::NotFound { .. } => "invalid_request_error",
2314        }
2315    }
2316
2317    /// Returns the stable proxy error code for this proxy error.
2318    fn code(&self) -> &'static str {
2319        match self {
2320            Self::Venice(error) => error.api_error_code(),
2321            Self::Attestation(error) => error.api_error_code(),
2322            Self::Session(SessionError::MissingSessionIdentifier) => "session_identifier_missing",
2323            Self::Session(SessionError::InvalidHeaderValue { .. }) => "invalid_session_header",
2324            Self::Session(_) => "session_error",
2325            Self::ChatRequest(error) => error.api_error_code(),
2326            Self::ChatConstruction(error) => error.api_error_code(),
2327            Self::ChatStream(error) => error.api_error_code(),
2328            Self::ToolCallRetryExhausted { .. } => "invalid_tool_call",
2329            Self::ProxyInstanceKeyUnavailable => "proxy_instance_key_unavailable",
2330            Self::MissingAttestedModelKey => "attestation_failed",
2331            Self::MethodNotAllowed { .. } => "method_not_allowed",
2332            Self::NotFound { .. } => "not_found",
2333        }
2334    }
2335}
2336
2337impl IntoResponse for ProxyError {
2338    /// Converts a proxy error into an OpenAI-compatible JSON HTTP response.
2339    fn into_response(self) -> Response {
2340        let status = self.status();
2341        let error_code = self.code();
2342        let error_type = self.error_type();
2343
2344        if status.is_server_error() {
2345            error!(
2346                status = status.as_u16(),
2347                error_code,
2348                error_type,
2349                error = %self,
2350                "proxy request failed"
2351            );
2352        } else {
2353            warn!(
2354                status = status.as_u16(),
2355                error_code,
2356                error_type,
2357                error = %self,
2358                "proxy request rejected"
2359            );
2360        }
2361
2362        let mut response = if let Self::ToolCallRetryExhausted {
2363            max_retries,
2364            last_validation_error,
2365        } = &self
2366        {
2367            let body = json!({
2368                "error": {
2369                    "message": self.to_string(),
2370                    "type": error_type,
2371                    "code": error_code,
2372                    "details": {
2373                        "max_retries": max_retries,
2374                        "last_validation_error": last_validation_error,
2375                    },
2376                }
2377            });
2378            (status, Json(body)).into_response()
2379        } else {
2380            let body = ErrorResponse::new(self.to_string(), error_type, error_code);
2381            (status, Json(body)).into_response()
2382        };
2383
2384        apply_error_headers(response.headers_mut(), error_code);
2385        response
2386    }
2387}
2388
2389/// Safe proxy metadata headers.
2390///
2391/// Fields are optional so handlers never claim E2EE, attestation, key-binding,
2392/// or session verification that has not happened yet.
2393#[derive(Debug, Clone, Default, PartialEq, Eq)]
2394pub struct ProxyMetadataHeaders {
2395    pub e2ee: Option<String>,
2396    pub attestation_mode: Option<String>,
2397    pub attested_model: Option<String>,
2398    pub tee_provider: Option<String>,
2399    pub tdx_verified: Option<bool>,
2400    pub tdx_debug: Option<bool>,
2401    pub nvidia_verified: Option<String>,
2402    pub key_binding: Option<bool>,
2403    pub session_id: Option<String>,
2404    pub session_scope: Option<String>,
2405    pub tool_mode: Option<String>,
2406    pub tool_retries: Option<u32>,
2407}
2408
2409impl ProxyMetadataHeaders {
2410    /// Creates safe non-assertive metadata from config before a route has
2411    /// verification/session state.
2412    pub fn from_config(config: &ProxyConfig) -> Self {
2413        Self {
2414            attestation_mode: Some(config.attestation.mode.as_str().to_owned()),
2415            tool_mode: Some(config.tools.mode.as_str().to_owned()),
2416            ..Self::default()
2417        }
2418    }
2419
2420    /// Creates metadata headers for a chat response after session attestation succeeds.
2421    pub fn for_verified_chat(config: &ProxyConfig, session: &SessionContext) -> Self {
2422        let tee_provider = session
2423            .attestation_tee_provider
2424            .clone()
2425            .unwrap_or_else(|| "unknown".to_owned());
2426        let tdx_debug = session.attestation_tdx_debug;
2427        let nvidia_verified = session
2428            .attestation_nvidia_verified
2429            .clone()
2430            .unwrap_or_else(|| "not-present".to_owned());
2431
2432        Self {
2433            e2ee: Some("verified".to_owned()),
2434            attestation_mode: Some(config.attestation.mode.as_str().to_owned()),
2435            attested_model: Some(session.model_id.clone()),
2436            tee_provider: Some(tee_provider),
2437            tdx_verified: config.attestation.require_tdx.then_some(true),
2438            tdx_debug,
2439            nvidia_verified: Some(nvidia_verified),
2440            key_binding: Some(true),
2441            session_id: Some(session.agent_session_id.clone()),
2442            session_scope: Some(session.scope.as_str().to_owned()),
2443            tool_mode: Some(config.tools.mode.as_str().to_owned()),
2444            tool_retries: None,
2445        }
2446    }
2447
2448    /// Applies all present metadata fields to an HTTP header map.
2449    pub fn apply(&self, headers: &mut HeaderMap) {
2450        insert_optional_header(headers, HEADER_PROXY_E2EE, self.e2ee.as_deref());
2451        insert_optional_header(
2452            headers,
2453            HEADER_PROXY_ATTESTATION_MODE,
2454            self.attestation_mode.as_deref(),
2455        );
2456        insert_optional_header(
2457            headers,
2458            HEADER_PROXY_ATTESTED_MODEL,
2459            self.attested_model.as_deref(),
2460        );
2461        insert_optional_header(
2462            headers,
2463            HEADER_PROXY_TEE_PROVIDER,
2464            self.tee_provider.as_deref(),
2465        );
2466        insert_optional_bool_header(headers, HEADER_PROXY_TDX_VERIFIED, self.tdx_verified);
2467        insert_optional_bool_header(headers, HEADER_PROXY_TDX_DEBUG, self.tdx_debug);
2468        insert_optional_header(
2469            headers,
2470            HEADER_PROXY_NVIDIA_VERIFIED,
2471            self.nvidia_verified.as_deref(),
2472        );
2473        insert_optional_bool_header(headers, HEADER_PROXY_KEY_BINDING, self.key_binding);
2474        insert_optional_header(headers, HEADER_PROXY_SESSION_ID, self.session_id.as_deref());
2475        insert_optional_header(
2476            headers,
2477            HEADER_PROXY_SESSION_SCOPE,
2478            self.session_scope.as_deref(),
2479        );
2480        insert_optional_header(headers, HEADER_PROXY_TOOL_MODE, self.tool_mode.as_deref());
2481        if let Some(tool_retries) = self.tool_retries {
2482            insert_header(
2483                headers,
2484                HEADER_PROXY_TOOL_RETRIES,
2485                &tool_retries.to_string(),
2486            );
2487        }
2488    }
2489}
2490
2491/// Applies proxy error metadata headers to an HTTP error response.
2492pub fn apply_error_headers(headers: &mut HeaderMap, error_code: &str) {
2493    insert_header(headers, HEADER_PROXY_ERROR_CODE, error_code);
2494}
2495
2496/// Inserts a string header only when a value is present.
2497fn insert_optional_header(headers: &mut HeaderMap, name: &'static str, value: Option<&str>) {
2498    if let Some(value) = value {
2499        insert_header(headers, name, value);
2500    }
2501}
2502
2503/// Inserts a boolean header only when a value is present.
2504fn insert_optional_bool_header(headers: &mut HeaderMap, name: &'static str, value: Option<bool>) {
2505    if let Some(value) = value {
2506        insert_header(headers, name, if value { "true" } else { "false" });
2507    }
2508}
2509
2510/// Inserts a header when both the name and value are valid HTTP header components.
2511fn insert_header(headers: &mut HeaderMap, name: &'static str, value: &str) {
2512    let Ok(name) = HeaderName::from_bytes(name.as_bytes()) else {
2513        return;
2514    };
2515    let Ok(value) = HeaderValue::from_str(value) else {
2516        return;
2517    };
2518    headers.insert(name, value);
2519}
2520
2521#[cfg(test)]
2522mod tests {
2523    use super::*;
2524    use std::{
2525        collections::{HashMap, VecDeque},
2526        sync::{Arc, Mutex},
2527        time::Duration,
2528    };
2529
2530    use axum::{
2531        body::Body,
2532        extract::Query,
2533        http::Request,
2534        routing::{get, post},
2535    };
2536    use serde_json::json;
2537
2538    use crate::config::NvidiaRequirement;
2539    use tower::ServiceExt;
2540
2541    fn test_app() -> Router {
2542        router_with_venice_client(ProxyConfig::default(), test_venice_client())
2543    }
2544
2545    fn test_venice_client() -> VeniceClient {
2546        test_venice_client_for_base_url("http://127.0.0.1:1/api/v1")
2547    }
2548
2549    fn test_venice_client_for_base_url(base_url: impl AsRef<str>) -> VeniceClient {
2550        VeniceClient::new(base_url.as_ref(), "test-api-key", Duration::from_secs(1))
2551            .expect("test Venice client should build")
2552    }
2553
2554    fn chat_config_with_basic_test_attestation() -> ProxyConfig {
2555        let mut config = ProxyConfig::default();
2556        config.attestation.require_tdx = false;
2557        config.attestation.require_nvidia = NvidiaRequirement::Never;
2558        config
2559    }
2560
2561    #[test]
2562    fn app_state_initializes_key_and_session_managers_from_config() {
2563        let state = AppState::from_parts(ProxyConfig::default(), test_venice_client());
2564
2565        let key = state
2566            .proxy_instance_key()
2567            .expect("default config should generate startup key");
2568        assert_eq!(key.public_key_hex().len(), 130);
2569        assert!(state.session_manager().is_empty());
2570        assert_eq!(
2571            state.attestation_verifier().policy(),
2572            &ProxyConfig::default().attestation
2573        );
2574
2575        let mut config = ProxyConfig::default();
2576        config.keys.generate_proxy_instance_key_on_startup = false;
2577        let state = AppState::from_parts(config, test_venice_client());
2578        assert!(state.proxy_instance_key().is_none());
2579    }
2580
2581    async fn error_body(response: Response) -> ErrorResponse {
2582        let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
2583            .await
2584            .expect("response body should buffer");
2585        serde_json::from_slice(&bytes).expect("response should be OpenAI-style error JSON")
2586    }
2587
2588    #[tokio::test]
2589    async fn chat_route_ignores_upstream_role_only_chunk_before_encrypted_content() {
2590        let response = streaming_chat_response(
2591            "chat-route-role-only",
2592            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
2593            vec![
2594                MockStreamFrame::Role,
2595                MockStreamFrame::Text("Hello"),
2596                MockStreamFrame::Finish("stop"),
2597                MockStreamFrame::Done,
2598            ],
2599        )
2600        .await;
2601
2602        assert_eq!(response.status(), StatusCode::OK);
2603        let body = response_body(response).await;
2604        let data = sse_data(&body);
2605        assert_eq!(data.len(), 3);
2606        let first: Value = serde_json::from_str(data[0]).expect("first chunk should be JSON");
2607        assert_eq!(first["choices"][0]["delta"]["role"], "assistant");
2608        assert_eq!(first["choices"][0]["delta"]["content"], "Hello");
2609        assert_eq!(data[2], "[DONE]");
2610    }
2611
2612    #[tokio::test]
2613    async fn chat_route_streams_decrypted_normal_assistant_text() {
2614        let response = streaming_chat_response(
2615            "chat-route-test",
2616            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
2617            vec![
2618                MockStreamFrame::NullContent,
2619                MockStreamFrame::EmptyContent,
2620                MockStreamFrame::Text("Hello"),
2621                MockStreamFrame::Finish("stop"),
2622                MockStreamFrame::Done,
2623            ],
2624        )
2625        .await;
2626
2627        assert_eq!(response.status(), StatusCode::OK);
2628        assert_eq!(
2629            response.headers().get(HEADER_PROXY_E2EE).unwrap(),
2630            "verified"
2631        );
2632        assert_eq!(
2633            response.headers().get(HEADER_PROXY_ATTESTED_MODEL).unwrap(),
2634            "e2ee-test"
2635        );
2636
2637        let body = response_body(response).await;
2638        let data = sse_data(&body);
2639        assert_eq!(data.len(), 3);
2640
2641        let first: Value = serde_json::from_str(data[0]).expect("first chunk should be JSON");
2642        assert_eq!(first["object"], "chat.completion.chunk");
2643        assert_eq!(first["model"], "e2ee-test");
2644        assert_eq!(first["choices"][0]["delta"]["role"], "assistant");
2645        assert_eq!(first["choices"][0]["delta"]["content"], "Hello");
2646        assert!(first["choices"][0]["finish_reason"].is_null());
2647
2648        let final_chunk: Value = serde_json::from_str(data[1]).expect("final chunk should be JSON");
2649        assert_eq!(final_chunk["choices"][0]["delta"], json!({}));
2650        assert_eq!(final_chunk["choices"][0]["finish_reason"], "stop");
2651        assert_eq!(data[2], "[DONE]");
2652    }
2653
2654    #[tokio::test]
2655    async fn chat_route_streams_decrypted_reasoning_content() {
2656        let response = streaming_chat_response(
2657            "chat-route-reasoning-stream",
2658            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true,"reasoning":{"effort":"high"}}"#,
2659            vec![
2660                MockStreamFrame::Reasoning("Thinking"),
2661                MockStreamFrame::Text("Answer"),
2662                MockStreamFrame::Finish("stop"),
2663                MockStreamFrame::Done,
2664            ],
2665        )
2666        .await;
2667
2668        assert_eq!(response.status(), StatusCode::OK);
2669        let body = response_body(response).await;
2670        let data = sse_data(&body);
2671        assert_eq!(data.len(), 4);
2672        let reasoning: Value =
2673            serde_json::from_str(data[0]).expect("reasoning chunk should be JSON");
2674        let answer: Value = serde_json::from_str(data[1]).expect("answer chunk should be JSON");
2675
2676        assert_eq!(reasoning["choices"][0]["delta"]["role"], "assistant");
2677        assert_eq!(
2678            reasoning["choices"][0]["delta"]["reasoning_content"],
2679            "Thinking"
2680        );
2681        assert!(answer["choices"][0]["delta"].get("role").is_none());
2682        assert_eq!(answer["choices"][0]["delta"]["content"], "Answer");
2683        assert_eq!(data.last().copied(), Some("[DONE]"));
2684    }
2685
2686    #[tokio::test]
2687    async fn chat_route_streams_multiple_decrypted_content_chunks() {
2688        let response = streaming_chat_response(
2689            "chat-route-multiple-chunks",
2690            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
2691            vec![
2692                MockStreamFrame::Text("Hello"),
2693                MockStreamFrame::Text(" world"),
2694                MockStreamFrame::Finish("stop"),
2695                MockStreamFrame::Done,
2696            ],
2697        )
2698        .await;
2699
2700        assert_eq!(response.status(), StatusCode::OK);
2701        let body = response_body(response).await;
2702        let data = sse_data(&body);
2703        let first: Value = serde_json::from_str(data[0]).expect("first chunk should be JSON");
2704        let second: Value = serde_json::from_str(data[1]).expect("second chunk should be JSON");
2705
2706        assert_eq!(first["choices"][0]["delta"]["role"], "assistant");
2707        assert_eq!(first["choices"][0]["delta"]["content"], "Hello");
2708        assert!(second["choices"][0]["delta"].get("role").is_none());
2709        assert_eq!(second["choices"][0]["delta"]["content"], " world");
2710        assert_eq!(data.last().copied(), Some("[DONE]"));
2711    }
2712
2713    #[tokio::test]
2714    async fn chat_route_passes_through_usage_chunk_when_requested_and_upstream_provides_it() {
2715        let response = streaming_chat_response(
2716            "chat-route-usage",
2717            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true,"stream_options":{"include_usage":true}}"#,
2718            vec![
2719                MockStreamFrame::Text("Hello"),
2720                MockStreamFrame::Finish("stop"),
2721                MockStreamFrame::Usage,
2722                MockStreamFrame::Done,
2723            ],
2724        )
2725        .await;
2726
2727        assert_eq!(response.status(), StatusCode::OK);
2728        let body = response_body(response).await;
2729        let data = sse_data(&body);
2730        assert_eq!(data.len(), 4);
2731        let usage_chunk: Value = serde_json::from_str(data[2]).expect("usage chunk should be JSON");
2732        assert_eq!(usage_chunk["choices"], json!([]));
2733        assert_eq!(usage_chunk["usage"]["total_tokens"], 3);
2734        assert_eq!(data[3], "[DONE]");
2735    }
2736
2737    #[tokio::test]
2738    async fn chat_route_returns_buffered_non_streaming_completion() {
2739        let response = chat_response(
2740            "chat-route-non-streaming-success",
2741            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
2742            vec![
2743                MockStreamFrame::NullContent,
2744                MockStreamFrame::EmptyContent,
2745                MockStreamFrame::Text("Hello"),
2746                MockStreamFrame::Text(" world"),
2747                MockStreamFrame::Finish("stop"),
2748                MockStreamFrame::Done,
2749            ],
2750        )
2751        .await;
2752
2753        assert_eq!(response.status(), StatusCode::OK);
2754        assert_eq!(
2755            response.headers().get(HEADER_PROXY_E2EE).unwrap(),
2756            "verified"
2757        );
2758        let body = json_body(response).await;
2759        assert_eq!(body["object"], "chat.completion");
2760        assert_eq!(body["id"], "chatcmpl-upstream-test");
2761        assert_eq!(body["created"], 1_717_171_717);
2762        assert_eq!(body["model"], "e2ee-test");
2763        assert_eq!(body["choices"][0]["index"], 0);
2764        assert_eq!(body["choices"][0]["message"]["role"], "assistant");
2765        assert_eq!(body["choices"][0]["message"]["content"], "Hello world");
2766        assert_eq!(body["choices"][0]["finish_reason"], "stop");
2767        assert!(body["usage"].is_null());
2768    }
2769
2770    #[tokio::test]
2771    async fn chat_route_returns_buffered_reasoning_content() {
2772        let response = chat_response(
2773            "chat-route-reasoning-non-streaming",
2774            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false,"reasoning_effort":"medium"}"#,
2775            vec![
2776                MockStreamFrame::Reasoning("Think "),
2777                MockStreamFrame::Reasoning("first."),
2778                MockStreamFrame::Text("Answer"),
2779                MockStreamFrame::Finish("stop"),
2780                MockStreamFrame::Done,
2781            ],
2782        )
2783        .await;
2784
2785        assert_eq!(response.status(), StatusCode::OK);
2786        let body = json_body(response).await;
2787        assert_eq!(
2788            body["choices"][0]["message"]["reasoning_content"],
2789            "Think first."
2790        );
2791        assert_eq!(body["choices"][0]["message"]["content"], "Answer");
2792    }
2793
2794    #[tokio::test]
2795    async fn chat_route_treats_omitted_stream_as_buffered_non_streaming() {
2796        let response = chat_response(
2797            "chat-route-omitted-stream",
2798            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}]}"#,
2799            vec![MockStreamFrame::Text("Hello"), MockStreamFrame::Done],
2800        )
2801        .await;
2802
2803        assert_eq!(response.status(), StatusCode::OK);
2804        let body = json_body(response).await;
2805        assert_eq!(body["object"], "chat.completion");
2806        assert_eq!(body["choices"][0]["message"]["content"], "Hello");
2807        assert_eq!(body["choices"][0]["finish_reason"], "stop");
2808    }
2809
2810    #[tokio::test]
2811    async fn chat_route_streams_incremental_tool_call_chunks() {
2812        let response = streaming_chat_response(
2813            "chat-route-tool-stream",
2814            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
2815            vec![
2816                MockStreamFrame::Text("<tool_call>\n{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}\n</tool_call>"),
2817                MockStreamFrame::Finish("stop"),
2818                MockStreamFrame::Done,
2819            ],
2820        )
2821        .await;
2822
2823        assert_eq!(response.status(), StatusCode::OK);
2824        let body = response_body(response).await;
2825        let chunks = sse_json_chunks(&body);
2826
2827        assert_eq!(chunks[0]["choices"][0]["delta"]["role"], "assistant");
2828
2829        let tool_calls = streamed_tool_call_deltas(&chunks);
2830        assert!(!tool_calls.is_empty());
2831        let first = tool_calls[0];
2832        assert_eq!(first["index"], 0);
2833        assert!(first["id"].as_str().unwrap().starts_with("call_"));
2834        assert_eq!(first["type"], "function");
2835        assert_eq!(first["function"]["name"], "search_web");
2836        for later in &tool_calls[1..] {
2837            assert!(later.get("id").is_none());
2838            assert!(later.get("type").is_none());
2839            assert!(later["function"].get("name").is_none());
2840        }
2841        assert_eq!(
2842            streamed_tool_call_arguments(&chunks, 0),
2843            r#"{"query":"example"}"#
2844        );
2845
2846        let final_chunk = chunks.last().expect("stream should have chunks");
2847        assert_eq!(final_chunk["choices"][0]["delta"], json!({}));
2848        assert_eq!(final_chunk["choices"][0]["finish_reason"], "tool_calls");
2849    }
2850
2851    #[tokio::test]
2852    async fn chat_route_streams_text_then_incremental_tool_call() {
2853        let response = streaming_chat_response(
2854            "chat-route-tool-stream-mixed-text",
2855            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
2856            vec![
2857                MockStreamFrame::NullContent,
2858                MockStreamFrame::EmptyContent,
2859                MockStreamFrame::Text("I'll check that. "),
2860                MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}"),
2861                MockStreamFrame::Text("</tool_call>"),
2862                MockStreamFrame::Finish("stop"),
2863                MockStreamFrame::Done,
2864            ],
2865        )
2866        .await;
2867
2868        assert_eq!(response.status(), StatusCode::OK);
2869        let body = response_body(response).await;
2870        let chunks = sse_json_chunks(&body);
2871
2872        assert_eq!(chunks[0]["choices"][0]["delta"]["role"], "assistant");
2873        assert_eq!(streamed_content(&chunks), "I'll check that. ");
2874
2875        let tool_calls = streamed_tool_call_deltas(&chunks);
2876        assert!(!tool_calls.is_empty());
2877        assert_eq!(tool_calls[0]["function"]["name"], "search_web");
2878        assert_eq!(
2879            streamed_tool_call_arguments(&chunks, 0),
2880            r#"{"query":"example"}"#
2881        );
2882
2883        let final_chunk = chunks.last().expect("stream should have chunks");
2884        assert_eq!(final_chunk["choices"][0]["finish_reason"], "tool_calls");
2885    }
2886
2887    #[tokio::test]
2888    async fn chat_route_fails_closed_on_unterminated_streamed_tool_call() {
2889        // A tool call truncated mid-JSON is beyond the lenient closing-marker
2890        // recovery; the stream fails closed.
2891        let response = streaming_chat_response(
2892            "chat-route-tool-stream-missing-close",
2893            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
2894            vec![
2895                MockStreamFrame::Text("I'll check that. "),
2896                MockStreamFrame::Text("<tool_call>{\"name\":"),
2897                MockStreamFrame::Finish("stop"),
2898                MockStreamFrame::Done,
2899            ],
2900        )
2901        .await;
2902
2903        assert_stream_body_fails(response).await;
2904    }
2905
2906    #[tokio::test]
2907    async fn chat_route_streams_hermes_format_tool_call_from_glm_model() {
2908        // Observed live: e2ee-glm-5-1 emits the prompt-instructed Hermes
2909        // format. All models parse as Hermes regardless of model ID.
2910        let response = streaming_chat_response(
2911            "chat-route-tool-stream-glm-hermes",
2912            r#"{"model":"e2ee-glm-5-1","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
2913            vec![
2914                MockStreamFrame::Text("<tool_call>\n{\"name\":\"search_web\",\"arguments\":"),
2915                MockStreamFrame::Text("{\"query\":\"example\"}}\n</tool_call>"),
2916                MockStreamFrame::Finish("stop"),
2917                MockStreamFrame::Done,
2918            ],
2919        )
2920        .await;
2921
2922        assert_eq!(response.status(), StatusCode::OK);
2923        let body = response_body(response).await;
2924        let chunks = sse_json_chunks(&body);
2925
2926        let tool_calls = streamed_tool_call_deltas(&chunks);
2927        assert!(!tool_calls.is_empty());
2928        assert_eq!(tool_calls[0]["function"]["name"], "search_web");
2929        assert_eq!(
2930            streamed_tool_call_arguments(&chunks, 0),
2931            r#"{"query":"example"}"#
2932        );
2933
2934        let final_chunk = chunks.last().expect("stream should have chunks");
2935        assert_eq!(final_chunk["choices"][0]["finish_reason"], "tool_calls");
2936    }
2937
2938    #[tokio::test]
2939    async fn chat_route_recovers_streamed_tool_call_with_truncated_closing_marker() {
2940        // Observed live: Venice cuts `</tool_call>` for some models. A
2941        // complete call missing only the closing marker is recovered.
2942        let response = streaming_chat_response(
2943            "chat-route-tool-stream-truncated-close",
2944            r#"{"model":"e2ee-glm-4-7-flash-p","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
2945            vec![
2946                MockStreamFrame::Text("<tool_call>\n{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}\n"),
2947                MockStreamFrame::Finish("stop"),
2948                MockStreamFrame::Done,
2949            ],
2950        )
2951        .await;
2952
2953        assert_eq!(response.status(), StatusCode::OK);
2954        let body = response_body(response).await;
2955        let chunks = sse_json_chunks(&body);
2956
2957        let tool_calls = streamed_tool_call_deltas(&chunks);
2958        assert!(!tool_calls.is_empty());
2959        assert_eq!(tool_calls[0]["function"]["name"], "search_web");
2960        assert_eq!(
2961            streamed_tool_call_arguments(&chunks, 0),
2962            r#"{"query":"example"}"#
2963        );
2964
2965        let final_chunk = chunks.last().expect("stream should have chunks");
2966        assert_eq!(final_chunk["choices"][0]["finish_reason"], "tool_calls");
2967    }
2968
2969    #[tokio::test]
2970    async fn chat_route_streams_multiple_tool_calls_split_across_chunks() {
2971        let response = streaming_chat_response(
2972            "chat-route-tool-stream-multiple-calls",
2973            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
2974            vec![
2975                MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"first\"}}"),
2976                MockStreamFrame::Text("</tool_call><tool_call>{\"name\":\"search_web\",\"arguments\":"),
2977                MockStreamFrame::Text("{\"query\":\"second\"}}</tool_call>"),
2978                MockStreamFrame::Finish("stop"),
2979                MockStreamFrame::Done,
2980            ],
2981        )
2982        .await;
2983
2984        assert_eq!(response.status(), StatusCode::OK);
2985        let body = response_body(response).await;
2986        let chunks = sse_json_chunks(&body);
2987
2988        assert_eq!(chunks[0]["choices"][0]["delta"]["role"], "assistant");
2989        let tool_calls = streamed_tool_call_deltas(&chunks);
2990        let first = tool_calls
2991            .iter()
2992            .find(|tool_call| tool_call["index"] == 0 && tool_call.get("id").is_some())
2993            .expect("first call should have an id-bearing fragment");
2994        let second = tool_calls
2995            .iter()
2996            .find(|tool_call| tool_call["index"] == 1 && tool_call.get("id").is_some())
2997            .expect("second call should have an id-bearing fragment");
2998        assert_eq!(first["function"]["name"], "search_web");
2999        assert_eq!(second["function"]["name"], "search_web");
3000        assert_ne!(first["id"], second["id"]);
3001        assert_eq!(
3002            streamed_tool_call_arguments(&chunks, 0),
3003            r#"{"query":"first"}"#
3004        );
3005        assert_eq!(
3006            streamed_tool_call_arguments(&chunks, 1),
3007            r#"{"query":"second"}"#
3008        );
3009
3010        let final_chunk = chunks.last().expect("stream should have chunks");
3011        assert_eq!(final_chunk["choices"][0]["delta"], json!({}));
3012        assert_eq!(final_chunk["choices"][0]["finish_reason"], "tool_calls");
3013    }
3014
3015    #[tokio::test]
3016    async fn chat_route_tool_stream_passes_through_usage_chunk_when_requested() {
3017        let response = streaming_chat_response(
3018            "chat-route-tool-stream-usage",
3019            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"stream_options":{"include_usage":true},"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
3020            vec![
3021                MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}</tool_call>"),
3022                MockStreamFrame::Finish("stop"),
3023                MockStreamFrame::Usage,
3024                MockStreamFrame::Done,
3025            ],
3026        )
3027        .await;
3028
3029        assert_eq!(response.status(), StatusCode::OK);
3030        let body = response_body(response).await;
3031        let chunks = sse_json_chunks(&body);
3032
3033        // OpenAI include_usage can arrive after the finish chunk and must still pass through.
3034        let usage_chunk = chunks.last().expect("stream should have chunks");
3035        assert_eq!(usage_chunk["choices"], json!([]));
3036        assert_eq!(usage_chunk["usage"]["total_tokens"], 3);
3037        let finish_chunk = &chunks[chunks.len() - 2];
3038        assert_eq!(finish_chunk["choices"][0]["finish_reason"], "tool_calls");
3039    }
3040
3041    #[tokio::test]
3042    async fn chat_route_fails_closed_when_streamed_tool_call_exceeds_max_bytes() {
3043        let model_public_key = ProxyInstanceKey::generate().public_key_hex().to_owned();
3044        let base_url = spawn_streaming_venice_server(
3045            model_public_key,
3046            true,
3047            vec![
3048                MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"this argument body is much longer than the configured cap\"}}</tool_call>"),
3049                MockStreamFrame::Finish("stop"),
3050                MockStreamFrame::Done,
3051            ],
3052        )
3053        .await;
3054        let mut config = chat_config_with_basic_test_attestation();
3055        config.tools.tool_call_max_bytes = 16;
3056
3057        let response = request_chat_with_config(
3058            config,
3059            "chat-route-tool-stream-max-bytes",
3060            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
3061            base_url,
3062        )
3063        .await;
3064
3065        assert_stream_body_fails(response).await;
3066    }
3067
3068    #[tokio::test]
3069    async fn chat_route_streams_all_tool_calls_when_parallel_tool_calls_false() {
3070        // `parallel_tool_calls` is accepted for OpenAI compatibility but
3071        // ignored; all parsed tool calls are streamed.
3072        let response = streaming_chat_response(
3073            "chat-route-tool-stream-parallel-false",
3074            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":true,"parallel_tool_calls":false,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"],"additionalProperties":false}}}]}"#,
3075            vec![
3076                MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"first\"}}</tool_call>"),
3077                MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"second\"}}</tool_call>"),
3078                MockStreamFrame::Finish("stop"),
3079                MockStreamFrame::Done,
3080            ],
3081        )
3082        .await;
3083
3084        assert_eq!(response.status(), StatusCode::OK);
3085        let body = response_body(response).await;
3086        let chunks = sse_json_chunks(&body);
3087
3088        assert_eq!(
3089            streamed_tool_call_arguments(&chunks, 0),
3090            r#"{"query":"first"}"#
3091        );
3092        assert_eq!(
3093            streamed_tool_call_arguments(&chunks, 1),
3094            r#"{"query":"second"}"#
3095        );
3096
3097        let final_chunk = chunks.last().expect("stream should have chunks");
3098        assert_eq!(final_chunk["choices"][0]["finish_reason"], "tool_calls");
3099    }
3100
3101    #[tokio::test]
3102    async fn chat_route_returns_non_streaming_tool_call_body_from_mixed_text() {
3103        let response = chat_response(
3104            "chat-route-tool-non-stream-mixed-text",
3105            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":false,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"]}}}]}"#,
3106            vec![
3107                MockStreamFrame::Text("I'll check that. <tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}</tool_call>"),
3108                MockStreamFrame::Done,
3109            ],
3110        )
3111        .await;
3112
3113        assert_eq!(response.status(), StatusCode::OK);
3114        let body = json_body(response).await;
3115        assert_eq!(body["choices"][0]["finish_reason"], "tool_calls");
3116        let tool_call = &body["choices"][0]["message"]["tool_calls"][0];
3117        assert_eq!(tool_call["function"]["name"], "search_web");
3118        assert_eq!(tool_call["function"]["arguments"], r#"{"query":"example"}"#);
3119    }
3120
3121    #[tokio::test]
3122    async fn chat_route_returns_non_streaming_tool_call_body() {
3123        let response = chat_response(
3124            "chat-route-tool-non-stream",
3125            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":false,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"]}}}]}"#,
3126            vec![
3127                MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}</tool_call>"),
3128                MockStreamFrame::Done,
3129            ],
3130        )
3131        .await;
3132
3133        assert_eq!(response.status(), StatusCode::OK);
3134        let body = json_body(response).await;
3135        assert_eq!(body["object"], "chat.completion");
3136        assert!(body["choices"][0]["message"]["content"].is_null());
3137        assert_eq!(body["choices"][0]["finish_reason"], "tool_calls");
3138        let tool_call = &body["choices"][0]["message"]["tool_calls"][0];
3139        assert!(tool_call["id"].as_str().unwrap().starts_with("call_"));
3140        assert_eq!(tool_call["type"], "function");
3141        assert_eq!(tool_call["function"]["name"], "search_web");
3142        assert_eq!(tool_call["function"]["arguments"], r#"{"query":"example"}"#);
3143    }
3144
3145    #[tokio::test]
3146    async fn chat_route_returns_non_streaming_multiple_tool_calls() {
3147        let response = chat_response(
3148            "chat-route-tool-non-stream-multiple-calls",
3149            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":false,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"]}}}]}"#,
3150            vec![
3151                MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"first\"}}</tool_call>\n<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"second\"}}</tool_call>"),
3152                MockStreamFrame::Done,
3153            ],
3154        )
3155        .await;
3156
3157        assert_eq!(response.status(), StatusCode::OK);
3158        let body = json_body(response).await;
3159        assert_eq!(body["choices"][0]["finish_reason"], "tool_calls");
3160        assert!(body["choices"][0]["message"]["content"].is_null());
3161        let tool_calls = body["choices"][0]["message"]["tool_calls"]
3162            .as_array()
3163            .expect("tool_calls should be an array");
3164        assert_eq!(tool_calls.len(), 2);
3165        assert_eq!(tool_calls[0]["function"]["name"], "search_web");
3166        assert_eq!(
3167            tool_calls[0]["function"]["arguments"],
3168            r#"{"query":"first"}"#
3169        );
3170        assert_eq!(
3171            tool_calls[1]["function"]["arguments"],
3172            r#"{"query":"second"}"#
3173        );
3174        assert_ne!(tool_calls[0]["id"], tool_calls[1]["id"]);
3175    }
3176
3177    #[tokio::test]
3178    async fn chat_route_tool_mode_leaves_normal_text_unaffected() {
3179        let response = streaming_chat_response(
3180            "chat-route-tool-normal-text",
3181            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object"}}}]}"#,
3182            vec![
3183                MockStreamFrame::Text("Hello without tools"),
3184                MockStreamFrame::Finish("stop"),
3185                MockStreamFrame::Done,
3186            ],
3187        )
3188        .await;
3189
3190        assert_eq!(response.status(), StatusCode::OK);
3191        let body = response_body(response).await;
3192        let chunks = sse_json_chunks(&body);
3193        assert_eq!(chunks[0]["choices"][0]["delta"]["role"], "assistant");
3194        assert_eq!(streamed_content(&chunks), "Hello without tools");
3195        assert!(streamed_tool_call_deltas(&chunks).is_empty());
3196    }
3197
3198    #[tokio::test]
3199    async fn chat_route_treats_marker_like_non_protocol_text_as_normal_text() {
3200        let response = streaming_chat_response(
3201            "chat-route-tool-marker-like-text",
3202            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object"}}}]}"#,
3203            vec![
3204                MockStreamFrame::Text("<tool_cal>{not actually a marker}"),
3205                MockStreamFrame::Finish("stop"),
3206                MockStreamFrame::Done,
3207            ],
3208        )
3209        .await;
3210
3211        assert_eq!(response.status(), StatusCode::OK);
3212        let body = response_body(response).await;
3213        let chunks = sse_json_chunks(&body);
3214        assert_eq!(
3215            streamed_content(&chunks),
3216            "<tool_cal>{not actually a marker}"
3217        );
3218        assert!(streamed_tool_call_deltas(&chunks).is_empty());
3219    }
3220
3221    #[tokio::test]
3222    async fn chat_route_retries_invalid_tool_call_and_returns_success() {
3223        let response = chat_response_sequence(
3224            "chat-route-tool-retry-success",
3225            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":false,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"]}}}]}"#,
3226            vec![
3227                vec![
3228                    MockStreamFrame::Text("<tool_call>{\"name\":\"unknown\",\"arguments\":{\"query\":\"example\"}}</tool_call>"),
3229                    MockStreamFrame::Done,
3230                ],
3231                vec![
3232                    MockStreamFrame::Text("<tool_call>{\"name\":\"search_web\",\"arguments\":{\"query\":\"example\"}}</tool_call>"),
3233                    MockStreamFrame::Done,
3234                ],
3235            ],
3236        )
3237        .await;
3238
3239        assert_eq!(response.status(), StatusCode::OK);
3240        assert_eq!(
3241            response.headers().get(HEADER_PROXY_TOOL_RETRIES).unwrap(),
3242            "1"
3243        );
3244        let body = json_body(response).await;
3245        assert_eq!(body["choices"][0]["finish_reason"], "tool_calls");
3246        assert_eq!(
3247            body["choices"][0]["message"]["tool_calls"][0]["function"]["name"],
3248            "search_web"
3249        );
3250    }
3251
3252    #[tokio::test]
3253    async fn chat_route_returns_retry_failure_error_shape() {
3254        let response = chat_response(
3255            "chat-route-tool-retry-failure",
3256            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"search"}],"stream":false,"tools":[{"type":"function","function":{"name":"search_web","parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"]}}}]}"#,
3257            vec![
3258                MockStreamFrame::Text("<tool_call>{\"name\":\"unknown\",\"arguments\":{}}</tool_call>"),
3259                MockStreamFrame::Done,
3260            ],
3261        )
3262        .await;
3263
3264        assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
3265        assert_eq!(
3266            response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3267            "invalid_tool_call"
3268        );
3269        let body = json_body(response).await;
3270        assert_eq!(body["error"]["type"], "proxy_tool_call_error");
3271        assert_eq!(body["error"]["code"], "invalid_tool_call");
3272        assert_eq!(body["error"]["details"]["max_retries"], 2);
3273        assert!(
3274            body["error"]["details"]["last_validation_error"]
3275                .as_str()
3276                .unwrap()
3277                .contains("unknown tool name")
3278        );
3279    }
3280
3281    #[tokio::test]
3282    async fn chat_route_non_streaming_fails_closed_on_upstream_error_response() {
3283        let response = chat_response_with_upstream_status(
3284            "chat-route-non-streaming-upstream-error",
3285            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
3286            StatusCode::INTERNAL_SERVER_ERROR,
3287        )
3288        .await;
3289
3290        assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
3291        assert_eq!(
3292            response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3293            "upstream_status_error"
3294        );
3295        let body = error_body(response).await;
3296        assert_eq!(body.error.kind, "proxy_upstream_error");
3297        assert_eq!(body.error.code, "upstream_status_error");
3298    }
3299
3300    #[tokio::test]
3301    async fn chat_route_non_streaming_fails_closed_on_malformed_upstream_payload() {
3302        let response = chat_response(
3303            "chat-route-non-streaming-malformed",
3304            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
3305            vec![MockStreamFrame::Raw("data: {\"choices\":\"bad\"}\n\n")],
3306        )
3307        .await;
3308
3309        assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
3310        assert_eq!(
3311            response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3312            "upstream_malformed_response"
3313        );
3314        let body = error_body(response).await;
3315        assert_eq!(body.error.kind, "proxy_upstream_error");
3316        assert_eq!(body.error.code, "upstream_malformed_response");
3317    }
3318
3319    #[tokio::test]
3320    async fn chat_route_non_streaming_fails_closed_on_missing_encrypted_content() {
3321        let response = chat_response(
3322            "chat-route-non-streaming-missing-content",
3323            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
3324            vec![MockStreamFrame::Finish("stop"), MockStreamFrame::Done],
3325        )
3326        .await;
3327
3328        assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
3329        assert_eq!(
3330            response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3331            "e2ee_response_decryption_failed"
3332        );
3333        let body = error_body(response).await;
3334        assert_eq!(body.error.kind, "proxy_e2ee_error");
3335        assert_eq!(body.error.code, "e2ee_response_decryption_failed");
3336    }
3337
3338    #[tokio::test]
3339    async fn chat_route_non_streaming_fails_closed_on_decryption_failure() {
3340        let response = chat_response(
3341            "chat-route-non-streaming-decryption-failure",
3342            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
3343            vec![MockStreamFrame::TextForWrongRecipient(" secret"), MockStreamFrame::Done],
3344        )
3345        .await;
3346
3347        assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
3348        assert_eq!(
3349            response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3350            "e2ee_response_decryption_failed"
3351        );
3352        let body = error_body(response).await;
3353        assert_eq!(body.error.kind, "proxy_e2ee_error");
3354        assert_eq!(body.error.code, "e2ee_response_decryption_failed");
3355    }
3356
3357    #[tokio::test]
3358    async fn chat_route_non_streaming_passes_through_usage_when_available() {
3359        let response = chat_response(
3360            "chat-route-non-streaming-usage",
3361            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
3362            vec![
3363                MockStreamFrame::Text("Hello"),
3364                MockStreamFrame::Finish("stop"),
3365                MockStreamFrame::Usage,
3366                MockStreamFrame::Done,
3367            ],
3368        )
3369        .await;
3370
3371        assert_eq!(response.status(), StatusCode::OK);
3372        let body = json_body(response).await;
3373        assert_eq!(body["choices"][0]["message"]["content"], "Hello");
3374        assert_eq!(body["usage"]["prompt_tokens"], 1);
3375        assert_eq!(body["usage"]["completion_tokens"], 2);
3376        assert_eq!(body["usage"]["total_tokens"], 3);
3377    }
3378
3379    #[tokio::test]
3380    async fn chat_route_fails_closed_on_upstream_stream_error_event() {
3381        let response = streaming_chat_response(
3382            "chat-route-upstream-error",
3383            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
3384            vec![MockStreamFrame::Error("model failed")],
3385        )
3386        .await;
3387
3388        assert_stream_body_fails(response).await;
3389    }
3390
3391    #[tokio::test]
3392    async fn chat_route_fails_closed_on_malformed_upstream_event() {
3393        let response = streaming_chat_response(
3394            "chat-route-malformed-event",
3395            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
3396            vec![MockStreamFrame::Raw("data: {\"choices\":\n\n")],
3397        )
3398        .await;
3399
3400        assert_stream_body_fails(response).await;
3401    }
3402
3403    #[tokio::test]
3404    async fn chat_route_fails_closed_on_decryption_failure_mid_stream() {
3405        let response = streaming_chat_response(
3406            "chat-route-decryption-failure",
3407            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
3408            vec![
3409                MockStreamFrame::Text("Hello"),
3410                MockStreamFrame::TextForWrongRecipient(" secret"),
3411                MockStreamFrame::Done,
3412            ],
3413        )
3414        .await;
3415
3416        assert_stream_body_fails(response).await;
3417    }
3418
3419    #[tokio::test]
3420    async fn chat_route_synthesizes_final_finish_chunk_before_done_when_needed() {
3421        let response = streaming_chat_response(
3422            "chat-route-final-done",
3423            r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":true}"#,
3424            vec![MockStreamFrame::Text("Hello"), MockStreamFrame::Done],
3425        )
3426        .await;
3427
3428        assert_eq!(response.status(), StatusCode::OK);
3429        let body = response_body(response).await;
3430        let data = sse_data(&body);
3431        assert_eq!(data.len(), 3);
3432        let final_chunk: Value = serde_json::from_str(data[1]).expect("final chunk should be JSON");
3433        assert_eq!(final_chunk["choices"][0]["delta"], json!({}));
3434        assert_eq!(final_chunk["choices"][0]["finish_reason"], "stop");
3435        assert_eq!(data[2], "[DONE]");
3436    }
3437
3438    #[tokio::test]
3439    async fn chat_route_attestation_failure_prevents_request_construction() {
3440        let model_public_key = ProxyInstanceKey::generate().public_key_hex().to_owned();
3441        let base_url = spawn_attestation_server(model_public_key, false).await;
3442        let app = router_with_venice_client(
3443            chat_config_with_basic_test_attestation(),
3444            test_venice_client_for_base_url(base_url),
3445        );
3446
3447        let response = app
3448            .oneshot(
3449                Request::builder()
3450                    .method(Method::POST)
3451                    .uri("/v1/chat/completions")
3452                    .header("content-type", "application/json")
3453                    .header(HEADER_PROXY_SESSION_ID, "chat-route-attestation-failure")
3454                    .body(Body::from(
3455                        r#"{"model":"e2ee-test","messages":[{"role":"user","content":"hello"}],"stream":false}"#,
3456                    ))
3457                    .expect("request should build"),
3458            )
3459            .await
3460            .expect("request should complete");
3461
3462        assert_eq!(response.status(), StatusCode::BAD_GATEWAY);
3463        assert_eq!(
3464            response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3465            "attestation_upstream_not_verified"
3466        );
3467        let body = error_body(response).await;
3468        assert_eq!(body.error.kind, "proxy_attestation_error");
3469        assert_eq!(body.error.code, "attestation_upstream_not_verified");
3470    }
3471
3472    #[tokio::test]
3473    async fn unknown_route_returns_openai_style_not_found() {
3474        let response = test_app()
3475            .oneshot(
3476                Request::builder()
3477                    .uri("/v1/unknown")
3478                    .body(Body::empty())
3479                    .expect("request should build"),
3480            )
3481            .await
3482            .expect("request should complete");
3483
3484        assert_eq!(response.status(), StatusCode::NOT_FOUND);
3485        assert_eq!(
3486            response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3487            "not_found"
3488        );
3489        let body = error_body(response).await;
3490        assert_eq!(body.error.kind, "invalid_request_error");
3491        assert_eq!(body.error.code, "not_found");
3492    }
3493
3494    #[tokio::test]
3495    async fn unsupported_method_returns_openai_style_method_error() {
3496        let response = test_app()
3497            .oneshot(
3498                Request::builder()
3499                    .method(Method::POST)
3500                    .uri("/v1/models")
3501                    .body(Body::empty())
3502                    .expect("request should build"),
3503            )
3504            .await
3505            .expect("request should complete");
3506
3507        assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED);
3508        assert_eq!(
3509            response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3510            "method_not_allowed"
3511        );
3512        let body = error_body(response).await;
3513        assert_eq!(body.error.kind, "invalid_request_error");
3514        assert_eq!(body.error.code, "method_not_allowed");
3515    }
3516
3517    #[tokio::test]
3518    async fn malformed_chat_json_uses_axum_extractor_rejection() {
3519        let response = test_app()
3520            .oneshot(
3521                Request::builder()
3522                    .method(Method::POST)
3523                    .uri("/v1/chat/completions")
3524                    .header("content-type", "application/json")
3525                    .body(Body::from("{"))
3526                    .expect("request should build"),
3527            )
3528            .await
3529            .expect("request should complete");
3530
3531        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
3532        assert!(response.headers().get(HEADER_PROXY_ERROR_CODE).is_none());
3533    }
3534
3535    #[tokio::test]
3536    async fn non_object_chat_json_returns_structured_invalid_request() {
3537        let response = test_app()
3538            .oneshot(
3539                Request::builder()
3540                    .method(Method::POST)
3541                    .uri("/v1/chat/completions")
3542                    .header("content-type", "application/json")
3543                    .body(Body::from("[]"))
3544                    .expect("request should build"),
3545            )
3546            .await
3547            .expect("request should complete");
3548
3549        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
3550        assert_eq!(
3551            response.headers().get(HEADER_PROXY_ERROR_CODE).unwrap(),
3552            "invalid_request"
3553        );
3554        let body = error_body(response).await;
3555        assert_eq!(body.error.kind, "invalid_request_error");
3556        assert_eq!(body.error.code, "invalid_request");
3557    }
3558
3559    #[derive(Debug, Clone)]
3560    enum MockStreamFrame {
3561        Role,
3562        NullContent,
3563        EmptyContent,
3564        Text(&'static str),
3565        Reasoning(&'static str),
3566        TextForWrongRecipient(&'static str),
3567        Finish(&'static str),
3568        Usage,
3569        Done,
3570        Error(&'static str),
3571        Raw(&'static str),
3572    }
3573
3574    async fn streaming_chat_response(
3575        session_id: &'static str,
3576        request_body: &'static str,
3577        frames: Vec<MockStreamFrame>,
3578    ) -> Response {
3579        chat_response(session_id, request_body, frames).await
3580    }
3581
3582    async fn chat_response(
3583        session_id: &'static str,
3584        request_body: &'static str,
3585        frames: Vec<MockStreamFrame>,
3586    ) -> Response {
3587        let model_public_key = ProxyInstanceKey::generate().public_key_hex().to_owned();
3588        let base_url = spawn_streaming_venice_server(model_public_key, true, frames).await;
3589        request_chat(session_id, request_body, base_url).await
3590    }
3591
3592    async fn chat_response_sequence(
3593        session_id: &'static str,
3594        request_body: &'static str,
3595        attempts: Vec<Vec<MockStreamFrame>>,
3596    ) -> Response {
3597        let model_public_key = ProxyInstanceKey::generate().public_key_hex().to_owned();
3598        let base_url =
3599            spawn_streaming_venice_server_sequence(model_public_key, true, attempts).await;
3600        request_chat(session_id, request_body, base_url).await
3601    }
3602
3603    async fn chat_response_with_upstream_status(
3604        session_id: &'static str,
3605        request_body: &'static str,
3606        upstream_status: StatusCode,
3607    ) -> Response {
3608        let model_public_key = ProxyInstanceKey::generate().public_key_hex().to_owned();
3609        let base_url =
3610            spawn_venice_server_with_chat_status(model_public_key, upstream_status).await;
3611        request_chat(session_id, request_body, base_url).await
3612    }
3613
3614    async fn request_chat(
3615        session_id: &'static str,
3616        request_body: &'static str,
3617        base_url: String,
3618    ) -> Response {
3619        request_chat_with_config(
3620            chat_config_with_basic_test_attestation(),
3621            session_id,
3622            request_body,
3623            base_url,
3624        )
3625        .await
3626    }
3627
3628    async fn request_chat_with_config(
3629        config: ProxyConfig,
3630        session_id: &'static str,
3631        request_body: &'static str,
3632        base_url: String,
3633    ) -> Response {
3634        let app = router_with_venice_client(config, test_venice_client_for_base_url(base_url));
3635
3636        app.oneshot(
3637            Request::builder()
3638                .method(Method::POST)
3639                .uri("/v1/chat/completions")
3640                .header("content-type", "application/json")
3641                .header(HEADER_PROXY_SESSION_ID, session_id)
3642                .body(Body::from(request_body))
3643                .expect("request should build"),
3644        )
3645        .await
3646        .expect("request should complete")
3647    }
3648
3649    async fn json_body(response: Response) -> Value {
3650        let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
3651            .await
3652            .expect("response body should buffer");
3653        serde_json::from_slice(&bytes).expect("response should be JSON")
3654    }
3655
3656    async fn response_body(response: Response) -> String {
3657        let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
3658            .await
3659            .expect("response body should buffer");
3660        String::from_utf8(bytes.to_vec()).expect("response body should be UTF-8")
3661    }
3662
3663    async fn assert_stream_body_fails(response: Response) {
3664        assert_eq!(response.status(), StatusCode::OK);
3665        let result = axum::body::to_bytes(response.into_body(), usize::MAX).await;
3666        assert!(
3667            result.is_err(),
3668            "stream body should fail closed instead of completing successfully"
3669        );
3670    }
3671
3672    fn sse_data(body: &str) -> Vec<&str> {
3673        body.lines()
3674            .filter_map(|line| line.strip_prefix("data: "))
3675            .collect()
3676    }
3677
3678    /// Parses all SSE data chunks before the trailing `[DONE]` as JSON.
3679    fn sse_json_chunks(body: &str) -> Vec<Value> {
3680        let data = sse_data(body);
3681        assert_eq!(data.last().copied(), Some("[DONE]"));
3682        data[..data.len() - 1]
3683            .iter()
3684            .map(|chunk| serde_json::from_str(chunk).expect("SSE chunk should be JSON"))
3685            .collect()
3686    }
3687
3688    /// Concatenates all `delta.content` text across stream chunks.
3689    fn streamed_content(chunks: &[Value]) -> String {
3690        chunks
3691            .iter()
3692            .filter_map(|chunk| chunk["choices"][0]["delta"]["content"].as_str())
3693            .collect()
3694    }
3695
3696    /// Flattens all `delta.tool_calls` entries across stream chunks.
3697    fn streamed_tool_call_deltas(chunks: &[Value]) -> Vec<&Value> {
3698        chunks
3699            .iter()
3700            .filter_map(|chunk| chunk["choices"][0]["delta"]["tool_calls"].as_array())
3701            .flatten()
3702            .collect()
3703    }
3704
3705    /// Concatenates streamed argument fragments for one tool-call index.
3706    fn streamed_tool_call_arguments(chunks: &[Value], index: u64) -> String {
3707        streamed_tool_call_deltas(chunks)
3708            .iter()
3709            .filter(|tool_call| tool_call["index"] == json!(index))
3710            .filter_map(|tool_call| tool_call["function"]["arguments"].as_str())
3711            .collect()
3712    }
3713
3714    async fn spawn_streaming_venice_server(
3715        model_public_key: String,
3716        verified: bool,
3717        frames: Vec<MockStreamFrame>,
3718    ) -> String {
3719        spawn_streaming_venice_server_sequence(model_public_key, verified, vec![frames]).await
3720    }
3721
3722    async fn spawn_streaming_venice_server_sequence(
3723        model_public_key: String,
3724        verified: bool,
3725        attempts: Vec<Vec<MockStreamFrame>>,
3726    ) -> String {
3727        let chat_attempts = Arc::new(Mutex::new(VecDeque::from(attempts)));
3728        let attestation_key = model_public_key.clone();
3729        let app = Router::new()
3730            .route(
3731                "/api/v1/tee/attestation",
3732                get(move |Query(query): Query<HashMap<String, String>>| {
3733                    let model_public_key = attestation_key.clone();
3734                    async move {
3735                        Json(json!({
3736                            "api_version": "aci/1",
3737                            "attestation": {
3738                                "tee_type": "tdx",
3739                                "evidence": {}
3740                            },
3741                            "verified": verified,
3742                            "nonce": query.get("nonce").cloned().unwrap_or_default(),
3743                            "model": query.get("model").cloned().unwrap_or_default(),
3744                            "tee_provider": "phala",
3745                            "signing_public_key": model_public_key,
3746                        }))
3747                    }
3748                }),
3749            )
3750            .route(
3751                "/api/v1/chat/completions",
3752                post(move |headers: HeaderMap, Json(body): Json<Value>| {
3753                    let chat_attempts = chat_attempts.clone();
3754                    async move {
3755                        let Some(client_public_key) = headers
3756                            .get(crate::venice::HEADER_VENICE_TEE_CLIENT_PUB_KEY)
3757                            .and_then(|value| value.to_str().ok())
3758                        else {
3759                            return (
3760                                StatusCode::BAD_REQUEST,
3761                                [("content-type", "text/plain")],
3762                                "missing client key".to_owned(),
3763                            );
3764                        };
3765                        if body.get("stream").and_then(Value::as_bool) != Some(true) {
3766                            return (
3767                                StatusCode::BAD_REQUEST,
3768                                [("content-type", "text/plain")],
3769                                "upstream request must stream".to_owned(),
3770                            );
3771                        }
3772                        let messages = body.get("messages").and_then(Value::as_array);
3773                        if messages.is_none_or(|messages| {
3774                            messages.is_empty()
3775                                || !messages.iter().all(|message| {
3776                                    message.get("role").and_then(Value::as_str).is_some()
3777                                        && message
3778                                            .get("content")
3779                                            .and_then(Value::as_str)
3780                                            .is_some_and(|content| {
3781                                                !content.is_empty()
3782                                                    && content
3783                                                        .chars()
3784                                                        .all(|ch| ch.is_ascii_hexdigit())
3785                                            })
3786                                })
3787                        }) {
3788                            return (
3789                                StatusCode::BAD_REQUEST,
3790                                [("content-type", "text/plain")],
3791                                "messages must be encrypted message objects".to_owned(),
3792                            );
3793                        }
3794
3795                        let frames = {
3796                            let mut attempts = chat_attempts
3797                                .lock()
3798                                .expect("mock chat attempts mutex should not be poisoned");
3799                            if attempts.len() > 1 {
3800                                attempts.pop_front().expect("attempts length checked above")
3801                            } else {
3802                                attempts.front().cloned().unwrap_or_default()
3803                            }
3804                        };
3805
3806                        (
3807                            StatusCode::OK,
3808                            [("content-type", "text/event-stream")],
3809                            render_mock_sse(&frames, client_public_key),
3810                        )
3811                    }
3812                }),
3813            );
3814        let listener = TcpListener::bind(("127.0.0.1", 0))
3815            .await
3816            .expect("mock Venice listener should bind");
3817        let addr = listener
3818            .local_addr()
3819            .expect("mock Venice listener should have local address");
3820
3821        tokio::spawn(async move {
3822            axum::serve(listener, app)
3823                .await
3824                .expect("mock Venice server should run");
3825        });
3826
3827        format!("http://{addr}/api/v1")
3828    }
3829
3830    async fn spawn_venice_server_with_chat_status(
3831        model_public_key: String,
3832        upstream_status: StatusCode,
3833    ) -> String {
3834        let attestation_key = model_public_key.clone();
3835        let app = Router::new()
3836            .route(
3837                "/api/v1/tee/attestation",
3838                get(move |Query(query): Query<HashMap<String, String>>| {
3839                    let model_public_key = attestation_key.clone();
3840                    async move {
3841                        Json(json!({
3842                            "api_version": "aci/1",
3843                            "attestation": {
3844                                "tee_type": "tdx",
3845                                "evidence": {}
3846                            },
3847                            "verified": true,
3848                            "nonce": query.get("nonce").cloned().unwrap_or_default(),
3849                            "model": query.get("model").cloned().unwrap_or_default(),
3850                            "tee_provider": "phala",
3851                            "signing_public_key": model_public_key,
3852                        }))
3853                    }
3854                }),
3855            )
3856            .route(
3857                "/api/v1/chat/completions",
3858                post(move || async move { upstream_status }),
3859            );
3860        let listener = TcpListener::bind(("127.0.0.1", 0))
3861            .await
3862            .expect("mock Venice listener should bind");
3863        let addr = listener
3864            .local_addr()
3865            .expect("mock Venice listener should have local address");
3866
3867        tokio::spawn(async move {
3868            axum::serve(listener, app)
3869                .await
3870                .expect("mock Venice server should run");
3871        });
3872
3873        format!("http://{addr}/api/v1")
3874    }
3875
3876    fn render_mock_sse(frames: &[MockStreamFrame], client_public_key: &str) -> String {
3877        let codec = E2eeCodec::default();
3878        let mut output = String::new();
3879        for frame in frames {
3880            match frame {
3881                MockStreamFrame::Role => {
3882                    output.push_str(&format!("data: {}\n\n", upstream_role_chunk()));
3883                }
3884                MockStreamFrame::NullContent => {
3885                    output.push_str(&format!("data: {}\n\n", upstream_null_content_chunk()));
3886                }
3887                MockStreamFrame::EmptyContent => {
3888                    output.push_str(&format!(
3889                        "data: {}\n\n",
3890                        upstream_content_chunk(String::new())
3891                    ));
3892                }
3893                MockStreamFrame::Text(content) => {
3894                    let encrypted = codec
3895                        .encrypt_content(content, client_public_key)
3896                        .expect("mock content should encrypt")
3897                        .into_hex();
3898                    output.push_str(&format!("data: {}\n\n", upstream_content_chunk(encrypted)));
3899                }
3900                MockStreamFrame::Reasoning(content) => {
3901                    let encrypted = codec
3902                        .encrypt_content(content, client_public_key)
3903                        .expect("mock reasoning content should encrypt")
3904                        .into_hex();
3905                    output.push_str(&format!(
3906                        "data: {}\n\n",
3907                        upstream_reasoning_content_chunk(encrypted)
3908                    ));
3909                }
3910                MockStreamFrame::TextForWrongRecipient(content) => {
3911                    let wrong_key = ProxyInstanceKey::generate();
3912                    let encrypted = codec
3913                        .encrypt_content(content, wrong_key.public_key_hex())
3914                        .expect("mock content should encrypt")
3915                        .into_hex();
3916                    output.push_str(&format!("data: {}\n\n", upstream_content_chunk(encrypted)));
3917                }
3918                MockStreamFrame::Finish(reason) => {
3919                    output.push_str(&format!("data: {}\n\n", upstream_finish_chunk(reason)));
3920                }
3921                MockStreamFrame::Usage => {
3922                    output.push_str(&format!("data: {}\n\n", upstream_usage_chunk()));
3923                }
3924                MockStreamFrame::Done => output.push_str("data: [DONE]\n\n"),
3925                MockStreamFrame::Error(message) => {
3926                    output.push_str(&format!(
3927                        "event: error\ndata: {}\n\n",
3928                        json!({ "message": message })
3929                    ));
3930                }
3931                MockStreamFrame::Raw(raw) => output.push_str(raw),
3932            }
3933        }
3934        output
3935    }
3936
3937    fn upstream_role_chunk() -> Value {
3938        json!({
3939            "id": "chatcmpl-upstream-test",
3940            "object": "chat.completion.chunk",
3941            "created": 1_717_171_717,
3942            "model": "e2ee-test",
3943            "choices": [{
3944                "index": 0,
3945                "delta": { "role": "assistant" },
3946                "finish_reason": null,
3947            }],
3948        })
3949    }
3950
3951    fn upstream_content_chunk(encrypted_content: String) -> Value {
3952        json!({
3953            "id": "chatcmpl-upstream-test",
3954            "object": "chat.completion.chunk",
3955            "created": 1_717_171_717,
3956            "model": "e2ee-test",
3957            "choices": [{
3958                "index": 0,
3959                "delta": { "content": encrypted_content },
3960                "finish_reason": null,
3961            }],
3962        })
3963    }
3964
3965    fn upstream_reasoning_content_chunk(encrypted_content: String) -> Value {
3966        json!({
3967            "id": "chatcmpl-upstream-test",
3968            "object": "chat.completion.chunk",
3969            "created": 1_717_171_717,
3970            "model": "e2ee-test",
3971            "choices": [{
3972                "index": 0,
3973                "delta": { "reasoning_content": encrypted_content },
3974                "finish_reason": null,
3975            }],
3976        })
3977    }
3978
3979    fn upstream_null_content_chunk() -> Value {
3980        json!({
3981            "id": "chatcmpl-upstream-test",
3982            "object": "chat.completion.chunk",
3983            "created": 1_717_171_717,
3984            "model": "e2ee-test",
3985            "choices": [{
3986                "index": 0,
3987                "delta": { "content": Value::Null },
3988                "finish_reason": null,
3989            }],
3990        })
3991    }
3992
3993    fn upstream_finish_chunk(reason: &str) -> Value {
3994        json!({
3995            "id": "chatcmpl-upstream-test",
3996            "object": "chat.completion.chunk",
3997            "created": 1_717_171_717,
3998            "model": "e2ee-test",
3999            "choices": [{
4000                "index": 0,
4001                "delta": {},
4002                "finish_reason": reason,
4003            }],
4004        })
4005    }
4006
4007    fn upstream_usage_chunk() -> Value {
4008        json!({
4009            "id": "chatcmpl-upstream-test",
4010            "object": "chat.completion.chunk",
4011            "created": 1_717_171_717,
4012            "model": "e2ee-test",
4013            "choices": [],
4014            "usage": {
4015                "prompt_tokens": 1,
4016                "completion_tokens": 2,
4017                "total_tokens": 3,
4018            },
4019        })
4020    }
4021
4022    async fn spawn_attestation_server(model_public_key: String, verified: bool) -> String {
4023        let app = Router::new().route(
4024            "/api/v1/tee/attestation",
4025            get(move |Query(query): Query<HashMap<String, String>>| {
4026                let model_public_key = model_public_key.clone();
4027                async move {
4028                    Json(json!({
4029                        "api_version": "aci/1",
4030                        "attestation": {
4031                            "tee_type": "tdx",
4032                            "evidence": {}
4033                        },
4034                        "verified": verified,
4035                        "nonce": query.get("nonce").cloned().unwrap_or_default(),
4036                        "model": query.get("model").cloned().unwrap_or_default(),
4037                        "tee_provider": "phala",
4038                        "signing_public_key": model_public_key,
4039                    }))
4040                }
4041            }),
4042        );
4043        let listener = TcpListener::bind(("127.0.0.1", 0))
4044            .await
4045            .expect("mock attestation listener should bind");
4046        let addr = listener
4047            .local_addr()
4048            .expect("mock attestation listener should have local address");
4049
4050        tokio::spawn(async move {
4051            axum::serve(listener, app)
4052                .await
4053                .expect("mock attestation server should run");
4054        });
4055
4056        format!("http://{addr}/api/v1")
4057    }
4058
4059    #[test]
4060    fn metadata_header_helper_only_emits_safe_config_headers_by_default() {
4061        let config = ProxyConfig::default();
4062        let metadata = ProxyMetadataHeaders::from_config(&config);
4063        let mut headers = HeaderMap::new();
4064
4065        metadata.apply(&mut headers);
4066
4067        assert_eq!(
4068            headers.get(HEADER_PROXY_ATTESTATION_MODE).unwrap(),
4069            "independent"
4070        );
4071        assert_eq!(headers.get(HEADER_PROXY_TOOL_MODE).unwrap(), "emulated");
4072        assert!(headers.get(HEADER_PROXY_E2EE).is_none());
4073        assert!(headers.get(HEADER_PROXY_KEY_BINDING).is_none());
4074    }
4075}