use super::stream_adapter::{UiRuntimeEvent, UiRuntimeTerminalEvent, adapt_gateway_stream_event};
use crate::app::agent::config::Config;
use std::fmt::Write as _;
use std::future::Future;
#[cfg(not(target_arch = "wasm32"))]
use std::io::{Read, Write};
#[cfg(not(target_arch = "wasm32"))]
use std::net::{TcpStream, ToSocketAddrs};
use std::path::{Path, PathBuf};
#[cfg(not(target_arch = "wasm32"))]
use std::process::{Command, Stdio};
#[cfg(not(target_arch = "wasm32"))]
use std::time::{Duration, Instant};
use vw_gateway_client::vw_api_types::id::SessionId;
use vw_gateway_client::{GatewayAuth, GatewayChatStreamRequest, GatewayClient, GatewayEndpoint};
/// HTTP path requested by the blocking gateway health probe.
#[cfg(not(target_arch = "wasm32"))]
const GATEWAY_HEALTH_PATH: &str = "/v1/health";
/// Timeout applied to the health probe's TCP connect, and reused as its read and write timeout.
#[cfg(not(target_arch = "wasm32"))]
const GATEWAY_HEALTH_CONNECT_TIMEOUT: Duration = Duration::from_millis(250);
/// How long the preflight waits for a freshly spawned local gateway to report healthy.
#[cfg(not(target_arch = "wasm32"))]
const GATEWAY_STARTUP_TIMEOUT: Duration = Duration::from_secs(8);
/// Pause between consecutive health probes while waiting for gateway startup.
#[cfg(not(target_arch = "wasm32"))]
const GATEWAY_STARTUP_POLL_INTERVAL: Duration = Duration::from_millis(150);
/// Successful result of the local gateway preflight check.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum GatewayPreflightOutcome {
    /// The gateway answered the health probe without being launched by this call.
    Ready,
    /// The gateway answered the health probe only after a launch attempt by this call.
    Started,
}

#[cfg(not(target_arch = "wasm32"))]
impl GatewayPreflightOutcome {
    /// Returns `true` only for [`GatewayPreflightOutcome::Started`].
    pub(crate) fn started_gateway(self) -> bool {
        self == Self::Started
    }
}
/// Connection settings used to build the gateway client endpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct GatewayClientBootstrapConfig {
    /// Host name or address of the gateway.
    pub(crate) host: String,
    /// TCP port of the gateway.
    pub(crate) port: u16,
    /// Optional user name credential.
    pub(crate) username: Option<String>,
    /// Optional password credential.
    pub(crate) password: Option<String>,
    /// Optional `skey` credential.
    pub(crate) skey: Option<String>,
}

impl Default for GatewayClientBootstrapConfig {
    /// Targets `127.0.0.1:42617` with no credentials configured.
    fn default() -> Self {
        let host = String::from("127.0.0.1");
        Self { host, port: 42617, username: None, password: None, skey: None }
    }
}
impl GatewayClientBootstrapConfig {
    /// Loads gateway client settings from the JSON file at `config.config_path`.
    ///
    /// Starts from [`Default`] and overlays the file's `gateway_client` object.
    /// When the file cannot be read or parsed, a warning is logged and the
    /// defaults are returned unchanged.
    pub(crate) fn load(config: &Config) -> Self {
        let mut bootstrap = Self::default();
        match std::fs::read_to_string(&config.config_path) {
            Ok(contents) => match serde_json::from_str::<serde_json::Value>(&contents) {
                Ok(root) => bootstrap.apply_json_root(&root),
                Err(err) => {
                    tracing::warn!(
                        path = %config.config_path.display(),
                        error = %err,
                        "failed to parse cli gateway bootstrap config, using defaults"
                    );
                }
            },
            Err(err) => {
                tracing::warn!(
                    path = %config.config_path.display(),
                    error = %err,
                    "failed to read cli gateway bootstrap config, using defaults"
                );
            }
        }
        bootstrap
    }

    /// Builds the gateway endpoint for these settings, attaching credentials when any are set.
    pub(crate) fn endpoint(&self) -> GatewayEndpoint {
        let base = GatewayEndpoint::new(self.host.clone(), self.port);
        match self.auth() {
            Some(auth) => base.with_auth(auth),
            None => base,
        }
    }

    /// Overlays values from the `gateway_client` object of `root`, if that object exists.
    ///
    /// A blank host and a port that is zero or does not fit in `u16` are ignored.
    /// The three credential fields are always replaced by what the object holds,
    /// so a missing or blank entry clears the field.
    fn apply_json_root(&mut self, root: &serde_json::Value) {
        let Some(section) = root.get("gateway_client").and_then(serde_json::Value::as_object)
        else {
            return;
        };

        let host = section
            .get("host")
            .and_then(serde_json::Value::as_str)
            .map(str::trim)
            .filter(|host| !host.is_empty());
        if let Some(host) = host {
            self.host = host.to_string();
        }

        let port = section
            .get("port")
            .and_then(serde_json::Value::as_u64)
            .and_then(|raw| u16::try_from(raw).ok())
            .filter(|port| *port != 0);
        if let Some(port) = port {
            self.port = port;
        }

        self.username = gateway_optional_string(section.get("username"));
        self.password = gateway_optional_string(section.get("password"));
        self.skey = gateway_optional_string(section.get("skey"));
    }

    /// Returns the credentials to attach, or `None` when no credential field is set.
    fn auth(&self) -> Option<GatewayAuth> {
        let has_credentials =
            self.username.is_some() || self.password.is_some() || self.skey.is_some();
        has_credentials.then(|| GatewayAuth {
            bearer_token: None,
            username: self.username.clone(),
            password: self.password.clone(),
            skey: self.skey.clone(),
        })
    }
}
/// Session identity the UI runtime attaches to gateway requests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct GatewaySessionSeed {
    /// Session id, when one has been bound.
    id: Option<String>,
    /// Directory the session is rooted at.
    directory: PathBuf,
    /// Optional session scope.
    scope: Option<String>,
    /// Optional session title.
    title: Option<String>,
}

impl GatewaySessionSeed {
    /// Creates a seed for `directory` with no id, scope or title.
    pub(crate) fn new(directory: PathBuf) -> Self {
        Self {
            id: None,
            directory,
            scope: None,
            title: None,
        }
    }

    /// Session id, if bound.
    pub(crate) fn id(&self) -> Option<&str> {
        self.id.as_deref()
    }

    /// Directory the session is rooted at.
    pub(crate) fn directory(&self) -> &Path {
        self.directory.as_path()
    }

    /// Session scope, if set.
    pub(crate) fn scope(&self) -> Option<&str> {
        self.scope.as_deref()
    }

    /// Session title, if set.
    pub(crate) fn title(&self) -> Option<&str> {
        self.title.as_deref()
    }

    /// Replaces the id; the value is trimmed and a blank value becomes `None`.
    pub(crate) fn with_id(self, id: Option<String>) -> Self {
        Self { id: normalize_optional_string(id), ..self }
    }

    /// Replaces the scope; the value is trimmed and a blank value becomes `None`.
    pub(crate) fn with_scope(self, scope: Option<String>) -> Self {
        Self { scope: normalize_optional_string(scope), ..self }
    }

    /// Replaces the title; the value is trimmed and a blank value becomes `None`.
    pub(crate) fn with_title(self, title: Option<String>) -> Self {
        Self { title: normalize_optional_string(title), ..self }
    }
}
/// UI-facing runtime that pairs a gateway client with the session seed it serves.
#[derive(Debug, Clone)]
pub(crate) struct GatewayUiRuntime {
    client: GatewayClient,
    session: GatewaySessionSeed,
}

impl GatewayUiRuntime {
    /// Wraps an existing client and session seed.
    pub(crate) fn new(client: GatewayClient, session: GatewaySessionSeed) -> Self {
        Self { client, session }
    }

    /// Builds the runtime from the gateway settings referenced by `config`.
    ///
    /// # Errors
    /// Propagates the error string returned by [`gateway_client`].
    pub(crate) fn from_config(
        config: &Config,
        session: GatewaySessionSeed,
    ) -> Result<Self, String> {
        gateway_client(config).map(|client| Self::new(client, session))
    }

    /// Builds the runtime rooted at the current working directory, falling back
    /// to `config.workspace_dir` when the working directory cannot be determined.
    pub(crate) fn for_workspace(config: &Config) -> Result<Self, String> {
        let root = std::env::current_dir().unwrap_or_else(|_| config.workspace_dir.clone());
        let seed = GatewaySessionSeed::new(root);
        Self::from_config(config, seed)
    }

    /// The underlying gateway client.
    pub(crate) fn client(&self) -> &GatewayClient {
        &self.client
    }

    /// The endpoint the client talks to.
    pub(crate) fn endpoint(&self) -> &GatewayEndpoint {
        self.client.endpoint()
    }

    /// The session seed currently bound to this runtime.
    pub(crate) fn session(&self) -> &GatewaySessionSeed {
        &self.session
    }

    /// Directory of the bound session.
    pub(crate) fn directory(&self) -> &Path {
        self.session.directory()
    }

    /// Scope of the bound session, if set.
    pub(crate) fn scope(&self) -> Option<&str> {
        self.session.scope()
    }

    /// Title of the bound session, if set.
    pub(crate) fn title(&self) -> Option<&str> {
        self.session.title()
    }

    /// Id of the bound session, if set.
    pub(crate) fn session_id(&self) -> Option<&str> {
        self.session.id()
    }

    /// Replaces the id, scope and title of the bound session; the directory is kept.
    ///
    /// All three values are replaced, so passing `None` (or a blank string) clears the field.
    pub(crate) fn bind_session_seed(
        &mut self,
        session_id: Option<String>,
        scope: Option<String>,
        title: Option<String>,
    ) {
        let rebound = self.session.clone().with_id(session_id).with_scope(scope).with_title(title);
        self.session = rebound;
    }

    /// Returns a copy of `body` whose `session_id` is filled from the bound
    /// session when the request does not carry one itself.
    pub(crate) fn prepare_stream_request(
        &self,
        body: &GatewayChatStreamRequest,
    ) -> GatewayChatStreamRequest {
        let mut prepared = body.clone();
        if prepared.session_id.is_none() {
            prepared.session_id = self.session_id().map(SessionId::from);
        }
        prepared
    }

    /// Picks the explicit `session_id` when it is non-blank, otherwise the bound session id.
    ///
    /// # Errors
    /// Returns an error string when neither source provides an id.
    pub(crate) fn resolve_session_id<'a>(
        &'a self,
        session_id: Option<&'a str>,
    ) -> Result<&'a str, String> {
        let explicit = normalize_optional_str_ref(session_id);
        let resolved = explicit.or_else(|| self.session_id());
        resolved.ok_or_else(|| "gateway runtime session id is required".to_string())
    }

    /// The session directory as a trimmed string, or `None` when it is blank.
    pub(crate) fn directory_value(&self) -> Option<String> {
        runtime_directory_value(self.session.directory())
    }

    /// Forwards one adapted stream event to the consumer and tracks the terminal state.
    ///
    /// Records the event in `terminal` when it is a terminal event, then hands it to
    /// `on_event`. When the consumer asks to stop and no terminal event has been seen,
    /// a cancellation terminal is recorded. Returns the consumer's answer.
    fn relay_runtime_event(
        event: UiRuntimeEvent,
        terminal: &mut Option<UiRuntimeTerminalEvent>,
        on_event: &mut impl FnMut(UiRuntimeEvent) -> bool,
    ) -> bool {
        if let UiRuntimeEvent::Terminal(seen) = &event {
            *terminal = Some(seen.clone());
        }
        let keep_streaming = on_event(event);
        if !keep_streaming && terminal.is_none() {
            *terminal = Some(cancelled_by_consumer_terminal());
        }
        keep_streaming
    }

    /// Streams a chat request and reports every adapted event to `on_event`.
    ///
    /// `on_event` returns `true` to keep streaming and `false` to stop. The returned
    /// terminal event is the last terminal event observed, or a cancellation marker
    /// when the consumer stopped first, or one derived from the transport result.
    pub(crate) async fn stream_chat(
        &self,
        body: &GatewayChatStreamRequest,
        mut on_event: impl FnMut(UiRuntimeEvent) -> bool,
    ) -> UiRuntimeTerminalEvent {
        let endpoint = self.endpoint();
        let directory = self.directory_value();
        let request = self.prepare_stream_request(body);
        let mut terminal = None;
        let outcome =
            GatewayClient::stream_chat(endpoint, directory.as_deref(), &request, |event| {
                Self::relay_runtime_event(
                    adapt_gateway_stream_event(event),
                    &mut terminal,
                    &mut on_event,
                )
            })
            .await
            .map_err(|err| annotate_gateway_transport_error(err, endpoint));
        finalize_stream_terminal(outcome, terminal)
    }

    /// Blocking counterpart of [`GatewayUiRuntime::stream_chat`] with the same event contract.
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) fn stream_chat_blocking(
        &self,
        body: &GatewayChatStreamRequest,
        mut on_event: impl FnMut(UiRuntimeEvent) -> bool,
    ) -> UiRuntimeTerminalEvent {
        let endpoint = self.endpoint();
        let directory = self.directory_value();
        let request = self.prepare_stream_request(body);
        let mut terminal = None;
        let outcome = GatewayClient::stream_chat_blocking(
            endpoint,
            directory.as_deref(),
            &request,
            |event| {
                Self::relay_runtime_event(
                    adapt_gateway_stream_event(event),
                    &mut terminal,
                    &mut on_event,
                )
            },
        )
        .map_err(|err| annotate_gateway_transport_error(err, endpoint));
        finalize_stream_terminal(outcome, terminal)
    }

    /// Makes sure the gateway answers its health probe, launching a local one if needed.
    ///
    /// Returns `Ready` when the first probe succeeds. Otherwise, for a loopback
    /// endpoint, a gateway process is spawned and the probe is retried until
    /// `GATEWAY_STARTUP_TIMEOUT` elapses; success in that phase yields `Started`.
    ///
    /// # Errors
    /// Returns a message when the endpoint is not loopback and not healthy, or when
    /// the gateway does not become healthy in time. The message includes the spawn
    /// error when the launch itself failed.
    #[cfg(not(target_arch = "wasm32"))]
    pub(crate) fn ensure_local_gateway_ready_blocking(
        &self,
    ) -> Result<GatewayPreflightOutcome, String> {
        let endpoint = self.endpoint();
        if gateway_health_ready(endpoint) {
            return Ok(GatewayPreflightOutcome::Ready);
        }

        let preflight_failure = || {
            annotate_gateway_transport_error("gateway preflight failed".to_string(), endpoint)
        };
        if !is_local_loopback_endpoint(endpoint) {
            return Err(preflight_failure());
        }

        // A spawn failure is kept for the final message; the health wait still runs.
        let startup_error = start_local_gateway_process(endpoint).err();
        if wait_for_gateway_health(endpoint, GATEWAY_STARTUP_TIMEOUT) {
            return Ok(GatewayPreflightOutcome::Started);
        }

        let mut message = preflight_failure();
        match startup_error {
            Some(startup_error) => {
                message.push_str(" Auto-start failed: ");
                message.push_str(&startup_error);
            }
            None => {
                let _ = write!(
                    message,
                    " Auto-start did not reach {} with status=ok within {}s.",
                    GATEWAY_HEALTH_PATH,
                    GATEWAY_STARTUP_TIMEOUT.as_secs()
                );
            }
        }
        Err(message)
    }
}
/// Drives `future` to completion from synchronous code.
///
/// When a Tokio runtime handle is current, the future is run on that handle inside
/// `tokio::task::block_in_place`. Otherwise a temporary current-thread runtime is
/// built for the call.
///
/// NOTE(review): Tokio documents `block_in_place` as panicking on a current-thread
/// runtime — confirm callers only reach the first branch from a multi-thread runtime.
///
/// # Errors
/// Returns the future's own error, or the runtime build error rendered as a string.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn block_on_gateway<F, T>(future: F) -> Result<T, String>
where
    F: Future<Output = Result<T, String>>,
{
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        return tokio::task::block_in_place(|| handle.block_on(future));
    }

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|err| err.to_string())?;
    runtime.block_on(future)
}
/// Loads the gateway bootstrap settings for `config` and returns the endpoint they describe.
pub(crate) fn gateway_client_endpoint(config: &Config) -> GatewayEndpoint {
    let bootstrap = GatewayClientBootstrapConfig::load(config);
    bootstrap.endpoint()
}
/// Creates a gateway client for the endpoint configured by `config`.
///
/// # Errors
/// Propagates the error string returned by `GatewayClient::new`.
pub(crate) fn gateway_client(config: &Config) -> Result<GatewayClient, String> {
    let endpoint = gateway_client_endpoint(config);
    GatewayClient::new(endpoint)
}
/// Appends a "gateway unavailable" hint to errors that look like transport failures.
///
/// The error text is trimmed first and a blank error becomes `gateway request failed`.
/// Messages that do not look like a transport failure are returned without the hint.
pub(crate) fn annotate_gateway_transport_error(
    error: String,
    endpoint: &GatewayEndpoint,
) -> String {
    let message = normalize_optional_string(Some(error))
        .unwrap_or_else(|| "gateway request failed".to_string());
    if looks_like_gateway_transport_error(&message) {
        let target = endpoint.describe();
        let host = endpoint.normalized_host();
        let port = &endpoint.port;
        format!(
            "{message}. Gateway endpoint {target} is unavailable. Start it with `vibewindow gateway --host {host} --port {port}` and retry."
        )
    } else {
        message
    }
}
fn gateway_optional_string(value: Option<&serde_json::Value>) -> Option<String> {
value
.and_then(serde_json::Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToString::to_string)
}
/// Trims an optional owned string and maps a blank result to `None`.
fn normalize_optional_string(value: Option<String>) -> Option<String> {
    let raw = value?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}
/// Reports whether `message` contains one of the known connection-failure phrases.
///
/// Matching is ASCII case-insensitive and looks for the phrase anywhere in the message.
fn looks_like_gateway_transport_error(message: &str) -> bool {
    const TRANSPORT_MARKERS: [&str; 5] = [
        "error sending request",
        "connection refused",
        "tcp connect error",
        "dns error",
        "couldn't connect to server",
    ];
    let lowered = message.trim().to_ascii_lowercase();
    TRANSPORT_MARKERS.iter().any(|marker| lowered.contains(*marker))
}
/// Trims an optional borrowed string and maps a blank result to `None`.
pub(crate) fn normalize_optional_str_ref(value: Option<&str>) -> Option<&str> {
    match value.map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => Some(trimmed),
        _ => None,
    }
}
/// Renders `directory` as a trimmed string (lossy for non-UTF-8 paths), or `None` when blank.
fn runtime_directory_value(directory: &Path) -> Option<String> {
    let lossy = directory.to_string_lossy();
    Some(lossy.trim()).filter(|text| !text.is_empty()).map(str::to_owned)
}
/// Terminal event recorded when the stream consumer asks to stop before any
/// terminal event was observed.
fn cancelled_by_consumer_terminal() -> UiRuntimeTerminalEvent {
    let reason = String::from("cancel requested; stream stopped after the next runtime event");
    UiRuntimeTerminalEvent::Cancelled {
        reason: Some(reason),
        usage: None,
        message_id: None,
        parent_message_id: None,
    }
}
/// Chooses the terminal event reported for a finished stream.
///
/// A terminal event recorded during streaming always wins. Without one, a clean
/// transport result is reported as an error (the stream ended with no terminal
/// event) and a transport error is converted through `from_error_message`.
fn finalize_stream_terminal(
    result: Result<(), String>,
    terminal: Option<UiRuntimeTerminalEvent>,
) -> UiRuntimeTerminalEvent {
    match (terminal, result) {
        (Some(recorded), _) => recorded,
        (None, Ok(())) => {
            UiRuntimeTerminalEvent::Error("gateway stream closed before terminal event".to_string())
        }
        (None, Err(err)) => UiRuntimeTerminalEvent::from_error_message(err),
    }
}
/// Reports whether the endpoint host is one of the recognised loopback names.
///
/// The host is trimmed, surrounding square brackets are removed, and the result
/// is compared ASCII case-insensitively against `127.0.0.1`, `localhost` and `::1`.
#[cfg(not(target_arch = "wasm32"))]
fn is_local_loopback_endpoint(endpoint: &GatewayEndpoint) -> bool {
    const LOOPBACK_HOSTS: [&str; 3] = ["127.0.0.1", "localhost", "::1"];
    let host = endpoint.normalized_host();
    let bare = host.trim().trim_matches(|ch: char| ch == '[' || ch == ']');
    LOOPBACK_HOSTS.iter().any(|candidate| bare.eq_ignore_ascii_case(candidate))
}
/// Spawns the current executable as `gateway --host <host> --port <port>` with
/// stdin, stdout and stderr all redirected to null.
///
/// The child handle is dropped right after spawning; this function does not wait on it.
///
/// # Errors
/// Returns a message when the current executable cannot be resolved or the spawn fails.
#[cfg(not(target_arch = "wasm32"))]
fn start_local_gateway_process(endpoint: &GatewayEndpoint) -> Result<(), String> {
    let executable = match std::env::current_exe() {
        Ok(path) => path,
        Err(err) => return Err(format!("resolve current executable failed: {err}")),
    };

    let mut command = Command::new(executable);
    command
        .arg("gateway")
        .arg("--host")
        .arg(endpoint.normalized_host())
        .arg("--port")
        .arg(endpoint.port.to_string())
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    match command.spawn() {
        Ok(_child) => Ok(()),
        Err(err) => Err(format!("failed to start local gateway: {err}")),
    }
}
/// Polls the gateway health probe until it succeeds or `timeout` has elapsed.
///
/// The probe always runs at least once, including for a zero `timeout`, and runs
/// one final time once the deadline is reached. Each pause between probes is
/// `GATEWAY_STARTUP_POLL_INTERVAL`, shortened to the time remaining so the wait
/// does not sleep past the deadline.
///
/// Returns `true` as soon as a probe succeeds and `false` when the deadline
/// passes without a successful probe.
#[cfg(not(target_arch = "wasm32"))]
fn wait_for_gateway_health(endpoint: &GatewayEndpoint, timeout: Duration) -> bool {
    let started = Instant::now();
    loop {
        // Probe before looking at the clock so the last sleep is always followed by a check.
        if gateway_health_ready(endpoint) {
            return true;
        }
        let remaining = timeout.saturating_sub(started.elapsed());
        if remaining.is_zero() {
            return false;
        }
        std::thread::sleep(remaining.min(GATEWAY_STARTUP_POLL_INTERVAL));
    }
}
/// Performs one blocking HTTP health probe against the gateway.
///
/// Connects over plain TCP, sends `GET` for `GATEWAY_HEALTH_PATH` with
/// `Connection: close`, and reads the whole response as text. The probe succeeds
/// when the first response line contains ` 200 ` and the response contains a
/// `"status":"ok"` marker (with or without a space after the colon). Any resolve,
/// connect, write or read failure counts as not ready.
#[cfg(not(target_arch = "wasm32"))]
fn gateway_health_ready(endpoint: &GatewayEndpoint) -> bool {
    let Some(address) = resolve_gateway_socket_address(endpoint) else {
        return false;
    };
    let Ok(mut stream) = TcpStream::connect_timeout(&address, GATEWAY_HEALTH_CONNECT_TIMEOUT)
    else {
        return false;
    };

    // Timeout configuration is best-effort; a failure here does not abort the probe.
    let _ = stream.set_read_timeout(Some(GATEWAY_HEALTH_CONNECT_TIMEOUT));
    let _ = stream.set_write_timeout(Some(GATEWAY_HEALTH_CONNECT_TIMEOUT));

    let request = format!(
        "GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
        GATEWAY_HEALTH_PATH,
        endpoint.describe()
    );
    if stream.write_all(request.as_bytes()).is_err() {
        return false;
    }

    let mut response = String::new();
    if stream.read_to_string(&mut response).is_err() {
        return false;
    }

    let status_ok = response.lines().next().is_some_and(|line| line.contains(" 200 "));
    let body_ok = ["\"status\":\"ok\"", "\"status\": \"ok\""]
        .iter()
        .any(|marker| response.contains(*marker));
    status_ok && body_ok
}
/// Resolves the endpoint's `host:port` and returns the first address produced,
/// or `None` when resolution fails or yields nothing.
#[cfg(not(target_arch = "wasm32"))]
fn resolve_gateway_socket_address(endpoint: &GatewayEndpoint) -> Option<std::net::SocketAddr> {
    let authority = format!("{}:{}", endpoint.normalized_host(), endpoint.port);
    let mut candidates = authority.to_socket_addrs().ok()?;
    // Every `SocketAddr` is either IPv4 or IPv6, so the first candidate is the match.
    candidates.next()
}
// Unit tests for this module are kept in the file named by the `#[path]` attribute below.
#[cfg(test)]
#[path = "gateway_tests.rs"]
mod gateway_tests;