use clap::{Parser, Subcommand};
use std::io::Write;
use std::net::TcpListener;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use tokio::process::Command;
use tracing::{Level, info, warn};
use tracing_subscriber::FmtSubscriber;
mod migrate;
use migrate::{MigrateArgs, MigrateCommand};
// Top-level CLI definition for the `pulse` binary.
// All user-visible behavior (names, version, about text) comes from the clap
// derive attributes below. Plain `//` comments are used on purpose: `///` doc
// comments would be picked up by clap as help text and change `--help` output.
#[derive(Parser)]
#[command(name = "pulse")]
#[command(version = env!("CARGO_PKG_VERSION"))]
#[command(about = "Pulse — Real-time, without the pain.", long_about = None)]
struct Cli {
// The chosen subcommand; clap errors out with usage help if none is supplied.
#[command(subcommand)]
command: Commands,
}
// All `pulse` subcommands. Variant names map to kebab-case subcommand names
// (e.g. `Serve` -> `pulse serve`). `//` comments are used instead of `///` so
// clap's generated help output is left untouched.
#[derive(Subcommand)]
enum Commands {
// Run a relay / control-plane node. Most flags can also come from env vars
// (see the `env = "..."` attributes); env values override only the defaults,
// not explicit CLI flags.
Serve {
#[arg(long, value_enum, default_value_t = pulse_relay::Mode::Standalone)]
mode: pulse_relay::Mode,
// Socket address the relay listens on.
#[arg(short, long, default_value = "0.0.0.0:4001")]
bind: String,
// Address advertised to clients (may differ from `bind` behind NAT/LB).
#[arg(short, long, default_value = "localhost:4001")]
public_endpoint: String,
#[arg(short, long, default_value = "eu-west-1")]
region: String,
// Geo coordinates reported for this node; default (0,0) means "unset".
#[arg(long, default_value_t = 0.0)]
lat: f64,
#[arg(long, default_value_t = 0.0)]
lon: f64,
#[arg(long, default_value = "http://localhost:4002")]
control_url: String,
#[arg(long, default_value_t = 100000)]
capacity: u32,
// Optional AI service endpoints (speech-to-text / vision).
#[arg(long, env = "AI_STT_ENDPOINT")]
stt_endpoint: Option<String>,
#[arg(long, env = "AI_VISION_ENDPOINT")]
vision_endpoint: Option<String>,
// Queue persistence backend and its backend-specific connection settings.
#[arg(long, value_enum, default_value_t = pulse_relay::QueueStorage::Memory, env = "PULSE_QUEUE_BACKEND")]
queue_storage: pulse_relay::QueueStorage,
#[arg(long, default_value = "./data/queues", env = "PULSE_QUEUE_WAL_DIR")]
queue_data_dir: String,
#[arg(long, env = "PULSE_QUEUE_POSTGRES_URL")]
database_url: Option<String>,
#[arg(long, env = "PULSE_QUEUE_REDIS_URL")]
redis_url: Option<String>,
// Encryption key may be given inline or via a file path (file wins only in
// whatever precedence pulse_relay applies — not decided here).
#[arg(long, env = "PULSE_QUEUE_KEY")]
queue_encryption_key: Option<String>,
#[arg(long)]
queue_encryption_key_file: Option<String>,
// Authentication configuration; forwarded into pulse_relay::auth::AuthConfig.
#[arg(long, value_enum, default_value_t = pulse_relay::auth::AuthMode::None, env = "PULSE_AUTH_MODE")]
auth_mode: pulse_relay::auth::AuthMode,
#[arg(long, env = "PULSE_AUTH_JWT_ISSUER")]
auth_jwt_issuer: Option<String>,
#[arg(long, env = "PULSE_AUTH_JWT_AUDIENCE")]
auth_jwt_audience: Option<String>,
#[arg(long, env = "PULSE_AUTH_WEBHOOK")]
auth_webhook: Option<String>,
},
// Run the full local dev stack: control plane + relay + web dashboard.
Dev {
#[arg(short, long, default_value_t = 4001)]
relay_port: u16,
#[arg(short, long, default_value_t = 4000)]
dashboard_port: u16,
},
// Scaffold a new Pulse project directory.
Init {
name: String,
// Project template; currently "data-channel" or the default template.
#[arg(short, long)]
template: Option<String>,
},
// Query the control plane for a single session's details.
Inspect {
session_id: String,
#[arg(long, default_value = "http://localhost:4002")]
control_url: String,
},
// Fetch cluster-wide metrics from the control plane.
Metrics {
#[arg(long, default_value = "http://localhost:4002")]
control_url: String,
},
// Run a local UDP chaos proxy injecting loss/latency/jitter.
// Values are human-friendly strings, e.g. "5%", "100ms".
Simulate {
#[arg(long, default_value = "0%")]
loss: String,
#[arg(long, default_value = "0ms")]
latency: String,
#[arg(long, default_value = "0ms")]
jitter: String,
},
// Record a session to a file (handler is currently log-only).
Record {
session_id: String,
#[arg(short, long)]
duration: Option<String>,
#[arg(short, long)]
output: Option<String>,
},
// Replay a recorded session file (handler is currently log-only).
Replay {
file_path: String,
#[arg(short, long)]
speed: Option<String>,
},
// Database migration tooling; args/commands defined in the `migrate` module.
Migrate(MigrateArgs),
// PSL schema tooling (check / compile / docs / diff / push / ...).
Schema {
#[command(subcommand)]
cmd: SchemaCommands,
},
}
// Subcommands under `pulse schema`. As with `Commands`, `//` comments are used
// so clap's derived help text stays exactly as configured by the attributes.
#[derive(Subcommand)]
enum SchemaCommands {
// Parse + validate a schema file; `--format json` emits machine-readable
// diagnostics (used by editor tooling).
Check {
file: String,
#[arg(long, default_value = "text")]
format: String,
},
// Generate code for one or more comma-separated language targets.
Compile {
file: String,
#[arg(short, long, default_value = "rust")]
lang: String,
#[arg(short, long, default_value = ".")]
output: String,
},
// Generate markdown documentation from a schema.
Docs {
file: String,
#[arg(short, long, default_value = "./docs")]
output: String,
},
// Write a starter schema.psl into the given directory.
Init {
#[arg(default_value = ".")]
dir: String,
},
// Diff two schema files and classify changes as breaking/non-breaking.
Diff {
old_file: String,
new_file: String,
},
// Validate and apply a schema to a SansaDB instance.
Push {
file: String,
#[arg(long, default_value = "sansadb://localhost:4500")]
db: String,
// Show the migration plan without applying it.
#[arg(long)]
dry_run: bool,
// Apply even when the plan contains errors/breaking changes.
#[arg(long)]
force: bool,
},
// Extract a PSL schema from a TypeScript schema module (shells out to node).
Extract {
file: String,
#[arg(long, default_value = "schema.psl")]
out: String,
},
// Convert a .proto file to PSL and/or generated code.
#[command(name = "from-proto")]
FromProto {
file: String,
// Emit the converted .psl file (on by default).
#[arg(long, default_value_t = true)]
emit_psl: bool,
// Optional comma-separated codegen targets, same set as `compile`.
#[arg(short, long)]
lang: Option<String>,
#[arg(short, long, default_value = ".")]
output: String,
},
}
// Entry point: installs tracing, parses the CLI, and dispatches to one
// self-contained handler per subcommand. Several handlers terminate the
// process directly via `return` or `std::process::exit`.
#[tokio::main]
async fn main() {
// Global INFO-level log subscriber; must be set before any handler logs.
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::INFO)
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();
let cli = Cli::parse();
match &cli.command {
// ---- serve: forward all parsed flags into pulse_relay and run it ----
Commands::Serve {
mode,
bind,
public_endpoint,
region,
lat,
lon,
control_url,
capacity,
stt_endpoint,
vision_endpoint,
queue_storage,
queue_data_dir,
database_url,
redis_url,
queue_encryption_key,
queue_encryption_key_file,
auth_mode,
auth_jwt_issuer,
auth_jwt_audience,
auth_webhook,
} => {
// Repack the borrowed match bindings into an owned ServeArgs.
let args = pulse_relay::ServeArgs {
mode: *mode,
bind: bind.clone(),
public_endpoint: public_endpoint.clone(),
region: region.clone(),
lat: *lat,
lon: *lon,
control_url: control_url.clone(),
capacity: *capacity,
stt_endpoint: stt_endpoint.clone(),
vision_endpoint: vision_endpoint.clone(),
queue_storage: *queue_storage,
queue_data_dir: queue_data_dir.clone(),
database_url: database_url.clone(),
redis_url: redis_url.clone(),
queue_encryption_key: queue_encryption_key.clone(),
queue_encryption_key_file: queue_encryption_key_file.clone(),
auth: pulse_relay::auth::AuthConfig {
mode: auth_mode.clone(),
jwt_issuer: auth_jwt_issuer.clone(),
jwt_audience: auth_jwt_audience.clone(),
webhook_url: auth_webhook.clone(),
},
};
// Any relay error is fatal for the CLI: print and exit non-zero.
if let Err(e) = pulse_relay::run(args).await {
eprintln!("Error: {e}");
std::process::exit(1);
}
}
// ---- dev: spawn control plane + relay + dashboard as child processes ----
Commands::Dev {
relay_port,
dashboard_port,
} => {
// Ports the dev stack needs. 4002 (control) and 9090 (metrics) are fixed;
// relay and dashboard ports come from the CLI.
let ports_to_check: Vec<(u16, &str)> = vec![
(*relay_port, "Relay (WS + QUIC)"),
(4002, "Control Plane API"),
(9090, "Prometheus exporter"),
(*dashboard_port, "Dashboard"),
];
// Probe each port with a throwaway TCP bind; bind failure => port busy.
// NOTE(review): `lsof`/`kill` below are Unix tools; on Windows the PID
// lookup degrades to "unknown" and nothing can be killed — confirm OK.
let mut busy: Vec<(u16, &str, String)> = Vec::new();
for (port, label) in &ports_to_check {
if TcpListener::bind(format!("0.0.0.0:{}", port)).is_err() {
// Best-effort PID discovery via `lsof -ti tcp:<port>` (may return
// several newline-separated PIDs).
let pid_info = std::process::Command::new("lsof")
.args(["-ti", &format!("tcp:{}", port)])
.output()
.ok()
.and_then(|o| {
let s = String::from_utf8_lossy(&o.stdout).trim().to_string();
if s.is_empty() { None } else { Some(s) }
})
.unwrap_or_else(|| "unknown".to_string());
busy.push((*port, label, pid_info));
}
}
if !busy.is_empty() {
println!();
warn!("⚠️ Port conflict detected! The following ports are already in use:");
println!();
for (port, label, pids) in &busy {
println!(" :{:<5} {} (PID: {})", port, label, pids);
}
println!();
// Interactive prompt: offer to SIGKILL the conflicting processes.
print!(" Kill these processes and continue? [y/N] ");
let _ = std::io::stdout().flush();
let mut input = String::new();
if std::io::stdin().read_line(&mut input).is_ok() {
let answer = input.trim().to_lowercase();
if answer == "y" || answer == "yes" {
for (port, label, pids) in &busy {
for pid in pids.split('\n') {
let pid = pid.trim();
if !pid.is_empty() && pid != "unknown" {
info!(" Killing PID {} (:{} {})", pid, port, label);
let _ = std::process::Command::new("kill")
.args(["-9", pid])
.status();
}
}
}
// Give the OS a moment to release the sockets before continuing.
std::thread::sleep(std::time::Duration::from_millis(500));
info!(" Ports freed. Continuing startup…");
println!();
} else {
warn!("Aborting. Free the ports above and try again.");
return;
}
}
}
// ANSI escape codes for the startup banner.
const _RED: &str = "\x1b[31m";
const GREEN: &str = "\x1b[32m";
const ORANGE: &str = "\x1b[33m";
const CYAN: &str = "\x1b[36m";
const BOLD: &str = "\x1b[1m";
const DIM: &str = "\x1b[2m";
const RESET: &str = "\x1b[0m";
// Banner labels are derived from env vars only (not CLI flags), since the
// child processes read their own configuration from the environment.
let queue_backend_env = std::env::var("PULSE_QUEUE_BACKEND").unwrap_or_default();
let queue_label = match queue_backend_env.as_str() {
"redis" => format!("{GREEN}{BOLD}redis{RESET}"),
"postgres" => format!("{GREEN}{BOLD}postgres{RESET}"),
"wal" => format!("{ORANGE}{BOLD}wal{RESET}"),
_ => format!("{ORANGE}{BOLD}memory{RESET} {DIM}(no persistence){RESET}"),
};
let mode_env = std::env::var("PULSE_MODE").unwrap_or_default();
let mode_label = match mode_env.as_str() {
"standalone" => format!("{GREEN}{BOLD}standalone{RESET}"),
"relay" => format!("{CYAN}{BOLD}relay{RESET}"),
"control" => format!("{CYAN}{BOLD}control{RESET}"),
_ => format!("{DIM}default{RESET}"),
};
// Banner goes to stderr so stdout stays clean for the children's output.
eprintln!();
eprintln!("{BOLD}{CYAN} ╔═══════════════════════════════════════════════════╗{RESET}");
eprintln!("{BOLD}{CYAN} ║ Pulse Dev Server v{:<24}{RESET}{BOLD}{CYAN}║{RESET}", env!("CARGO_PKG_VERSION"));
eprintln!("{BOLD}{CYAN} ╚═══════════════════════════════════════════════════╝{RESET}");
eprintln!();
eprintln!(" {BOLD}Mode:{RESET} {mode_label}");
eprintln!(" {BOLD}Queue:{RESET} {queue_label}");
eprintln!();
eprintln!(" {BOLD}Dashboard:{RESET} {CYAN}http://localhost:{dashboard_port}{RESET}");
eprintln!(" {BOLD}Relay QUIC:{RESET} {DIM}localhost:{relay_port}{RESET}");
eprintln!(" {BOLD}Relay WS:{RESET} {DIM}ws://localhost:{relay_port}{RESET}");
eprintln!(" {BOLD}Control:{RESET} {DIM}http://localhost:4002{RESET}");
eprintln!();
eprintln!("{BOLD}{CYAN} ───────────────────────────────────────────────────{RESET}");
eprintln!();
// Child binary discovery, in priority order:
// 1. sibling binaries next to the current executable;
// 2. `cargo run -p ...` if we appear to be inside the workspace;
// 3. bare binary name resolved via PATH.
let current_exe = std::env::current_exe().unwrap_or_else(|_| std::path::PathBuf::from("pulse"));
let dir = current_exe.parent().unwrap_or(Path::new("."));
let control_bin = dir.join(if cfg!(windows) { "pulse-control.exe" } else { "pulse-control" });
let relay_bin = dir.join(if cfg!(windows) { "pulse-relay.exe" } else { "pulse-relay" });
// Workspace detection: walk up to 5 ancestor directories looking for a
// Cargo.toml that mentions pulse-relay, or a pulse-relay/Cargo.toml.
let is_cargo = {
let mut search_dir = dir.to_path_buf();
let mut found = false;
for _ in 0..5 {
let cargo_path = search_dir.join("Cargo.toml");
if cargo_path.exists() {
if let Ok(content) = std::fs::read_to_string(&cargo_path) {
if content.contains("pulse-relay") {
found = true;
break;
}
}
}
let relay_path = search_dir.join("pulse-relay/Cargo.toml");
if relay_path.exists() {
found = true;
break;
}
if let Some(parent) = search_dir.parent() {
search_dir = parent.to_path_buf();
} else {
break;
}
}
found
};
// Build (but do not yet spawn) the control-plane command.
let mut control = if control_bin.exists() {
let mut c = Command::new(&control_bin);
c.env("PULSE_DEV_MODE", "1");
c
} else if is_cargo {
let mut c = Command::new("cargo");
c.args(["run", "-p", "pulse-control"]);
c.env("PULSE_DEV_MODE", "1");
c
} else {
let mut c = Command::new("pulse-control");
c.env("PULSE_DEV_MODE", "1");
c
};
// Build the relay command with the dev ports wired in.
let mut relay = if relay_bin.exists() {
let mut c = Command::new(&relay_bin);
c.args([
"--bind", &format!("0.0.0.0:{}", relay_port),
"--public-endpoint", &format!("localhost:{}", relay_port),
"--control-url", "http://localhost:4002",
]);
c
} else if is_cargo {
let mut c = Command::new("cargo");
c.args([
"run",
"-p",
"pulse-relay",
"--",
"--bind",
&format!("0.0.0.0:{}", relay_port),
"--public-endpoint",
&format!("localhost:{}", relay_port),
"--control-url",
"http://localhost:4002",
]);
c
} else {
let mut c = Command::new("pulse-relay");
c.args([
"--bind", &format!("0.0.0.0:{}", relay_port),
"--public-endpoint", &format!("localhost:{}", relay_port),
"--control-url", "http://localhost:4002",
]);
c
};
// Children inherit our stdout/stderr; kill_on_drop ensures they are
// terminated if this process exits without an explicit kill.
control.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.stdin(Stdio::null())
.kill_on_drop(true);
relay.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.stdin(Stdio::null())
.kill_on_drop(true);
let mut control_child = match control.spawn() {
Ok(c) => c,
Err(e) => {
warn!("Failed to spawn pulse-control: {e}");
return;
}
};
let mut relay_child = match relay.spawn() {
Ok(c) => c,
Err(e) => {
warn!("Failed to spawn pulse-relay: {e}");
// Relay failed — don't leave the control plane orphaned.
let _ = control_child.kill().await;
return;
}
};
// Dashboard: prefer a local pulse-console checkout (cwd or next to the
// binary), otherwise fall back to downloading it via npx.
let console_candidates = vec![
PathBuf::from("pulse-console"),
std::env::current_exe()
.unwrap_or_default()
.parent()
.unwrap_or(Path::new("."))
.join("../pulse-console"),
std::env::current_exe()
.unwrap_or_default()
.parent()
.unwrap_or(Path::new("."))
.join("../../pulse-console"),
];
let console_dir = console_candidates.iter().find(|p| p.join("package.json").exists());
let mut dashboard_child = if let Some(console_path) = console_dir {
info!(" Dashboard found at: {}", console_path.display());
let mut dashboard = Command::new("npm");
dashboard
.current_dir(console_path)
.args([
"run",
"dev",
"--",
"--port",
&dashboard_port.to_string(),
"--host",
"0.0.0.0",
])
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.stdin(Stdio::null())
.kill_on_drop(true);
match dashboard.spawn() {
Ok(c) => Some(c),
Err(e) => {
warn!("Failed to spawn pulse-console dashboard: {e}");
None
}
}
} else {
info!(" Dashboard: downloading @sansavision/pulse-console...");
let mut dashboard = Command::new("npx");
dashboard
.args([
"-y",
"@sansavision/pulse-console",
"--port",
&dashboard_port.to_string(),
"--host",
"0.0.0.0",
])
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.stdin(Stdio::null())
.kill_on_drop(true);
// The dashboard is optional: a spawn failure only logs a hint.
match dashboard.spawn() {
Ok(c) => Some(c),
Err(e) => {
warn!("Failed to download pulse-console via npx: {e}");
info!(" Tip: Install Node.js/npm, or visit https://console.sansavision.com");
None
}
}
};
// Block until Ctrl+C, then tear down all children explicitly.
let _ = tokio::signal::ctrl_c().await;
warn!("Shutting down dev server (terminating child processes)...");
if let Some(child) = dashboard_child.as_mut() {
let _ = child.kill().await;
}
let _ = relay_child.kill().await;
let _ = control_child.kill().await;
}
// ---- init: scaffold a new project directory ----
Commands::Init { name, template } => {
let tpl = template.clone().unwrap_or_else(|| "default".to_string());
info!(
"Scaffolding new Pulse project '{}' using template '{}'",
name, tpl
);
// Refuse to overwrite an existing directory.
let project_dir = Path::new(name);
if project_dir.exists() {
warn!("Directory '{}' already exists. Aborting.", name);
return;
}
let src_dir = project_dir.join("src");
if let Err(e) = std::fs::create_dir_all(&src_dir) {
warn!("Failed to create project directory: {}", e);
return;
}
// NOTE(review): the generated manifest points pulse-sdk at the relative
// path "../pulse-sdk", so the project only builds when created next to
// an SDK checkout — confirm this is the intended dev-only workflow.
let cargo_toml = format!(
r#"[package]
name = "{name}"
version = "0.1.0"
edition = "2021"
[dependencies]
pulse-sdk = {{ version = "0.1", path = "../pulse-sdk" }}
tokio = {{ version = "1", features = ["full"] }}
anyhow = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
"#,
name = name
);
if let Err(e) = std::fs::write(project_dir.join("Cargo.toml"), &cargo_toml) {
warn!("Failed to write Cargo.toml: {}", e);
return;
}
// Template bodies; `{{`/`}}` are literal braces, `{name}` is substituted.
let main_rs = match tpl.as_str() {
"data-channel" => format!(
r#"use anyhow::Result;
use pulse_sdk::PulseClient;
use tracing::info;
#[tokio::main]
async fn main() -> Result<()> {{
tracing_subscriber::fmt::init();
let client = PulseClient::connect_insecure_dev("127.0.0.1:4001".parse()?)?;
let mut session = client.connect_and_handshake("dev-token").await?;
info!("Connected! Session ID: {{}}", session.session_id());
// Open a data stream and send some messages.
let mut stream = session
.open_data_stream_handle(pulse_sdk::pulse_core::control::Reliability::Full, 0)
.await?;
stream.send(b"Hello from {name}!").await?;
info!("Sent data on stream {{}}", stream.stream_id());
stream.close().await?;
info!("Stream closed. Done!");
Ok(())
}}
"#,
name = name
),
_ => format!(
r#"use anyhow::Result;
use pulse_sdk::PulseClient;
use tracing::info;
#[tokio::main]
async fn main() -> Result<()> {{
tracing_subscriber::fmt::init();
// Connect to your local Pulse relay (start with `pulse dev`).
let client = PulseClient::connect_insecure_dev("127.0.0.1:4001".parse()?)?;
let mut session = client.connect_and_handshake("dev-token").await?;
info!("Connected! Session ID: {{}}", session.session_id());
// Your real-time app logic goes here.
// See docs at: https://docs.pulse.dev/quickstart
info!("{name} is running. Press Ctrl+C to stop.");
tokio::signal::ctrl_c().await?;
Ok(())
}}
"#,
name = name
),
};
if let Err(e) = std::fs::write(src_dir.join("main.rs"), &main_rs) {
warn!("Failed to write src/main.rs: {}", e);
return;
}
let readme = format!(
"# {name}\n\nA real-time application powered by [Pulse](https://pulse.dev).\n\n## Quick start\n\n```bash\n# Start the dev server (relay + control plane + dashboard)\npulse dev\n\n# In another terminal, run your app\ncargo run\n```\n\nTemplate: `{tpl}`\n",
name = name,
tpl = tpl
);
if let Err(e) = std::fs::write(project_dir.join("README.md"), &readme) {
warn!("Failed to write README.md: {}", e);
return;
}
info!("✅ Project '{}' created successfully!", name);
info!(" cd {} && cargo run", name);
}
// ---- inspect: fetch one session's details from the control plane ----
Commands::Inspect {
session_id,
control_url,
} => {
info!("Inspecting session {} via {}", session_id, control_url);
let url = format!("{}/api/v1/sessions/{}", control_url, session_id);
let client = reqwest::Client::new();
match client.get(&url).send().await {
Ok(resp) => {
// Capture the status before consuming the body as JSON.
let status = resp.status();
match resp.json::<serde_json::Value>().await {
Ok(body) => {
if status.is_success() {
// Each field is optional; print only what the server returned.
println!("\n━━━ Session Inspector ━━━━━━━━━━━━━━━━━━━━━━━━━━");
if let Some(id) = body.get("id") {
println!(" ID: {}", id);
}
if let Some(ep) = body.get("relay_endpoint") {
println!(" Relay: {}", ep);
}
if let Some(st) = body.get("status") {
println!(" Status: {}", st);
}
if let Some(ts) = body.get("created_at").and_then(|v| v.as_u64()) {
println!(" Created: {}ms since epoch", ts);
}
if let Some(lat) = body.get("client_lat") {
if let Some(lon) = body.get("client_lon") {
println!(" Client geo: ({}, {})", lat, lon);
}
}
if let Some(lm) =
body.get("last_metrics_at").and_then(|v| v.as_u64())
{
println!(" Last metrics: {}ms since epoch", lm);
} else {
println!(" Last metrics: (none yet)");
}
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
} else if status == reqwest::StatusCode::NOT_FOUND {
warn!("Session '{}' not found on control plane.", session_id);
} else {
warn!("Control plane returned {}: {:?}", status, body);
}
}
Err(e) => warn!("Failed to parse response: {}", e),
}
}
Err(e) => {
warn!("Failed to reach control plane at {}: {}", control_url, e);
warn!("Is the control plane running? Start it with: pulse dev");
}
}
}
// ---- metrics: fetch cluster-wide dashboard metrics ----
Commands::Metrics { control_url } => {
info!("Fetching cluster metrics from {}", control_url);
let url = format!("{}/api/v1/internal/dashboard", control_url);
let client = reqwest::Client::new();
match client.get(&url).send().await {
Ok(resp) => match resp.json::<serde_json::Value>().await {
Ok(body) => {
println!("\n━━━ Pulse Cluster Metrics ━━━━━━━━━━━━━━━━━━━━━");
if let Some(s) = body.get("active_sessions") {
println!(" Active sessions: {}", s);
}
if let Some(bw) = body.get("total_bandwidth_mbps") {
println!(" Total bandwidth: {} Mbps", bw);
}
if let Some(rtt) = body.get("avg_rtt_ms") {
println!(" Average RTT: {} ms", rtt);
}
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
}
Err(e) => warn!("Failed to parse response: {}", e),
},
Err(e) => {
warn!("Failed to reach control plane at {}: {}", control_url, e);
warn!("Is the control plane running? Start it with: pulse dev");
}
}
}
// ---- simulate: UDP chaos proxy between a client and the local relay ----
Commands::Simulate {
loss,
latency,
jitter,
} => {
info!("Starting Pulse network simulator (Chaos Proxy)");
// Lenient parsing: malformed values silently fall back to 0.
let loss_pct = loss.trim_end_matches('%').parse::<f64>().unwrap_or(0.0) / 100.0;
let lat_ms = latency.trim_end_matches("ms").parse::<u64>().unwrap_or(0);
let jit_ms = jitter.trim_end_matches("ms").parse::<u64>().unwrap_or(0);
info!(" Binding: 127.0.0.1:4002 -> 127.0.0.1:4001");
info!(" Loss: {:.1}%", loss_pct * 100.0);
info!(" Latency: {}ms", lat_ms);
info!(" Jitter: {}ms", jit_ms);
// NOTE(review): 4002 is also the control plane's default HTTP port; the
// simulator binds UDP so they don't collide at the socket level, but the
// port overlap is confusing — confirm it's intentional.
let socket = match tokio::net::UdpSocket::bind("127.0.0.1:4002").await {
Ok(s) => std::sync::Arc::new(s),
Err(e) => {
warn!("Failed to bind simulator socket: {}", e);
return;
}
};
let relay_addr: std::net::SocketAddr = "127.0.0.1:4001".parse().unwrap();
let mut buf = [0u8; 65535];
info!("Simulator running. Point your client to 127.0.0.1:4002 instead of 4001.");
// Only the most recently seen client address is remembered, so the proxy
// supports a single client at a time.
let last_client =
std::sync::Arc::new(tokio::sync::RwLock::new(None::<std::net::SocketAddr>));
loop {
tokio::select! {
res = socket.recv_from(&mut buf) => {
match res {
Ok((size, peer)) => {
// Random packet drop.
if loss_pct > 0.0 && rand::random::<f64>() < loss_pct {
continue; }
// Base latency plus symmetric jitter in [-jit_ms, +jit_ms],
// clamped at 0.
let mut delay = lat_ms;
if jit_ms > 0 {
let rand_val = rand::random::<u64>();
let range = (jit_ms * 2) + 1;
let j = (rand_val % range) as i64 - jit_ms as i64;
delay = (delay as i64 + j).max(0) as u64;
}
// Copy the datagram and forward it from a spawned task so the
// receive loop keeps draining while delays elapse.
let payload = buf[..size].to_vec();
let sock = socket.clone();
let client_ref = last_client.clone();
tokio::spawn(async move {
if delay > 0 {
tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
}
if peer == relay_addr {
// Relay -> last known client.
let c = client_ref.read().await;
if let Some(target) = *c {
let _ = sock.send_to(&payload, target).await;
}
} else {
// Client -> relay; remember the client's address for replies.
let mut c = client_ref.write().await;
*c = Some(peer);
let _ = sock.send_to(&payload, relay_addr).await;
}
});
}
Err(e) => {
warn!("Simulator recv error: {}", e);
}
}
}
_ = tokio::signal::ctrl_c() => {
info!("Shutting down simulator...");
break;
}
}
}
}
// ---- record / replay: log-only stubs for now ----
Commands::Record {
session_id,
duration,
output,
} => {
let out_file = output.clone().unwrap_or_default();
if let Some(d) = duration {
info!("Recording session {} to {} for {}", session_id, out_file, d);
} else {
info!("Recording session {} to {}", session_id, out_file);
}
}
Commands::Replay { file_path, speed } => {
info!(
"Replaying {} at {}x speed",
file_path,
speed.clone().unwrap_or_else(|| "1".to_string())
);
}
// ---- migrate: delegate entirely to the migrate module ----
Commands::Migrate(args) => match &args.command {
MigrateCommand::Analyse(a) => migrate::run_analyse(&a),
MigrateCommand::Generate(g) => migrate::run_generate(&g),
MigrateCommand::Diff(d) => migrate::run_diff(&d),
MigrateCommand::Wizard => migrate::run_wizard(),
},
// ---- schema: PSL tooling subcommands ----
Commands::Schema { cmd } => match cmd {
// check: parse + validate; `--format json` yields editor-friendly output.
SchemaCommands::Check { file, format } => {
let source = match std::fs::read_to_string(file) {
Ok(s) => s,
Err(e) => {
if format == "json" {
println!(r#"[{{ "message": "Failed to read file", "start_offset": 0, "end_offset": 0 }}]"#);
} else {
warn!("Failed to read schema file '{}': {}", file, e);
}
return;
}
};
let parsed = match pulse_idl::parser::parse(&source) {
Ok(s) => s,
Err(e) => {
if format == "json" {
// NOTE(review): only double quotes are escaped here; an error
// message containing backslashes or newlines would produce
// invalid JSON — consider serde_json for serialization.
println!(r#"[{{ "message": "Parse error: {}" }}]"#, e.to_string().replace("\"", "\\\""));
} else {
warn!("Parse error: {}", e);
}
return;
}
};
if let Err(errs) = parsed.validate() {
if format == "json" {
// Same hand-rolled escaping caveat as above.
let json_errs: Vec<String> = errs.iter().map(|e| {
let msg = e.to_string().replace("\"", "\\\"");
format!(r#"{{ "message": "{}" }}"#, msg)
}).collect();
println!("[{}]", json_errs.join(", "));
} else {
for err in &errs {
warn!("Validation error: {}", err);
}
println!("Schema check failed with {} errors.", errs.len());
}
// Non-zero exit so CI/tooling can detect validation failure.
std::process::exit(1);
}
if format == "json" {
println!("[]");
} else {
info!("✅ Schema is valid!");
}
},
// compile: codegen for comma-separated language targets.
SchemaCommands::Compile { file, lang, output } => {
let source = match std::fs::read_to_string(file) {
Ok(s) => s,
Err(e) => {
warn!("Failed to read schema file '{}': {}", file, e);
return;
}
};
let parsed = match pulse_idl::parser::parse(&source) {
Ok(s) => s,
Err(e) => {
warn!("Parse error: {}", e);
return;
}
};
if let Err(errs) = parsed.validate() {
for err in &errs {
warn!("Validation error: {}", err);
}
return;
}
let out_dir = std::path::Path::new(output);
if let Err(e) = std::fs::create_dir_all(out_dir) {
warn!("Failed to create output directory: {}", e);
return;
}
let langs: Vec<&str> = lang.split(',').map(|s| s.trim()).collect();
// Output files are named <schema-file-stem>.<target-extension>.
let schema_stem = std::path::Path::new(file)
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("schema");
for target in &langs {
let (code, ext) = match *target {
"rust" => (pulse_idl::codegen_rust::generate(&parsed), "rs"),
"ts" | "typescript" => (pulse_idl::codegen_ts::generate(&parsed), "ts"),
"go" => (pulse_idl::codegen_go::generate(&parsed), "go"),
"swift" => (pulse_idl::codegen_swift::generate(&parsed), "swift"),
"kotlin" | "kt" => (pulse_idl::codegen_kotlin::generate(&parsed), "kt"),
"python" | "py" => (pulse_idl::codegen_python::generate(&parsed), "py"),
"csharp" | "cs" => (pulse_idl::codegen_csharp::generate(&parsed), "cs"),
"rn" | "react-native" => (pulse_idl::codegen_rn::generate(&parsed), "ts"),
"docs" | "md" => (pulse_idl::codegen_docs::generate(&parsed), "md"),
unknown => {
// Unknown targets are skipped, not fatal.
warn!("Unknown language target: '{}'", unknown);
continue;
}
};
let out_path = out_dir.join(format!("{}.{}", schema_stem, ext));
if let Err(e) = std::fs::write(&out_path, code) {
warn!("Failed to write generated {}: {}", target, e);
} else {
info!("✅ Generated {}", out_path.display());
}
}
info!("Code generation complete.");
},
// docs: markdown generation.
// NOTE(review): unlike check/compile, this arm panics (expect/unwrap) on
// parse or I/O errors instead of warning — confirm intended.
SchemaCommands::Docs { file, output } => {
let source = match std::fs::read_to_string(file) {
Ok(s) => s,
Err(e) => {
warn!("Failed to read schema file '{}': {}", file, e);
return;
}
};
let parsed = pulse_idl::parser::parse(&source).expect("Parse error");
let code = pulse_idl::codegen_docs::generate(&parsed);
let out_dir = std::path::Path::new(output);
std::fs::create_dir_all(out_dir).unwrap();
let schema_stem = std::path::Path::new(file)
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("docs");
let out_path = out_dir.join(format!("{}.md", schema_stem));
std::fs::write(&out_path, code).unwrap();
info!("✅ Generated docs at {}", out_path.display());
},
// init: drop a starter schema.psl in the target directory.
SchemaCommands::Init { dir } => {
let out_dir = std::path::Path::new(dir);
std::fs::create_dir_all(out_dir).unwrap();
let content = r#"/// Example Pulse API
service HelloWorld {
/// Say hello
rpc SayHello(HelloRequest) -> HelloResponse;
}
type HelloRequest {
name: string;
}
type HelloResponse {
reply: string;
}
"#;
std::fs::write(out_dir.join("schema.psl"), content).unwrap();
info!("✅ Initialized new PSL schema project at {}", out_dir.display());
},
// diff: compare two schema files and classify the migration plan.
SchemaCommands::Diff { old_file, new_file } => {
info!("Comparing {} with {}", old_file, new_file);
let old_source = match std::fs::read_to_string(old_file) {
Ok(s) => s,
Err(e) => {
warn!("Failed to read old schema '{}': {}", old_file, e);
return;
}
};
let old_parsed = match pulse_idl::parser::parse(&old_source) {
Ok(s) => s,
Err(e) => {
warn!("Parse error in old schema: {}", e);
return;
}
};
let new_source = match std::fs::read_to_string(new_file) {
Ok(s) => s,
Err(e) => {
warn!("Failed to read new schema '{}': {}", new_file, e);
return;
}
};
let new_parsed = match pulse_idl::parser::parse(&new_source) {
Ok(s) => s,
Err(e) => {
warn!("Parse error in new schema: {}", e);
return;
}
};
// Wrap parsed items in ResolvedSchema; imports are not resolved here.
let old_resolved = pulse_idl::imports::ResolvedSchema {
root_package: None,
items: old_parsed.items.clone(),
};
let new_resolved = pulse_idl::imports::ResolvedSchema {
root_package: None,
items: new_parsed.items.clone(),
};
let plan = pulse_idl::migrate::diff(&old_resolved, &new_resolved);
if plan.ops.is_empty() {
info!("✅ No schema changes detected.");
return;
}
// Additions are non-breaking; any drop/removal marks the plan breaking.
let mut has_breaking = false;
info!("Schema changes ({} operations):", plan.ops.len());
for op in &plan.ops {
match op {
pulse_idl::migrate::MigrationOp::AddCollection(c) => {
info!(" ✅ [non-breaking] Add collection: {}", c);
},
pulse_idl::migrate::MigrationOp::DropCollection(c) => {
warn!(" ❌ [BREAKING] Drop collection: {}", c);
has_breaking = true;
},
pulse_idl::migrate::MigrationOp::AddField { collection, field, ty } => {
info!(" ✅ [non-breaking] Add field: {}.{} ({})", collection, field, ty);
},
pulse_idl::migrate::MigrationOp::RemoveField { collection, field } => {
warn!(" ❌ [BREAKING] Remove field: {}.{}", collection, field);
has_breaking = true;
},
pulse_idl::migrate::MigrationOp::AddIndex { collection, index_name, .. } => {
info!(" ✅ [non-breaking] Add index: {}.{}", collection, index_name);
},
pulse_idl::migrate::MigrationOp::DropIndex { collection, index_name } => {
warn!(" ⚠️ [BREAKING] Drop index: {}.{}", collection, index_name);
has_breaking = true;
},
}
}
if !plan.warnings.is_empty() {
info!("");
for w in &plan.warnings {
warn!("⚠️ {}", w);
}
}
// Plan-level errors also force the breaking verdict.
if !plan.errors.is_empty() {
info!("");
for e in &plan.errors {
warn!("❌ {}", e);
}
has_breaking = true;
}
info!("");
if has_breaking {
warn!("❌ Breaking changes detected. This diff contains changes that may break existing clients.");
std::process::exit(1);
} else {
info!("✅ All changes are non-breaking. Safe to apply.");
}
},
// push: validate and "apply" a schema to SansaDB.
SchemaCommands::Push { file, db, dry_run, force } => {
let source = match std::fs::read_to_string(file) {
Ok(s) => s,
Err(e) => {
warn!("Failed to read schema file '{}': {}", file, e);
return;
}
};
let parsed = match pulse_idl::parser::parse(&source) {
Ok(s) => s,
Err(e) => {
warn!("Parse error: {}", e);
return;
}
};
let resolved = pulse_idl::imports::ResolvedSchema {
root_package: None,
items: parsed.items.clone(),
};
if let Err(errs) = pulse_idl::validate::validate_resolved(&resolved) {
for err in &errs {
warn!("Validation error: {}", err.message);
}
warn!("Cannot push invalid schema.");
return;
}
info!("Connecting to SansaDB at {}...", db);
// NOTE(review): the "current" remote schema is mocked as empty, so the
// plan always lists the entire schema as new, and nothing below makes a
// network call — success is logged unconditionally. Looks like a stub;
// confirm before relying on this in production.
let mock_old = pulse_idl::imports::ResolvedSchema {
root_package: None,
items: vec![],
};
let plan = pulse_idl::migrate::diff(&mock_old, &resolved);
if plan.ops.is_empty() {
info!("Schema is already up-to-date.");
return;
}
info!("Migration Plan:");
for op in &plan.ops {
match op {
pulse_idl::migrate::MigrationOp::AddCollection(c) => info!(" + Add Collection: {}", c),
pulse_idl::migrate::MigrationOp::DropCollection(c) => info!(" - Drop Collection: {}", c),
pulse_idl::migrate::MigrationOp::AddField { collection, field, ty } => info!(" + Add Field: {}.{} ({})", collection, field, ty),
pulse_idl::migrate::MigrationOp::RemoveField { collection, field } => info!(" - Remove Field: {}.{}", collection, field),
pulse_idl::migrate::MigrationOp::AddIndex { collection, index_name, .. } => info!(" + Add Index: {}.{}", collection, index_name),
pulse_idl::migrate::MigrationOp::DropIndex { collection, index_name } => info!(" - Drop Index: {}.{}", collection, index_name),
}
}
for warn in &plan.warnings {
warn!("Warning: {}", warn);
}
for err in &plan.errors {
warn!("Error: {}", err);
}
if !plan.errors.is_empty() && !force {
warn!("Migration contains errors or breaking changes. Use --force to apply anyway.");
return;
}
if *dry_run {
info!("Dry run complete. No changes applied.");
return;
}
info!("Applying migration to SansaDB...");
info!("✅ Schema successfully pushed to {}", db);
}
// from-proto: convert a .proto file to PSL and/or generated code.
SchemaCommands::FromProto { file, emit_psl, lang, output } => {
let source = match std::fs::read_to_string(file) {
Ok(s) => s,
Err(e) => {
warn!("Failed to read proto file '{}': {}", file, e);
return;
}
};
let result = match pulse_idl::proto_import::import_proto(&source) {
Ok(r) => r,
Err(e) => {
warn!("Proto import error: {}", e);
return;
}
};
// Import warnings (unsupported proto features etc.) are non-fatal.
for w in &result.warnings {
warn!("⚠️ {}", w);
}
let schema = &result.schema;
let out_dir = std::path::Path::new(output.as_str());
std::fs::create_dir_all(out_dir).unwrap();
let schema_stem = std::path::Path::new(file)
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("schema");
if *emit_psl {
let psl = pulse_idl::proto_import::schema_to_psl(schema);
let psl_path = out_dir.join(format!("{}.psl", schema_stem));
std::fs::write(&psl_path, &psl).unwrap();
info!("✅ Converted {} → {}", file, psl_path.display());
}
// Optional codegen: same target table as `schema compile`.
if let Some(langs) = lang {
let targets: Vec<&str> = langs.split(',').map(|s| s.trim()).collect();
for target in &targets {
let (code, ext) = match *target {
"rust" => (pulse_idl::codegen_rust::generate(schema), "rs"),
"ts" | "typescript" => (pulse_idl::codegen_ts::generate(schema), "ts"),
"go" => (pulse_idl::codegen_go::generate(schema), "go"),
"swift" => (pulse_idl::codegen_swift::generate(schema), "swift"),
"kotlin" | "kt" => (pulse_idl::codegen_kotlin::generate(schema), "kt"),
"python" | "py" => (pulse_idl::codegen_python::generate(schema), "py"),
"csharp" | "cs" => (pulse_idl::codegen_csharp::generate(schema), "cs"),
"rn" | "react-native" => (pulse_idl::codegen_rn::generate(schema), "ts"),
"docs" | "md" => (pulse_idl::codegen_docs::generate(schema), "md"),
unknown => {
warn!("Unknown language target: '{}'", unknown);
continue;
}
};
let out_path = out_dir.join(format!("{}.{}", schema_stem, ext));
std::fs::write(&out_path, code).unwrap();
info!("✅ Generated {}", out_path.display());
}
}
info!("Proto import complete.");
},
// extract: run a generated Node.js script that loads the TS schema module
// and writes its compiled PSL to `out`.
SchemaCommands::Extract { file, out } => {
info!("Extracting {} into {}...", file, out);
// The script tries three locations for the PulseCompiler, in order:
// node_modules next to the input file, a global/ambient install, then a
// local pulse-schema-ts build.
// NOTE(review): `file`/`out` are interpolated into the JS source without
// escaping — paths containing quotes or backslashes will break the
// script. Also note the fixed temp filename in the current directory.
let temp_script = format!(
"const path = require('path');\n\
const m = require(path.resolve('{}'));\n\
let PulseCompiler;\n\
try {{\n\
PulseCompiler = require(path.resolve(path.dirname('{}'), 'node_modules/@pulse/schema/dist/compiler')).PulseCompiler;\n\
}} catch (e) {{\n\
try {{\n\
PulseCompiler = require('@pulse/schema/dist/compiler').PulseCompiler;\n\
}} catch(e2) {{\n\
PulseCompiler = require('./pulse-schema-ts/dist/compiler').PulseCompiler;\n\
}}\n\
}}\n\
const compiler = new PulseCompiler(m.default);\n\
require('fs').writeFileSync(path.resolve('{}'), compiler.compile());",
file, file, out
);
std::fs::write("extract.temp.js", temp_script).unwrap();
let status = std::process::Command::new("node")
.arg("extract.temp.js")
.status();
// Best-effort cleanup of the temp script regardless of outcome.
let _ = std::fs::remove_file("extract.temp.js");
match status {
Ok(s) if s.success() => info!("✅ Extracted PSL schema to {}", out),
_ => warn!("Failed to extract schema from TS file."),
}
},
}
}
}