use std::path::Path;
use crate::app::agent::session::processor as legacy_processor;
use crate::session::ui_types as models;
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use vw_gateway_client::vw_api_types::id::SessionId;
use vw_gateway_client::{GatewayChatStreamRequest, GatewayChatUsage};
use super::tui_v2::runtime::gateway::{GatewayUiRuntime, normalize_optional_str_ref};
use super::tui_v2::runtime::stream_adapter::{UiRuntimeEvent, UiRuntimeTerminalEvent};
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SessionProcessorCliResult {
pub(crate) output: String,
pub(crate) usage: models::TokenUsage,
pub(crate) step_finishes: usize,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum SessionProcessorComparableTerminal {
Done {
finish_reason: Option<String>,
message_id: Option<String>,
parent_message_id: Option<String>,
},
Cancelled {
reason: Option<String>,
message_id: Option<String>,
parent_message_id: Option<String>,
},
TimedOut {
message: String,
message_id: Option<String>,
parent_message_id: Option<String>,
},
Error(String),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SessionProcessorComparableResult {
pub(crate) output: String,
pub(crate) usage: models::TokenUsage,
pub(crate) step_finishes: usize,
pub(crate) terminal: SessionProcessorComparableTerminal,
}
impl SessionProcessorComparableResult {
pub(crate) fn into_cli_result(self) -> anyhow::Result<SessionProcessorCliResult> {
let SessionProcessorComparableResult { output, usage, step_finishes, terminal } = self;
match terminal {
SessionProcessorComparableTerminal::Done { .. } => {
Ok(SessionProcessorCliResult { output, usage, step_finishes })
}
SessionProcessorComparableTerminal::Cancelled { reason, .. } => Err(anyhow::anyhow!(
reason.unwrap_or_else(|| "session processor cancelled".to_string())
)),
SessionProcessorComparableTerminal::TimedOut { message, .. } => {
Err(anyhow::anyhow!(message))
}
SessionProcessorComparableTerminal::Error(message) => Err(anyhow::anyhow!(message)),
}
}
}
pub(crate) async fn run_session_processor_comparable_for_cli(
req: legacy_processor::Request,
delta_tx: Option<mpsc::Sender<String>>,
) -> anyhow::Result<SessionProcessorComparableResult> {
let (event_tx, mut event_rx) = mpsc::unbounded_channel::<legacy_processor::StreamEvent>();
tokio::task::spawn_blocking(move || {
legacy_processor::run(req, move |ev| event_tx.send(ev).is_ok());
});
let mut output = String::new();
let mut step_finishes = 0usize;
while let Some(ev) = event_rx.recv().await {
match ev {
legacy_processor::StreamEvent::Delta(delta) => {
output.push_str(&delta);
if let Some(tx) = delta_tx.as_ref() {
let _ = tx.send(delta).await;
}
}
legacy_processor::StreamEvent::Done(done_usage) => {
return Ok(SessionProcessorComparableResult {
output,
usage: done_usage,
step_finishes,
terminal: SessionProcessorComparableTerminal::Done {
finish_reason: None,
message_id: None,
parent_message_id: None,
},
});
}
legacy_processor::StreamEvent::Error(err) => {
return Ok(SessionProcessorComparableResult {
output,
usage: models::TokenUsage::default(),
step_finishes,
terminal: SessionProcessorComparableTerminal::Error(err),
});
}
legacy_processor::StreamEvent::StepStart { .. }
| legacy_processor::StreamEvent::PostToolRound { .. } => {}
legacy_processor::StreamEvent::StepFinish { .. } => {
step_finishes = step_finishes.saturating_add(1);
}
}
}
anyhow::bail!("session processor exited without terminal event")
}
pub(crate) async fn run_session_processor_for_cli(
req: legacy_processor::Request,
delta_tx: Option<mpsc::Sender<String>>,
) -> anyhow::Result<SessionProcessorCliResult> {
run_session_processor_comparable_for_cli(req, delta_tx).await?.into_cli_result()
}
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) async fn run_gateway_runtime_for_cli(
runtime: &GatewayUiRuntime,
req: legacy_processor::Request,
delta_tx: Option<mpsc::Sender<String>>,
) -> anyhow::Result<SessionProcessorComparableResult> {
ensure_runtime_directory_matches_request(runtime, &req)?;
let body = gateway_stream_request_from_processor_request(&req);
let (bridge_tx, forward_task) = start_gateway_delta_forwarder(delta_tx);
let mut output = String::new();
let mut step_finishes = 0usize;
let terminal = runtime
.stream_chat(&body, |event| {
match event {
UiRuntimeEvent::Delta(delta) => {
output.push_str(&delta);
if let Some(tx) = bridge_tx.as_ref() {
let _ = tx.send(delta);
}
}
UiRuntimeEvent::StepStart { .. }
| UiRuntimeEvent::TaskStateChanged { .. }
| UiRuntimeEvent::SessionMetadataChanged { .. }
| UiRuntimeEvent::UsageUpdated { .. }
| UiRuntimeEvent::Terminal(_)
| UiRuntimeEvent::Unknown { .. } => {}
UiRuntimeEvent::StepFinish { .. } => {
step_finishes = step_finishes.saturating_add(1);
}
}
true
})
.await;
drop(bridge_tx);
if let Some(task) = forward_task {
task.await.map_err(|err| anyhow::anyhow!("gateway delta forwarder failed: {err}"))?;
}
Ok(SessionProcessorComparableResult {
output,
usage: token_usage_from_terminal(&terminal),
step_finishes,
terminal: comparable_terminal_from_runtime_terminal(terminal),
})
}
pub(crate) fn gateway_stream_request_from_processor_request(
req: &legacy_processor::Request,
) -> GatewayChatStreamRequest {
let mut messages = Vec::with_capacity(req.history.len().saturating_add(1));
for message in &req.history {
messages.push(gateway_message_from_history(message));
}
messages.push(gateway_message_value("user", req.query.as_str()));
GatewayChatStreamRequest {
session_id: normalize_optional_str_ref(Some(req.session.as_str())).map(SessionId::from),
messages,
system: None,
model: req.model.clone(),
agent: None,
allowed_tools: None,
acp_agent: None,
acp_allowed_tools: None,
options: gateway_request_options(&req.options),
}
}
fn ensure_runtime_directory_matches_request(
runtime: &GatewayUiRuntime,
req: &legacy_processor::Request,
) -> anyhow::Result<()> {
let Some(root) = req.root.as_deref().and_then(|value| normalize_optional_str_ref(Some(value)))
else {
return Ok(());
};
let request_root = Path::new(root);
if runtime.directory() != request_root {
anyhow::bail!(
"gateway runtime directory does not match legacy processor request root: runtime={}, request={}",
runtime.directory().display(),
request_root.display()
);
}
Ok(())
}
fn start_gateway_delta_forwarder(
delta_tx: Option<mpsc::Sender<String>>,
) -> (Option<mpsc::UnboundedSender<String>>, Option<JoinHandle<()>>) {
let Some(delta_tx) = delta_tx else {
return (None, None);
};
let (bridge_tx, mut bridge_rx) = mpsc::unbounded_channel::<String>();
let task = tokio::spawn(async move {
while let Some(delta) = bridge_rx.recv().await {
if delta_tx.send(delta).await.is_err() {
break;
}
}
});
(Some(bridge_tx), Some(task))
}
fn token_usage_from_terminal(terminal: &UiRuntimeTerminalEvent) -> models::TokenUsage {
let usage = match terminal {
UiRuntimeTerminalEvent::Done { usage, .. }
| UiRuntimeTerminalEvent::Cancelled { usage, .. }
| UiRuntimeTerminalEvent::TimedOut { usage, .. } => usage.as_ref(),
UiRuntimeTerminalEvent::Error(_) => None,
};
usage.map(token_usage_from_gateway_usage).unwrap_or_default()
}
fn comparable_terminal_from_runtime_terminal(
terminal: UiRuntimeTerminalEvent,
) -> SessionProcessorComparableTerminal {
match terminal {
UiRuntimeTerminalEvent::Done { finish_reason, message_id, parent_message_id, .. } => {
SessionProcessorComparableTerminal::Done {
finish_reason,
message_id,
parent_message_id,
}
}
UiRuntimeTerminalEvent::Cancelled { reason, message_id, parent_message_id, .. } => {
SessionProcessorComparableTerminal::Cancelled { reason, message_id, parent_message_id }
}
UiRuntimeTerminalEvent::TimedOut { message, message_id, parent_message_id, .. } => {
SessionProcessorComparableTerminal::TimedOut { message, message_id, parent_message_id }
}
UiRuntimeTerminalEvent::Error(message) => {
SessionProcessorComparableTerminal::Error(message)
}
}
}
fn token_usage_from_gateway_usage(usage: &GatewayChatUsage) -> models::TokenUsage {
models::TokenUsage {
input_tokens: usage.input_tokens,
output_tokens: usage.output_tokens,
cached_tokens: usage.cached_tokens,
reasoning_tokens: usage.reasoning_tokens,
}
}
fn gateway_message_from_history(message: &models::ChatMessage) -> Value {
gateway_message_value(gateway_role(message.role), message.content.as_str())
}
fn gateway_message_value(role: &str, content: &str) -> Value {
serde_json::json!({
"role": role,
"content": content,
})
}
fn gateway_role(role: models::ChatRole) -> &'static str {
match role {
models::ChatRole::User => "user",
models::ChatRole::Assistant => "assistant",
models::ChatRole::System => "system",
models::ChatRole::Tool => "tool",
}
}
fn gateway_request_options(options: &Value) -> Option<Value> {
if options.is_null() { None } else { Some(options.clone()) }
}