#![deny(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
use std::io::{BufRead, BufReader, Write};
use std::process::{Command, Stdio};
use std::thread;
use std::time::Duration;
use anyhow::{Context, Result, anyhow};
use serde_json::{Value, json};
struct ServerProcess {
child: std::process::Child,
stdin: std::process::ChildStdin,
stdout: BufReader<std::process::ChildStdout>,
stderr: BufReader<std::process::ChildStderr>,
}
impl ServerProcess {
fn spawn() -> Result<Self> {
let mut cmd = Command::new(env!("CARGO_BIN_EXE_catenary"));
cmd.arg("serve");
cmd.arg("--root").arg(".");
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let mut child = cmd.spawn().context("Failed to spawn server")?;
let stdin = child.stdin.take().context("Failed to get stdin")?;
let stdout = BufReader::new(child.stdout.take().context("Failed to get stdout")?);
let stderr = BufReader::new(child.stderr.take().context("Failed to get stderr")?);
Ok(Self {
child,
stdin,
stdout,
stderr,
})
}
fn get_session_id(&mut self) -> Result<String> {
let mut line = String::new();
for _ in 0..100 {
line.clear();
self.stderr
.read_line(&mut line)
.context("Failed to read stderr")?;
if line.contains("Session ID:") {
let id = line
.split_whitespace()
.last()
.context("Failed to parse Session ID from line")?;
return Ok(id.to_string());
}
}
Err(anyhow!("Failed to find Session ID in output"))
}
fn send(&mut self, request: &Value) -> Result<()> {
let json = serde_json::to_string(request)?;
writeln!(self.stdin, "{json}").context("Failed to write to stdin")?;
self.stdin.flush().context("Failed to flush stdin")?;
Ok(())
}
fn recv(&mut self) -> Result<Value> {
let mut line = String::new();
self.stdout
.read_line(&mut line)
.context("Failed to read from stdout")?;
serde_json::from_str(&line).context("Failed to parse JSON response")
}
}
impl Drop for ServerProcess {
fn drop(&mut self) {
let _ = self.child.kill();
let _ = self.child.wait();
}
}
#[test]
fn test_list_shows_row_numbers() -> Result<()> {
let mut server = ServerProcess::spawn()?;
let _session_id = server.get_session_id()?;
thread::sleep(Duration::from_millis(100));
let output = Command::new(env!("CARGO_BIN_EXE_catenary"))
.arg("list")
.output()
.context("Failed to run list command")?;
let stdout = String::from_utf8_lossy(&output.stdout);
assert!(
stdout.contains('#'),
"List output should contain # column header"
);
let lines: Vec<&str> = stdout.lines().collect();
let data_lines: Vec<&str> = lines
.iter()
.skip(2)
.filter(|l| !l.trim().is_empty())
.copied()
.collect();
assert!(
!data_lines.is_empty(),
"Should have at least one session row"
);
let first_row = data_lines[0].trim();
assert!(
first_row.starts_with('1'),
"First row should start with row number 1, got: {first_row}"
);
Ok(())
}
#[test]
fn test_list_shows_languages_column() -> Result<()> {
let mut server = ServerProcess::spawn()?;
let _session_id = server.get_session_id()?;
thread::sleep(Duration::from_millis(100));
let output = Command::new(env!("CARGO_BIN_EXE_catenary"))
.arg("list")
.output()
.context("Failed to run list command")?;
let stdout = String::from_utf8_lossy(&output.stdout);
assert!(
stdout.contains("LANGUAGES"),
"List output should contain LANGUAGES column header"
);
Ok(())
}
#[test]
fn test_monitor_by_row_number_starts() -> Result<()> {
use std::sync::mpsc;
let mut server = ServerProcess::spawn()?;
let _session_id = server.get_session_id()?;
thread::sleep(Duration::from_millis(500));
let mut cmd = Command::new(env!("CARGO_BIN_EXE_catenary"));
cmd.arg("monitor").arg("1");
cmd.stdout(Stdio::piped()).stderr(Stdio::null());
let mut child = cmd.spawn().context("Failed to spawn monitor")?;
let stdout = child
.stdout
.take()
.context("failed to take monitor stdout")?;
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
while let Ok(n) = reader.read_line(&mut line) {
if n == 0 {
break;
}
let _ = tx.send(line.clone());
line.clear();
}
});
let line = rx.recv_timeout(Duration::from_secs(5)).unwrap_or_default();
let _ = child.kill();
let _ = child.wait();
assert!(
line.contains("Monitoring session"),
"Monitor should start monitoring a session with row number, got: {line}"
);
Ok(())
}
#[test]
fn test_monitor_invalid_row_number_fails() -> Result<()> {
let output = Command::new(env!("CARGO_BIN_EXE_catenary"))
.arg("monitor")
.arg("999")
.output()
.context("Failed to run monitor command")?;
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("out of range") || stderr.contains("Row number"),
"Should report row number out of range, got: {stderr}"
);
Ok(())
}
#[test]
fn test_monitor_raw_flag() -> Result<()> {
use std::sync::mpsc;
let mut server = ServerProcess::spawn()?;
let session_id = server.get_session_id()?;
let mut cmd = Command::new(env!("CARGO_BIN_EXE_catenary"));
cmd.arg("monitor").arg(&session_id).arg("--raw");
cmd.stdout(Stdio::piped()).stderr(Stdio::null());
let mut child = cmd.spawn().context("Failed to spawn monitor")?;
let stdout = child
.stdout
.take()
.context("failed to take monitor stdout")?;
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
while let Ok(n) = reader.read_line(&mut line) {
if n == 0 {
break;
}
let _ = tx.send(line.clone());
line.clear();
}
});
let _ = rx.recv_timeout(Duration::from_secs(5));
let request = json!({
"jsonrpc": "2.0",
"id": 99999,
"method": "ping"
});
server.send(&request)?;
let _response = server.recv()?;
let mut found_json = false;
let start = std::time::Instant::now();
while start.elapsed() < Duration::from_secs(2) {
if let Ok(line) = rx.recv_timeout(Duration::from_millis(100)) {
if line.contains('{') || line.contains('}') || line.contains("\"jsonrpc\"") {
found_json = true;
break;
}
}
}
let _ = child.kill();
let _ = child.wait();
assert!(found_json, "Raw mode should output JSON formatted messages");
Ok(())
}
#[test]
fn test_monitor_nocolor_flag() -> Result<()> {
use std::sync::mpsc;
let mut server = ServerProcess::spawn()?;
let session_id = server.get_session_id()?;
let mut cmd = Command::new(env!("CARGO_BIN_EXE_catenary"));
cmd.arg("monitor").arg(&session_id).arg("--nocolor");
cmd.stdout(Stdio::piped()).stderr(Stdio::null());
let mut child = cmd.spawn().context("Failed to spawn monitor")?;
let stdout = child
.stdout
.take()
.context("failed to take monitor stdout")?;
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
while let Ok(n) = reader.read_line(&mut line) {
if n == 0 {
break;
}
let _ = tx.send(line.clone());
line.clear();
}
});
let _ = rx.recv_timeout(Duration::from_secs(5));
let request = json!({
"jsonrpc": "2.0",
"id": 88888,
"method": "ping"
});
server.send(&request)?;
let _response = server.recv()?;
let mut output = String::new();
let start = std::time::Instant::now();
while start.elapsed() < Duration::from_secs(2) {
if let Ok(line) = rx.recv_timeout(Duration::from_millis(100)) {
output.push_str(&line);
if output.len() > 100 {
break;
}
}
}
let _ = child.kill();
let _ = child.wait();
assert!(
!output.contains("\x1b["),
"Output should not contain ANSI escape codes with --nocolor flag"
);
Ok(())
}
#[test]
fn test_monitor_filter_flag() -> Result<()> {
use std::sync::mpsc;
let mut server = ServerProcess::spawn()?;
let session_id = server.get_session_id()?;
let mut cmd = Command::new(env!("CARGO_BIN_EXE_catenary"));
cmd.arg("monitor")
.arg(&session_id)
.arg("--filter")
.arg("ping");
cmd.stdout(Stdio::piped()).stderr(Stdio::null());
let mut child = cmd.spawn().context("Failed to spawn monitor")?;
let stdout = child
.stdout
.take()
.context("failed to take monitor stdout")?;
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
while let Ok(n) = reader.read_line(&mut line) {
if n == 0 {
break;
}
let _ = tx.send(line.clone());
line.clear();
}
});
let _ = rx.recv_timeout(Duration::from_secs(5));
let ping_request = json!({
"jsonrpc": "2.0",
"id": 77777,
"method": "ping"
});
server.send(&ping_request)?;
let _response = server.recv()?;
let mut found_ping = false;
let start = std::time::Instant::now();
while start.elapsed() < Duration::from_secs(2) {
if let Ok(line) = rx.recv_timeout(Duration::from_millis(100))
&& line.contains("ping")
{
found_ping = true;
break;
}
}
let _ = child.kill();
let _ = child.wait();
assert!(found_ping, "Filter should allow ping events through");
Ok(())
}
#[test]
fn test_monitor_uses_arrows() -> Result<()> {
use std::sync::mpsc;
let mut server = ServerProcess::spawn()?;
let session_id = server.get_session_id()?;
let mut cmd = Command::new(env!("CARGO_BIN_EXE_catenary"));
cmd.arg("monitor").arg(&session_id).arg("--nocolor");
cmd.stdout(Stdio::piped()).stderr(Stdio::null());
let mut child = cmd.spawn().context("Failed to spawn monitor")?;
let stdout = child
.stdout
.take()
.context("failed to take monitor stdout")?;
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
while let Ok(n) = reader.read_line(&mut line) {
if n == 0 {
break;
}
let _ = tx.send(line.clone());
line.clear();
}
});
let _ = rx.recv_timeout(Duration::from_secs(5));
let request = json!({
"jsonrpc": "2.0",
"id": 66666,
"method": "ping"
});
server.send(&request)?;
let _response = server.recv()?;
let mut found_incoming_arrow = false;
let mut found_outgoing_arrow = false;
let start = std::time::Instant::now();
while start.elapsed() < Duration::from_secs(2) {
if let Ok(line) = rx.recv_timeout(Duration::from_millis(100)) {
if line.contains('→') {
found_incoming_arrow = true;
}
if line.contains('←') {
found_outgoing_arrow = true;
}
if found_incoming_arrow && found_outgoing_arrow {
break;
}
}
}
let _ = child.kill();
let _ = child.wait();
assert!(
found_incoming_arrow,
"Should use → arrow for incoming messages"
);
assert!(
found_outgoing_arrow,
"Should use ← arrow for outgoing messages"
);
Ok(())
}