vw-cli 0.2.7

Rust-first autonomous agent runtime CLI
Documentation
//! runtime 事件到状态动作的标准化桥。
//!
//! 本模块负责把 `UiRuntimeEvent` 进一步规整为状态层可直接消费的动作序列:
//! - delta/step/terminal 落到既有 reducer 动作,统一汇聚成 `UiMessage`
//! - delta 内部的 `<think>` 与 `tool <name>\n{json}` 片段在此转换为 typed UI 消息
//! - runtime 侧尚未识别的事件,显式转换为 UI warning,而不是静默吞掉
//! - timeout/error 终态补一条错误消息,便于 renderer 在 transcript 中稳定展示

use serde_json::Value;
use vw_gateway_client::GatewayChatUsage;

use super::{
    TuiAction, TuiState, TuiTerminalUpdate, TuiToolCallUpdate, TuiToolResultUpdate,
    reduce_tui_state,
};
use crate::cli::tui_v2::model::{
    UiErrorMessage, UiMessage, UiMessageBase, UiMessageId, UiSystemMessage, UiSystemMessageLevel,
    UiTokenUsage, UiToolCallState, UiTurnTerminal,
};
use crate::cli::tui_v2::runtime::stream_adapter::{UiRuntimeEvent, UiRuntimeTerminalEvent};

const THINK_OPEN_TAG: &str = "<think>";
const THINK_CLOSE_TAG: &str = "</think>";

/// 将单个 runtime 事件应用到状态层。
pub(crate) fn apply_runtime_event(state: &mut TuiState, event: UiRuntimeEvent) {
    match event {
        UiRuntimeEvent::Delta(delta) => {
            apply_runtime_delta(state, delta);
        }
        UiRuntimeEvent::StepStart { step_index, created_ms, model } => {
            reduce_tui_state(
                state,
                TuiAction::StepStarted { step_index, started_ms: created_ms, model },
            );
        }
        UiRuntimeEvent::StepFinish { step_index, finished_ms, usage, finish_reason, model } => {
            reduce_tui_state(
                state,
                TuiAction::StepFinished {
                    step_index,
                    finished_ms,
                    usage: token_usage_from_gateway(&usage),
                    finish_reason,
                    model,
                },
            );
        }
        UiRuntimeEvent::Terminal(terminal) => {
            if state.runtime.thinking_open {
                reduce_tui_state(state, TuiAction::ThinkingClosed);
                state.runtime.thinking_open = false;
            }
            apply_runtime_terminal(state, terminal);
        }
        UiRuntimeEvent::TaskStateChanged { .. } | UiRuntimeEvent::UsageUpdated { .. } => {}
        UiRuntimeEvent::SessionMetadataChanged { session_id, title } => {
            if runtime_event_targets_current_session(state, session_id.as_deref())
                && let Some(title) = title
            {
                reduce_tui_state(state, TuiAction::SessionTitleSet(title));
            }
        }
        UiRuntimeEvent::Unknown { event_type } => {
            let event_type = event_type.unwrap_or_else(|| "unknown".to_string());
            reduce_tui_state(
                state,
                TuiAction::MessagePushed(UiMessage::System(UiSystemMessage {
                    base: next_ui_message_base(state, "runtime-warning"),
                    text: format!("Unsupported gateway event: {event_type}."),
                    level: UiSystemMessageLevel::Warning,
                })),
            );
        }
    }
}

fn apply_runtime_delta(state: &mut TuiState, delta: String) {
    if !state.runtime.thinking_open
        && let Some(tool_update) = parse_tool_delta(delta.as_str())
    {
        reduce_tui_state(state, TuiAction::ToolCallUpdated(tool_update));
        return;
    }

    let mut remaining = delta.as_str();
    while !remaining.is_empty() {
        if state.runtime.thinking_open {
            if let Some(close_index) = remaining.find(THINK_CLOSE_TAG) {
                let thinking_delta = &remaining[..close_index];
                if !thinking_delta.is_empty() {
                    reduce_tui_state(
                        state,
                        TuiAction::ThinkingDeltaReceived(thinking_delta.to_string()),
                    );
                }
                reduce_tui_state(state, TuiAction::ThinkingClosed);
                state.runtime.thinking_open = false;
                remaining = &remaining[close_index + THINK_CLOSE_TAG.len()..];
                remaining = remaining.trim_start_matches(['\r', '\n']);
                if remaining.is_empty() {
                    break;
                }
                continue;
            }

            reduce_tui_state(state, TuiAction::ThinkingDeltaReceived(remaining.to_string()));
            break;
        }

        if let Some(open_index) = remaining.find(THINK_OPEN_TAG) {
            let assistant_delta = &remaining[..open_index];
            if !assistant_delta.trim().is_empty() {
                reduce_tui_state(
                    state,
                    TuiAction::AssistantDeltaReceived(assistant_delta.to_string()),
                );
            }
            state.runtime.thinking_open = true;
            remaining = &remaining[open_index + THINK_OPEN_TAG.len()..];
            continue;
        }

        reduce_tui_state(state, TuiAction::AssistantDeltaReceived(remaining.to_string()));
        break;
    }
}

fn parse_tool_delta(delta: &str) -> Option<TuiToolCallUpdate> {
    let (first, rest) = delta.split_once('\n')?;
    let tool_name = first.trim().strip_prefix("tool ")?.trim();
    if tool_name.is_empty() {
        return None;
    }

    let payload = serde_json::from_str::<Value>(rest.trim()).ok()?;
    let status = payload
        .get("status")
        .and_then(Value::as_str)
        .map(str::trim)
        .filter(|status| !status.is_empty())?;

    let arguments = runtime_text_value(payload.get("input"));
    let summary = runtime_text_value(payload.get("title"))
        .filter(|title| !title.eq_ignore_ascii_case(tool_name));

    let (state, result) = match status {
        "running" => (UiToolCallState::Running, None),
        "completed" => (
            UiToolCallState::Complete,
            Some(TuiToolResultUpdate {
                content: runtime_text_value(payload.get("output")).unwrap_or_default(),
                is_error: false,
            }),
        ),
        "denied" | "error" | "failed" => (
            UiToolCallState::Failed,
            Some(TuiToolResultUpdate {
                content: runtime_text_value(payload.get("error"))
                    .or_else(|| runtime_text_value(payload.get("output")))
                    .unwrap_or_else(|| format!("tool {tool_name} failed")),
                is_error: true,
            }),
        ),
        _ => return None,
    };

    Some(TuiToolCallUpdate { tool_name: tool_name.to_string(), summary, arguments, state, result })
}

fn runtime_text_value(value: Option<&Value>) -> Option<String> {
    let value = value?;
    match value {
        Value::Null => None,
        Value::String(text) => normalize_optional_string(text.to_string()),
        other => normalize_optional_string(other.to_string()),
    }
}

fn apply_runtime_terminal(state: &mut TuiState, terminal: UiRuntimeTerminalEvent) {
    reduce_tui_state(
        state,
        TuiAction::AssistantTerminalUpdated(TuiTerminalUpdate {
            terminal: ui_turn_terminal_from_runtime(&terminal),
            usage: terminal_usage(&terminal),
            message_id: terminal_message_id(&terminal),
            parent_message_id: terminal_parent_message_id(&terminal),
        }),
    );

    if let Some(message) = terminal_side_message(state, &terminal) {
        reduce_tui_state(state, TuiAction::MessagePushed(message));
    }
}

fn terminal_side_message(state: &TuiState, terminal: &UiRuntimeTerminalEvent) -> Option<UiMessage> {
    match terminal {
        UiRuntimeTerminalEvent::TimedOut { message, .. } => {
            Some(UiMessage::Error(UiErrorMessage {
                base: next_ui_message_base(state, "runtime-timeout"),
                message: message.clone(),
                recoverable: true,
            }))
        }
        UiRuntimeTerminalEvent::Error(message) => Some(UiMessage::Error(UiErrorMessage {
            base: next_ui_message_base(state, "runtime-error"),
            message: message.clone(),
            recoverable: true,
        })),
        UiRuntimeTerminalEvent::Done { .. } | UiRuntimeTerminalEvent::Cancelled { .. } => None,
    }
}

fn ui_turn_terminal_from_runtime(terminal: &UiRuntimeTerminalEvent) -> UiTurnTerminal {
    match terminal {
        UiRuntimeTerminalEvent::Done { finish_reason, .. } => {
            UiTurnTerminal::Done { finish_reason: finish_reason.clone() }
        }
        UiRuntimeTerminalEvent::Cancelled { reason, .. } => {
            UiTurnTerminal::Cancelled { reason: reason.clone() }
        }
        UiRuntimeTerminalEvent::TimedOut { message, .. } => {
            UiTurnTerminal::TimedOut { message: message.clone() }
        }
        UiRuntimeTerminalEvent::Error(message) => {
            UiTurnTerminal::Error { message: message.clone() }
        }
    }
}

fn terminal_usage(terminal: &UiRuntimeTerminalEvent) -> Option<UiTokenUsage> {
    match terminal {
        UiRuntimeTerminalEvent::Done { usage, .. }
        | UiRuntimeTerminalEvent::Cancelled { usage, .. }
        | UiRuntimeTerminalEvent::TimedOut { usage, .. } => {
            usage.as_ref().map(token_usage_from_gateway)
        }
        UiRuntimeTerminalEvent::Error(_) => None,
    }
}

fn terminal_message_id(terminal: &UiRuntimeTerminalEvent) -> Option<String> {
    match terminal {
        UiRuntimeTerminalEvent::Done { message_id, .. }
        | UiRuntimeTerminalEvent::Cancelled { message_id, .. }
        | UiRuntimeTerminalEvent::TimedOut { message_id, .. } => message_id.clone(),
        UiRuntimeTerminalEvent::Error(_) => None,
    }
}

fn terminal_parent_message_id(terminal: &UiRuntimeTerminalEvent) -> Option<String> {
    match terminal {
        UiRuntimeTerminalEvent::Done { parent_message_id, .. }
        | UiRuntimeTerminalEvent::Cancelled { parent_message_id, .. }
        | UiRuntimeTerminalEvent::TimedOut { parent_message_id, .. } => parent_message_id.clone(),
        UiRuntimeTerminalEvent::Error(_) => None,
    }
}

fn token_usage_from_gateway(usage: &GatewayChatUsage) -> UiTokenUsage {
    UiTokenUsage {
        input_tokens: usage.input_tokens,
        output_tokens: usage.output_tokens,
        cached_tokens: usage.cached_tokens,
        reasoning_tokens: usage.reasoning_tokens,
    }
}

fn next_ui_message_base(state: &TuiState, seed: &str) -> UiMessageBase {
    let mut base =
        UiMessageBase::new(UiMessageId::local(format!("ui-{seed}-{}", state.messages.len())));
    if let Some(session_id) = state.session.session_id.as_deref() {
        base = base.with_session_id(session_id);
    }
    base
}

fn normalize_optional_string(value: String) -> Option<String> {
    let trimmed = value.trim();
    (!trimmed.is_empty()).then(|| trimmed.to_string())
}

fn runtime_event_targets_current_session(state: &TuiState, session_id: Option<&str>) -> bool {
    match session_id.map(str::trim).filter(|value| !value.is_empty()) {
        Some(session_id) => state.session.session_id.as_deref() == Some(session_id),
        None => true,
    }
}
#[cfg(test)]
#[path = "runtime_pipeline_tests.rs"]
mod runtime_pipeline_tests;