relay-core-cli 0.3.1

Standalone CLI and TUI for relay-core: local proxy operation, rule management, traffic inspection
use crate::args::InterceptAction;
use anyhow::Result;
use relay_core_api::flow::{FlowUpdate, Layer};
use tokio_tungstenite::connect_async;
use futures_util::StreamExt;
use tracing::info;

pub async fn execute(control_url: String, output: String) -> Result<()> {
    let ws_url = if control_url.starts_with("https") {
            control_url.replace("https", "wss") + "/api/flows/ws"
    } else if control_url.starts_with("http") {
        control_url.replace("http", "ws") + "/api/flows/ws"
    } else {
        control_url + "/api/flows/ws"
    };

    info!("Connecting to {}", ws_url);
    let (ws_stream, _) = connect_async(ws_url).await.map_err(|e| anyhow::anyhow!("Failed to connect to control server: {}", e))?;
    let (_, mut read) = ws_stream.split();

    while let Some(msg) = read.next().await {
        let msg = match msg {
            Ok(msg) => msg,
            Err(e) => {
                return Err(anyhow::anyhow!("Failed to read message from control server: {}", e));
            }
        };

        if !msg.is_text() {
            continue;
        }

        let text = match msg.to_text() {
            Ok(text) => text,
            Err(_) => continue,
        };

        if output == "jsonl" {
            println!("{}", text);
            continue;
        }

        let update = match serde_json::from_str::<FlowUpdate>(text) {
            Ok(update) => update,
            Err(_) => continue,
        };

        match update {
            FlowUpdate::Full(flow) => {
                if output == "json" {
                    if let Ok(json) = serde_json::to_string_pretty(&flow) {
                        println!("{}", json);
                    }
                } else {
                    let url = match &flow.layer {
                        Layer::Http(h) => h.request.url.to_string(),
                        Layer::WebSocket(w) => w.handshake_request.url.to_string(),
                        _ => "unknown".to_string(),
                    };
                    let method = match &flow.layer {
                        Layer::Http(h) => h.request.method.clone(),
                        Layer::WebSocket(w) => w.handshake_request.method.clone(),
                        _ => "".to_string(),
                    };
                    info!("[Flow] {} {} {}", flow.id, method, url);
                }
            }
            FlowUpdate::WebSocketMessage { flow_id, message } => {
                if output == "table" {
                    info!("[WS] [{}] {} bytes", flow_id, message.content.size);
                }
            }
            FlowUpdate::HttpBody { flow_id, direction, body } => {
                if output == "table" {
                    info!("[Body] [{}] {:?} {} bytes", flow_id, direction, body.size);
                }
            }
        }
    }

    Ok(())
}


pub async fn execute_intercept(action: InterceptAction, control_url: String) -> Result<()> {
    let client = reqwest::Client::new();
    let url = match action {
        InterceptAction::Pause => format!("{}/api/intercept/pause", control_url),
        InterceptAction::Resume => format!("{}/api/intercept/resume", control_url),
    };
    
    let res = client.post(&url).send().await?;
    if res.status().is_success() {
        let json: serde_json::Value = res.json().await?;
        println!("{}", serde_json::to_string_pretty(&json)?);
    } else {
        eprintln!("Request failed: {}", res.status());
    }
    Ok(())
}