prodex 0.53.0

OpenAI profile pooling and safe auto-rotate for Codex CLI and Claude Code
Documentation
use super::*;

impl<'a> RuntimeWebsocketTextMessageFlow<'a> {
    pub(super) fn handle_reuse_watchdog_tripped(
        &mut self,
        profile_name: String,
        event: &'static str,
        turn_state_override: Option<&str>,
    ) -> Result<RuntimeWebsocketMessageLoopAction> {
        let reuse_terminal_idle = self.websocket_session.last_terminal_elapsed();
        let retry_same_profile_with_fresh_connect = !self
            .websocket_reuse_fresh_retry_profiles
            .contains(&profile_name)
            && (self.bound_profile.as_deref() == Some(profile_name.as_str())
                || self.turn_state_profile.as_deref() == Some(profile_name.as_str())
                || self
                    .compact_followup_profile
                    .as_ref()
                    .is_some_and(|(owner, _)| owner == &profile_name)
                || (self.request_session_id.is_some()
                    && self.session_profile.as_deref() == Some(profile_name.as_str())));
        let nonreplayable_previous_response_reuse =
            runtime_websocket_previous_response_reuse_is_nonreplayable(
                self.previous_response_id.as_deref(),
                false,
                turn_state_override,
            );
        let stale_previous_response_reuse = runtime_websocket_previous_response_reuse_is_stale(
            nonreplayable_previous_response_reuse,
            reuse_terminal_idle,
        );
        runtime_proxy_log(
            self.shared,
            format!(
                "request={} websocket_session={} websocket_reuse_watchdog_timeout profile={} event={}",
                self.request_id, self.session_id, profile_name, event
            ),
        );
        if nonreplayable_previous_response_reuse && self.request_requires_previous_response_affinity
        {
            if !self
                .websocket_reuse_fresh_retry_profiles
                .contains(&profile_name)
            {
                self.websocket_reuse_fresh_retry_profiles
                    .insert(profile_name.clone());
                runtime_proxy_log(
                    self.shared,
                    format!(
                        "request={} websocket_session={} websocket_reuse_locked_affinity_owner_fresh_retry profile={} event={}",
                        self.request_id, self.session_id, profile_name, event
                    ),
                );
                runtime_proxy_log_chain_retried_owner(
                    self.shared,
                    RuntimeProxyChainLog {
                        request_id: self.request_id,
                        transport: "websocket",
                        route: "websocket",
                        websocket_session: Some(self.session_id),
                        profile_name: &profile_name,
                        previous_response_id: self.previous_response_id.as_deref(),
                        reason: "websocket_reuse_watchdog_locked_affinity",
                        via: None,
                    },
                    0,
                );
                return Ok(RuntimeWebsocketMessageLoopAction::Continue);
            }
            runtime_proxy_record_continuity_failure_reason(
                self.shared,
                "stale_continuation",
                "websocket_reuse_watchdog_locked_affinity",
            );
            runtime_proxy_log(
                self.shared,
                format!(
                    "request={} websocket_session={} stale_continuation reason=websocket_reuse_watchdog_locked_affinity profile={} event={}",
                    self.request_id, self.session_id, profile_name, event
                ),
            );
            runtime_proxy_log_chain_dead_upstream_confirmed(
                self.shared,
                RuntimeProxyChainLog {
                    request_id: self.request_id,
                    transport: "websocket",
                    route: "websocket",
                    websocket_session: Some(self.session_id),
                    profile_name: &profile_name,
                    previous_response_id: self.previous_response_id.as_deref(),
                    reason: "websocket_reuse_watchdog_locked_affinity",
                    via: None,
                },
                Some(event),
            );
            send_runtime_proxy_stale_continuation_websocket_error(&mut *self.local_socket)?;
            return Ok(RuntimeWebsocketMessageLoopAction::Finished);
        }
        if nonreplayable_previous_response_reuse {
            if !self
                .websocket_reuse_fresh_retry_profiles
                .contains(&profile_name)
            {
                self.websocket_reuse_fresh_retry_profiles
                    .insert(profile_name.clone());
                runtime_proxy_log(
                    self.shared,
                    format!(
                        "request={} websocket_session={} websocket_reuse_nonreplayable_fresh_retry profile={} event={}",
                        self.request_id, self.session_id, profile_name, event
                    ),
                );
                return Ok(RuntimeWebsocketMessageLoopAction::Continue);
            }
            if stale_previous_response_reuse {
                runtime_proxy_log(
                    self.shared,
                    format!(
                        "request={} websocket_session={} websocket_reuse_stale_previous_response_blocked profile={} event={} elapsed_ms={} threshold_ms={}",
                        self.request_id,
                        self.session_id,
                        profile_name,
                        event,
                        reuse_terminal_idle
                            .map(|elapsed| elapsed.as_millis())
                            .unwrap_or(0),
                        runtime_proxy_websocket_previous_response_reuse_stale_ms(),
                    ),
                );
            } else {
                runtime_proxy_log(
                    self.shared,
                    format!(
                        "request={} websocket_session={} websocket_reuse_previous_response_blocked profile={} event={} reason=missing_turn_state elapsed_ms={}",
                        self.request_id,
                        self.session_id,
                        profile_name,
                        event,
                        reuse_terminal_idle
                            .map(|elapsed| elapsed.as_millis())
                            .unwrap_or(0),
                    ),
                );
            }
            return Err(anyhow::anyhow!(
                "runtime websocket upstream closed before response.completed for previous_response_id continuation without replayable turn_state: profile={profile_name} event={event}"
            ));
        }
        if retry_same_profile_with_fresh_connect {
            self.websocket_reuse_fresh_retry_profiles
                .insert(profile_name.clone());
            runtime_proxy_log(
                self.shared,
                format!(
                    "request={} websocket_session={} websocket_reuse_owner_fresh_retry profile={} event={}",
                    self.request_id, self.session_id, profile_name, event
                ),
            );
            return Ok(RuntimeWebsocketMessageLoopAction::Continue);
        }
        self.clear_profile_affinity(&profile_name, true);
        self.excluded_profiles.insert(profile_name);
        Ok(RuntimeWebsocketMessageLoopAction::Continue)
    }

    pub(super) fn handle_previous_response_not_found(
        &mut self,
        profile_name: &str,
        turn_state: Option<String>,
        via: Option<&'static str>,
        policy: RuntimePreviousResponseNotFoundPolicy,
        update_trusted_previous_response_affinity: bool,
    ) -> Result<RuntimePreviousResponseNotFoundAction> {
        let trusted_previous_response_affinity = self.trusted_previous_response_affinity;
        let trusted_previous_response_affinity_mut = update_trusted_previous_response_affinity
            .then_some(&mut self.trusted_previous_response_affinity);
        handle_runtime_previous_response_not_found(
            RuntimePreviousResponseNotFoundContext {
                shared: self.shared,
                log_context: RuntimePreviousResponseLogContext {
                    request_id: self.request_id,
                    transport: "websocket",
                    route: "websocket",
                    websocket_session: Some(self.session_id),
                    via,
                },
                route: RuntimePreviousResponseNotFoundRoute::Websocket,
                route_kind: RuntimeRouteKind::Websocket,
                profile_name,
                turn_state,
                previous_response_id: self.previous_response_id.as_deref(),
                request_turn_state: self.request_turn_state.as_deref(),
                request_session_id: self.request_session_id.as_deref(),
                request_requires_previous_response_affinity: self
                    .request_requires_previous_response_affinity,
                trusted_previous_response_affinity,
                previous_response_fresh_fallback_used: false,
                fresh_fallback_shape: self.previous_response_fresh_fallback_shape,
                policy,
            },
            RuntimePreviousResponseNotFoundState {
                saw_previous_response_not_found: &mut self.saw_previous_response_not_found,
                previous_response_retry_candidate: &mut self.previous_response_retry_candidate,
                previous_response_retry_index: &mut self.previous_response_retry_index,
                candidate_turn_state_retry_profile: &mut self.candidate_turn_state_retry_profile,
                candidate_turn_state_retry_value: &mut self.candidate_turn_state_retry_value,
                bound_profile: &mut self.bound_profile,
                session_profile: &mut self.session_profile,
                pinned_profile: &mut self.pinned_profile,
                turn_state_profile: &mut self.turn_state_profile,
                compact_followup_profile: Some(&mut self.compact_followup_profile),
                excluded_profiles: &mut self.excluded_profiles,
                trusted_previous_response_affinity: trusted_previous_response_affinity_mut,
            },
        )
    }

    pub(super) fn apply_previous_response_not_found_action(
        &mut self,
        action: RuntimePreviousResponseNotFoundAction,
        payload: RuntimeWebsocketErrorPayload,
    ) -> Result<RuntimeWebsocketMessageLoopAction> {
        match action {
            RuntimePreviousResponseNotFoundAction::RetryOwner
            | RuntimePreviousResponseNotFoundAction::Rotate => {
                self.last_failure =
                    Some((RuntimeUpstreamFailureResponse::Websocket(payload), false));
                Ok(RuntimeWebsocketMessageLoopAction::Continue)
            }
            RuntimePreviousResponseNotFoundAction::StaleContinuation => {
                send_runtime_proxy_stale_continuation_websocket_error(&mut *self.local_socket)?;
                Ok(RuntimeWebsocketMessageLoopAction::Finished)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::super::test_support::{
        read_runtime_websocket_text, test_runtime_local_websocket_pair, test_runtime_shared,
        test_runtime_websocket_flow,
    };
    use super::*;

    #[test]
    fn previous_response_not_found_rotate_stashes_last_failure() {
        let _guard = acquire_test_runtime_lock();
        let shared = test_runtime_shared("continuation-rotate");
        let (mut local_socket, _client_socket) = test_runtime_local_websocket_pair();
        let mut websocket_session = RuntimeWebsocketSessionState::default();
        let mut flow =
            test_runtime_websocket_flow(&mut local_socket, &shared, &mut websocket_session);

        let action = flow
            .apply_previous_response_not_found_action(
                RuntimePreviousResponseNotFoundAction::Rotate,
                RuntimeWebsocketErrorPayload::Text("upstream error".to_string()),
            )
            .expect("rotate handling should succeed");

        assert!(matches!(
            action,
            RuntimeWebsocketMessageLoopAction::Continue
        ));
        assert!(matches!(
            flow.last_failure,
            Some((RuntimeUpstreamFailureResponse::Websocket(_), false))
        ));
    }

    #[test]
    fn stale_continuation_action_sends_error_frame_and_finishes() {
        let _guard = acquire_test_runtime_lock();
        let shared = test_runtime_shared("continuation-stale");
        let (mut local_socket, mut client_socket) = test_runtime_local_websocket_pair();
        let mut websocket_session = RuntimeWebsocketSessionState::default();
        let mut flow =
            test_runtime_websocket_flow(&mut local_socket, &shared, &mut websocket_session);

        let action = flow
            .apply_previous_response_not_found_action(
                RuntimePreviousResponseNotFoundAction::StaleContinuation,
                RuntimeWebsocketErrorPayload::Empty,
            )
            .expect("stale continuation handling should succeed");
        let frame = read_runtime_websocket_text(&mut client_socket);

        assert!(matches!(
            action,
            RuntimeWebsocketMessageLoopAction::Finished
        ));
        assert!(frame.contains("\"code\":\"stale_continuation\""));
    }
}