use serde_json::{Value, json};
use std::io::{BufRead, Write};
use crate::config::{TlsConfig, TlsMode};
pub fn run_stdio(pg_url: Option<&str>, mysql_url: Option<&str>) -> anyhow::Result<()> {
let stdin = std::io::stdin();
let mut stdout = std::io::stdout();
for line in stdin.lock().lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let msg: Value = match serde_json::from_str(&line) {
Ok(v) => v,
Err(_) => continue,
};
let id = msg.get("id").cloned();
let Some(id) = id else { continue };
let method = msg.get("method").and_then(|m| m.as_str()).unwrap_or("");
let envelope = match dispatch(method, &msg, pg_url, mysql_url) {
Ok(result) => json!({ "jsonrpc": "2.0", "id": id, "result": result }),
Err(e) => json!({
"jsonrpc": "2.0",
"id": id,
"error": { "code": -32_000, "message": crate::redact::redact_error(&e) }
}),
};
writeln!(stdout, "{}", serde_json::to_string(&envelope)?)?;
stdout.flush()?;
}
Ok(())
}
/// Routes one request to its handler and returns the `result` payload.
///
/// * `initialize` — returns the protocol version, capabilities and server info.
/// * `tools/list` — returns the tool catalogue from `tools_list`.
/// * `tools/call` — reads `params.name` and `params.arguments` (defaulting to
///   JSON `null`) and invokes `call_tool`. A failing tool call is still a
///   successful response: the redacted error text is returned as tool content
///   with `"isError": true`.
///
/// # Errors
/// Returns an error for an unknown method, or for a `tools/call` request that
/// lacks `params` or a string `params.name`.
fn dispatch(
    method: &str,
    msg: &Value,
    pg_url: Option<&str>,
    mysql_url: Option<&str>,
) -> anyhow::Result<Value> {
    if method == "initialize" {
        return Ok(json!({
            "protocolVersion": "2024-11-05",
            "capabilities": { "tools": {} },
            "serverInfo": {
                "name": "rivet-mcp",
                "version": env!("CARGO_PKG_VERSION")
            }
        }));
    }
    if method == "tools/list" {
        return Ok(json!({ "tools": tools_list() }));
    }
    if method != "tools/call" {
        return Err(anyhow::anyhow!("unknown method: {method}"));
    }

    let Some(params) = msg.get("params") else {
        return Err(anyhow::anyhow!("missing params"));
    };
    let Some(tool) = params.get("name").and_then(Value::as_str) else {
        return Err(anyhow::anyhow!("missing tool name"));
    };
    let arguments = params.get("arguments").unwrap_or(&Value::Null);

    // Tool failures are reported inside the result, not as a protocol error.
    let outcome = call_tool(tool, arguments, pg_url, mysql_url).unwrap_or_else(|e| {
        json!({
            "content": [{ "type": "text", "text": format!("error: {}", crate::redact::redact_error(&e)) }],
            "isError": true
        })
    });
    Ok(outcome)
}
fn tools_list() -> Value {
json!([
{
"name": "pg_active_sessions",
"description": "Show non-idle Postgres sessions: pid, state, wait event, query snippet, user, application. Useful to spot blocked or long-running queries during an export.",
"inputSchema": { "type": "object", "properties": {}, "required": [] }
},
{
"name": "pg_checkpoint_pressure",
"description": "Show pg_stat_bgwriter counters: checkpoints_timed, checkpoints_req (write-pressure indicator), write/sync times, and buffer stats. Rivet adaptive mode reacts to checkpoints_req delta.",
"inputSchema": { "type": "object", "properties": {}, "required": [] }
},
{
"name": "pg_table_stats",
"description": "Top 20 Postgres tables by live row count: n_live_tup, n_dead_tup, dead ratio, last vacuum/analyze timestamps.",
"inputSchema": {
"type": "object",
"properties": {
"schema": {
"type": "string",
"description": "Restrict to a specific schema (default: all user schemas)"
}
},
"required": []
}
},
{
"name": "pg_locks",
"description": "Show relation-level Postgres locks: pid, relation, mode, granted. Useful to diagnose lock contention during an export.",
"inputSchema": { "type": "object", "properties": {}, "required": [] }
},
{
"name": "pg_top_queries_by_io",
"description": "Top 10 queries by total I/O wait time from pg_stat_statements. Requires the pg_stat_statements extension; returns a clear error if unavailable.",
"inputSchema": { "type": "object", "properties": {}, "required": [] }
},
{
"name": "mysql_processlist",
"description": "Show MySQL SHOW PROCESSLIST: id, user, db, command, time, state, query snippet.",
"inputSchema": { "type": "object", "properties": {}, "required": [] }
},
{
"name": "mysql_key_metrics",
"description": "Key MySQL global status counters: Innodb_log_waits, Threads_running, Queries, Slow_queries, Innodb_row_lock_waits, Connections.",
"inputSchema": { "type": "object", "properties": {}, "required": [] }
},
{
"name": "mysql_table_stats",
"description": "Top 20 MySQL InnoDB tables by row count from information_schema.TABLES.",
"inputSchema": {
"type": "object",
"properties": {
"schema": {
"type": "string",
"description": "Restrict to a specific schema/database (default: all non-system schemas)"
}
},
"required": []
}
},
{
"name": "pgbouncer_pools",
"description": "Show pgBouncer pool stats (SHOW POOLS) via the pgBouncer admin interface. Requires PGBOUNCER_ADMIN_URL env var (e.g. postgresql://pgbouncer@127.0.0.1:6432/pgbouncer).",
"inputSchema": { "type": "object", "properties": {}, "required": [] }
},
{
"name": "pgbouncer_stats",
"description": "Show pgBouncer per-database stats (SHOW STATS). Requires PGBOUNCER_ADMIN_URL env var.",
"inputSchema": { "type": "object", "properties": {}, "required": [] }
}
])
}
fn call_tool(
name: &str,
args: &Value,
pg_url: Option<&str>,
mysql_url: Option<&str>,
) -> anyhow::Result<Value> {
match name {
"pg_active_sessions" => text(pg_active_sessions(require_pg(pg_url)?)),
"pg_checkpoint_pressure" => text(pg_checkpoint_pressure(require_pg(pg_url)?)),
"pg_table_stats" => {
let schema = args.get("schema").and_then(|v| v.as_str());
text(pg_table_stats(require_pg(pg_url)?, schema))
}
"pg_locks" => text(pg_locks(require_pg(pg_url)?)),
"pg_top_queries_by_io" => text(pg_top_queries_by_io(require_pg(pg_url)?)),
"mysql_processlist" => text(mysql_processlist(require_mysql(mysql_url)?)),
"mysql_key_metrics" => text(mysql_key_metrics(require_mysql(mysql_url)?)),
"mysql_table_stats" => {
let schema = args.get("schema").and_then(|v| v.as_str());
text(mysql_table_stats(require_mysql(mysql_url)?, schema))
}
"pgbouncer_pools" => text(pgbouncer_query("SHOW POOLS")),
"pgbouncer_stats" => text(pgbouncer_query("SHOW STATS")),
other => Err(anyhow::anyhow!("unknown tool: {other}")),
}
}
/// Returns the configured Postgres URL, or an error explaining how to supply one.
fn require_pg(url: Option<&str>) -> anyhow::Result<&str> {
    match url {
        Some(configured) => Ok(configured),
        None => Err(anyhow::anyhow!(
            "no Postgres URL configured — pass --pg-url or set DATABASE_URL"
        )),
    }
}
/// Returns the configured MySQL URL, or an error explaining how to supply one.
fn require_mysql(url: Option<&str>) -> anyhow::Result<&str> {
    match url {
        Some(configured) => Ok(configured),
        None => Err(anyhow::anyhow!(
            "no MySQL URL configured — pass --mysql-url or set DATABASE_URL"
        )),
    }
}
/// Wraps a tool's textual outcome in a tool-result payload.
///
/// * `Ok(body)` becomes `{ "content": [{ "type": "text", "text": body }] }`.
/// * `Err(e)` becomes the same shape with the text `error: <redacted message>`
///   and `"isError": true`, so a client can tell a failed tool call from a
///   successful one. This matches the error payload `dispatch` builds when
///   `call_tool` itself fails.
///
/// The error message is always passed through `crate::redact::redact_error`
/// before it is exposed to the client.
///
/// This function never returns `Err`; the `Result` return type only exists so
/// it composes with `call_tool`.
fn text(result: anyhow::Result<String>) -> anyhow::Result<Value> {
    let payload = match result {
        Ok(body) => json!({ "content": [{ "type": "text", "text": body }] }),
        Err(e) => json!({
            "content": [{ "type": "text", "text": format!("error: {}", crate::redact::redact_error(&e)) }],
            "isError": true
        }),
    };
    Ok(payload)
}
/// Derives a TLS configuration from the `sslmode` query parameter of `url`.
///
/// Only the exact key `sslmode` is considered, and when it appears more than
/// once the last occurrence decides. The values `require`, `verify-ca` and
/// `verify-full` (case-sensitive) map to the corresponding `TlsMode`; every
/// other value — including an empty or missing one — yields `None`, as does a
/// URL with no query string. All other `TlsConfig` fields take their defaults.
fn tls_config_from_url(url: &str) -> Option<TlsConfig> {
    let (_, query) = url.split_once('?')?;

    // Value of the final `sslmode` pair; a bare `sslmode` counts as empty.
    let requested = query
        .split('&')
        .map(|pair| pair.split_once('=').unwrap_or((pair, "")))
        .filter(|(key, _)| *key == "sslmode")
        .map(|(_, value)| value)
        .last()?;

    let mode = match requested {
        "require" => TlsMode::Require,
        "verify-ca" => TlsMode::VerifyCa,
        "verify-full" => TlsMode::VerifyFull,
        _ => return None,
    };
    Some(TlsConfig {
        mode,
        ..TlsConfig::default()
    })
}
/// Opens a Postgres client for `url`, applying whatever TLS settings the
/// URL's `sslmode` parameter requests (none if it requests nothing enforceable).
fn pg_connect(url: &str) -> anyhow::Result<postgres::Client> {
    let tls_settings = tls_config_from_url(url);
    let client = crate::source::postgres::connect_client(url, tls_settings.as_ref())?;
    Ok(client)
}
/// Renders the column at `idx` of `row` as display text.
///
/// The column is decoded as, in order: text, `i64`, `i32`, `f64` (two decimal
/// places), `bool`, UTC timestamp, naive timestamp (both formatted as
/// `YYYY-MM-DD HH:MM:SS`). The first type that decodes wins. SQL NULL is
/// rendered as `NULL`; a column that decodes as none of these types is
/// rendered as `?`.
fn pg_val(row: &postgres::Row, idx: usize) -> String {
    const TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S";
    let or_null = |rendered: Option<String>| rendered.unwrap_or_else(|| "NULL".into());

    if let Ok(value) = row.try_get::<_, Option<String>>(idx) {
        or_null(value)
    } else if let Ok(value) = row.try_get::<_, Option<i64>>(idx) {
        or_null(value.map(|n| n.to_string()))
    } else if let Ok(value) = row.try_get::<_, Option<i32>>(idx) {
        or_null(value.map(|n| n.to_string()))
    } else if let Ok(value) = row.try_get::<_, Option<f64>>(idx) {
        or_null(value.map(|n| format!("{n:.2}")))
    } else if let Ok(value) = row.try_get::<_, Option<bool>>(idx) {
        or_null(value.map(|b| b.to_string()))
    } else if let Ok(value) = row.try_get::<_, Option<chrono::DateTime<chrono::Utc>>>(idx) {
        or_null(value.map(|t| t.format(TIMESTAMP_FORMAT).to_string()))
    } else if let Ok(value) = row.try_get::<_, Option<chrono::NaiveDateTime>>(idx) {
        or_null(value.map(|t| t.format(TIMESTAMP_FORMAT).to_string()))
    } else {
        "?".into()
    }
}
/// Formats Postgres rows as an ASCII table.
///
/// Column names come from the first row. An empty slice yields the
/// placeholder `(no rows)` because there is no row to take names from.
fn pg_rows_to_table(rows: &[postgres::Row]) -> String {
    let Some(first) = rows.first() else {
        return "(no rows)".into();
    };

    let headers: Vec<String> = first
        .columns()
        .iter()
        .map(|column| column.name().to_string())
        .collect();
    let column_count = headers.len();

    let mut data: Vec<Vec<String>> = Vec::with_capacity(rows.len());
    for row in rows {
        let cells: Vec<String> = (0..column_count).map(|idx| pg_val(row, idx)).collect();
        data.push(cells);
    }

    ascii_table(&headers, &data)
}
/// Lists every `pg_stat_activity` session whose state is not `idle`.
///
/// The report starts with an `Active sessions (<count>)` heading followed by
/// a table of pid, state, wait event type/name, the first 100 characters of
/// the query, user name and application name, ordered by state then pid.
///
/// # Errors
/// Fails if the connection cannot be opened or the query fails.
fn pg_active_sessions(url: &str) -> anyhow::Result<String> {
    // Continuation lines stay flush-left so the SQL text sent is unchanged.
    const SQL: &str = "SELECT pid::text, state, COALESCE(wait_event_type,'') AS wait_type,
COALESCE(wait_event,'') AS wait_event,
LEFT(COALESCE(query,''),100) AS query_snippet,
usename, application_name
FROM pg_stat_activity
WHERE state IS DISTINCT FROM 'idle'
ORDER BY state, pid";

    let mut conn = pg_connect(url)?;
    let sessions = conn.query(SQL, &[])?;
    let count = sessions.len();
    let table = pg_rows_to_table(&sessions);
    Ok(format!("Active sessions ({})\n\n{}", count, table))
}
/// Reports the checkpoint and buffer counters from `pg_stat_bgwriter`.
///
/// The output is a `pg_stat_bgwriter` heading followed by a table with
/// timed/requested checkpoint counts, rounded write and sync times, and the
/// buffer counters.
///
/// # Errors
/// Fails if the connection cannot be opened or the query fails.
fn pg_checkpoint_pressure(url: &str) -> anyhow::Result<String> {
    // Continuation lines stay flush-left so the SQL text sent is unchanged.
    const SQL: &str = "SELECT checkpoints_timed, checkpoints_req,
ROUND(checkpoint_write_time) AS write_ms,
ROUND(checkpoint_sync_time) AS sync_ms,
buffers_checkpoint, buffers_clean, buffers_backend,
maxwritten_clean
FROM pg_stat_bgwriter";

    let mut conn = pg_connect(url)?;
    let counters = conn.query(SQL, &[])?;
    let table = pg_rows_to_table(&counters);
    Ok(format!("pg_stat_bgwriter\n\n{}", table))
}
/// Selects the SQL text for the Postgres table-stats report.
///
/// With a schema the statement filters on the bind parameter `$1`; without
/// one it excludes the system schemas and takes no parameters. The value of
/// `schema` never appears in the returned text — only its presence matters —
/// so client input cannot alter the statement.
fn pg_table_stats_sql(schema: Option<&str>) -> &'static str {
    // Continuation lines stay flush-left so the SQL text is unchanged.
    const ONE_SCHEMA: &str = "SELECT schemaname, relname AS tablename, n_live_tup, n_dead_tup,
(n_dead_tup * 100 / NULLIF(n_live_tup + n_dead_tup, 0)) AS dead_pct,
COALESCE(to_char(last_vacuum, 'YYYY-MM-DD HH24:MI'), '-') AS last_vacuum,
COALESCE(to_char(last_analyze, 'YYYY-MM-DD HH24:MI'), '-') AS last_analyze
FROM pg_stat_user_tables
WHERE schemaname = $1::text
ORDER BY n_live_tup DESC
LIMIT 20";
    const ALL_USER_SCHEMAS: &str = "SELECT schemaname, relname AS tablename, n_live_tup, n_dead_tup,
(n_dead_tup * 100 / NULLIF(n_live_tup + n_dead_tup, 0)) AS dead_pct,
COALESCE(to_char(last_vacuum, 'YYYY-MM-DD HH24:MI'), '-') AS last_vacuum,
COALESCE(to_char(last_analyze, 'YYYY-MM-DD HH24:MI'), '-') AS last_analyze
FROM pg_stat_user_tables
WHERE schemaname NOT IN ('pg_catalog','information_schema','pg_toast')
ORDER BY n_live_tup DESC
LIMIT 20";

    if schema.is_some() {
        ONE_SCHEMA
    } else {
        ALL_USER_SCHEMAS
    }
}
/// Reports the 20 Postgres tables with the most live rows.
///
/// When `schema` is given it is passed as a bind parameter, never
/// interpolated into the SQL; otherwise the statement covering all
/// non-system schemas is run without parameters.
///
/// # Errors
/// Fails if the connection cannot be opened or the query fails.
fn pg_table_stats(url: &str, schema: Option<&str>) -> anyhow::Result<String> {
    let mut conn = pg_connect(url)?;
    let statement = pg_table_stats_sql(schema);

    let tables = if let Some(schema_name) = schema {
        conn.query(statement, &[&schema_name])?
    } else {
        conn.query(statement, &[])?
    };

    let table = pg_rows_to_table(&tables);
    Ok(format!("Table stats (top 20)\n\n{}", table))
}
/// Lists relation-level locks from `pg_locks`, joined to `pg_class` for names.
///
/// Rows are ordered by `granted` then pid. When no relation-level lock
/// exists a one-line message is returned instead of a table.
///
/// # Errors
/// Fails if the connection cannot be opened or the query fails.
fn pg_locks(url: &str) -> anyhow::Result<String> {
    // Continuation lines stay flush-left so the SQL text sent is unchanged.
    const SQL: &str = "SELECT l.pid::text, c.relname AS relation, l.mode, l.granted::text
FROM pg_locks l
LEFT JOIN pg_class c ON c.oid = l.relation
WHERE l.relation IS NOT NULL
ORDER BY l.granted, l.pid";

    let mut conn = pg_connect(url)?;
    let locks = conn.query(SQL, &[])?;

    let report = if locks.is_empty() {
        "No relation-level locks held.".into()
    } else {
        format!(
            "Relation locks ({})\n\n{}",
            locks.len(),
            pg_rows_to_table(&locks)
        )
    };
    Ok(report)
}
/// Reports the 10 `pg_stat_statements` entries with the most block I/O time.
///
/// The extension is probed first. If it is not installed — or the probe
/// itself fails — an explanatory message with the `CREATE EXTENSION` command
/// is returned as a normal result rather than an error.
///
/// # Errors
/// Fails if the connection cannot be opened or the statistics query fails.
fn pg_top_queries_by_io(url: &str) -> anyhow::Result<String> {
    const PROBE: &str =
        "SELECT COUNT(*) > 0 FROM pg_extension WHERE extname = 'pg_stat_statements'";
    // Continuation lines stay flush-left so the SQL text sent is unchanged.
    const TOP_IO: &str = "SELECT LEFT(query, 80) AS query, calls,
ROUND(blk_read_time + blk_write_time) AS io_ms,
ROUND(total_exec_time) AS total_exec_ms
FROM pg_stat_statements
ORDER BY blk_read_time + blk_write_time DESC
LIMIT 10";

    let mut conn = pg_connect(url)?;

    // Best-effort probe: any failure is treated as "not installed".
    let installed = match conn.query_one(PROBE, &[]) {
        Ok(row) => row.try_get::<_, bool>(0).unwrap_or(false),
        Err(_) => false,
    };
    if !installed {
        return Ok("pg_stat_statements extension is not installed. \
                   Run: CREATE EXTENSION IF NOT EXISTS pg_stat_statements;"
            .into());
    }

    let statements = conn.query(TOP_IO, &[])?;
    let table = pg_rows_to_table(&statements);
    Ok(format!("Top 10 queries by I/O time\n\n{}", table))
}
/// Creates a MySQL connection pool for `url`, applying whatever TLS settings
/// the URL's `sslmode` parameter requests (none if it requests nothing enforceable).
fn mysql_pool(url: &str) -> anyhow::Result<mysql::Pool> {
    let tls_settings = tls_config_from_url(url);
    let pool = crate::source::mysql::connect_pool(url, tls_settings.as_ref())?;
    Ok(pool)
}
/// Formats already-stringified MySQL rows as an ASCII table, or returns the
/// placeholder `(no rows)` when there is nothing to show.
fn mysql_rows_to_table(rows: &[Vec<String>], headers: &[String]) -> String {
    match rows {
        [] => "(no rows)".into(),
        populated => ascii_table(headers, populated),
    }
}
fn mysql_processlist(url: &str) -> anyhow::Result<String> {
use mysql::prelude::*;
let pool = mysql_pool(url)?;
let mut conn = pool.get_conn()?;
let mut result = conn.exec_iter("SHOW PROCESSLIST", ())?;
let cols: Vec<String> = result
.columns()
.as_ref()
.iter()
.map(|c| c.name_str().to_string())
.collect();
let row_set = result
.iter()
.ok_or_else(|| anyhow::anyhow!("no result set"))?;
let rows: Vec<Vec<String>> = row_set
.filter_map(|r| r.ok())
.map(|row| {
(0..cols.len())
.map(|i| match row.as_ref(i) {
Some(mysql::Value::Bytes(b)) => String::from_utf8_lossy(b).into_owned(),
Some(mysql::Value::Int(n)) => n.to_string(),
Some(mysql::Value::UInt(n)) => n.to_string(),
Some(mysql::Value::NULL) | None => "NULL".into(),
_ => "?".into(),
})
.collect()
})
.collect();
Ok(format!(
"SHOW PROCESSLIST ({})\n\n{}",
rows.len(),
mysql_rows_to_table(&rows, &cols)
))
}
/// Reports a fixed set of MySQL global status counters as a two-column table.
///
/// The counter names are compile-time constants, so building the `IN (...)`
/// list by string formatting involves no client input. If the server
/// returns no matching rows, the placeholder `(no metrics returned)` is used.
///
/// NOTE(review): the query reads `information_schema.global_status`; that
/// table is presumably not available on every MySQL version — confirm
/// against the server versions this tool must support.
///
/// # Errors
/// Fails if the pool or connection cannot be obtained or the query fails.
fn mysql_key_metrics(url: &str) -> anyhow::Result<String> {
    use mysql::prelude::*;

    const METRICS: [&str; 9] = [
        "Innodb_log_waits",
        "Innodb_row_lock_waits",
        "Innodb_row_lock_time_avg",
        "Threads_running",
        "Threads_connected",
        "Queries",
        "Slow_queries",
        "Connections",
        "Aborted_connects",
    ];

    let pool = mysql_pool(url)?;
    let mut conn = pool.get_conn()?;

    let quoted: Vec<String> = METRICS.iter().map(|name| format!("'{name}'")).collect();
    let in_clause = quoted.join(",");
    let sql = format!(
        "SELECT variable_name, variable_value \
         FROM information_schema.global_status \
         WHERE variable_name IN ({in_clause})"
    );

    let pairs: Vec<(String, String)> = conn.query(sql)?;
    if pairs.is_empty() {
        return Ok("(no metrics returned)".into());
    }

    let headers = ["metric", "value"];
    let mut data: Vec<Vec<String>> = Vec::with_capacity(pairs.len());
    for (metric, value) in pairs {
        data.push(vec![metric, value]);
    }

    let table = ascii_table(&headers, &data);
    Ok(format!("MySQL key metrics\n\n{}", table))
}
/// Selects the SQL text for the MySQL table-stats report.
///
/// With a schema the statement filters on a `?` bind placeholder; without
/// one it excludes the system schemas and takes no parameters. The value of
/// `schema` never appears in the returned text — only its presence matters —
/// so client input cannot alter the statement.
fn mysql_table_stats_sql(schema: Option<&str>) -> &'static str {
    const ONE_SCHEMA: &str = "SELECT table_schema, table_name, table_rows, \
                              data_length, index_length, engine \
                              FROM information_schema.TABLES \
                              WHERE table_type = 'BASE TABLE' AND table_schema = ? \
                              ORDER BY table_rows DESC \
                              LIMIT 20";
    const ALL_USER_SCHEMAS: &str = "SELECT table_schema, table_name, table_rows, \
                                    data_length, index_length, engine \
                                    FROM information_schema.TABLES \
                                    WHERE table_type = 'BASE TABLE' \
                                    AND table_schema NOT IN ('information_schema','performance_schema','mysql','sys') \
                                    ORDER BY table_rows DESC \
                                    LIMIT 20";

    schema.map_or(ALL_USER_SCHEMAS, |_| ONE_SCHEMA)
}
fn mysql_table_stats(url: &str, schema: Option<&str>) -> anyhow::Result<String> {
use mysql::prelude::*;
let pool = mysql_pool(url)?;
let mut conn = pool.get_conn()?;
let sql = mysql_table_stats_sql(schema);
let mut result = match schema {
Some(s) => conn.exec_iter(sql, (s,))?,
None => conn.exec_iter(sql, ())?,
};
let cols: Vec<String> = result
.columns()
.as_ref()
.iter()
.map(|c| c.name_str().to_string())
.collect();
let row_set = result
.iter()
.ok_or_else(|| anyhow::anyhow!("no result set"))?;
let rows: Vec<Vec<String>> = row_set
.filter_map(|r| r.ok())
.map(|row| {
(0..cols.len())
.map(|i| match row.as_ref(i) {
Some(mysql::Value::Bytes(b)) => String::from_utf8_lossy(b).into_owned(),
Some(mysql::Value::Int(n)) => n.to_string(),
Some(mysql::Value::UInt(n)) => n.to_string(),
Some(mysql::Value::NULL) | None => "NULL".into(),
_ => "?".into(),
})
.collect()
})
.collect();
Ok(format!(
"Table stats (top 20)\n\n{}",
mysql_rows_to_table(&rows, &cols)
))
}
/// Runs `sql` against the pgBouncer admin interface and renders the rows as
/// an ASCII table.
///
/// The admin connection URL is read from the `PGBOUNCER_ADMIN_URL`
/// environment variable on every call.
///
/// # Errors
/// Fails if `PGBOUNCER_ADMIN_URL` cannot be read, the connection cannot be
/// opened, or the query fails.
fn pgbouncer_query(sql: &str) -> anyhow::Result<String> {
    let Ok(admin_url) = std::env::var("PGBOUNCER_ADMIN_URL") else {
        return Err(anyhow::anyhow!(
            "PGBOUNCER_ADMIN_URL not set. \
             Example: postgresql://pgbouncer@127.0.0.1:6432/pgbouncer"
        ));
    };

    let mut admin = pg_connect(&admin_url)?;
    let rows = admin.query(sql, &[])?;
    Ok(pg_rows_to_table(&rows))
}
/// Renders `headers` and `rows` as a plain-text table.
///
/// Layout: a header line, a `-`/`-+-` separator line, then one line per row,
/// with cells joined by ` | `. Every cell — including the last one on a line —
/// is left-aligned and padded to its column width.
///
/// A column's width is the largest *byte* length among its header and cells,
/// so columns holding multi-byte text come out wider than the visible text
/// needs. Cells beyond the number of headers are printed without padding and
/// do not affect any width. If the joined row lines form an empty string,
/// only the header and separator are returned.
fn ascii_table(headers: &[impl AsRef<str>], rows: &[Vec<String>]) -> String {
    /// Pads each cell to its column width and joins the cells with ` | `.
    fn pad_line<'a>(cells: impl Iterator<Item = &'a str>, widths: &[usize]) -> String {
        let mut line = String::new();
        for (idx, cell) in cells.enumerate() {
            if idx > 0 {
                line.push_str(" | ");
            }
            let width = widths.get(idx).copied().unwrap_or(0);
            line.push_str(&format!("{:<width$}", cell, width = width));
        }
        line
    }

    // Start from the header widths, then widen for every cell that has a
    // header; `zip` ignores any surplus cells in a row.
    let mut widths: Vec<usize> = headers.iter().map(|h| h.as_ref().len()).collect();
    for row in rows {
        for (width, cell) in widths.iter_mut().zip(row) {
            *width = (*width).max(cell.len());
        }
    }

    let header_line = pad_line(headers.iter().map(|h| h.as_ref()), &widths);
    let rule = widths
        .iter()
        .map(|w| "-".repeat(*w))
        .collect::<Vec<_>>()
        .join("-+-");
    let body = rows
        .iter()
        .map(|row| pad_line(row.iter().map(String::as_str), &widths))
        .collect::<Vec<_>>()
        .join("\n");

    let mut table = format!("{}\n{}", header_line, rule);
    if !body.is_empty() {
        table.push('\n');
        table.push_str(&body);
    }
    table
}
#[cfg(test)]
mod tests {
    //! Unit tests for table rendering, SQL selection, TLS URL parsing and
    //! error redaction. None of them open a database connection.

    use super::*;

    /// Columns grow to the longest cell, and every cell is padded to that width.
    #[test]
    fn ascii_table_widens_columns_to_longest_cell() {
        let headers = ["pid", "state"];
        let rows = vec![
            vec!["1".into(), "active".into()],
            vec!["10000".into(), "idle".into()],
        ];
        let out = ascii_table(&headers, &rows);
        let lines: Vec<&str> = out.lines().collect();
        assert_eq!(lines.len(), 4, "header + separator + 2 rows");
        // Widths are 5 ("10000") and 6 ("active"); shorter cells are
        // right-padded with spaces, including the last cell on each line.
        assert_eq!(lines[0], "pid   | state ");
        assert_eq!(lines[1], "------+-------");
        assert_eq!(lines[2], "1     | active");
        assert_eq!(lines[3], "10000 | idle  ");
    }

    /// With no rows, only the header and separator lines are produced.
    #[test]
    fn ascii_table_renders_header_only_when_no_rows() {
        let headers = ["col_a", "col_b"];
        let out = ascii_table(&headers, &[]);
        assert_eq!(out, "col_a | col_b\n------+------");
    }

    const HOSTILE_PG: &str = "x' UNION SELECT usename, passwd, 0, 0, 0, '-', '-' FROM pg_shadow --";
    const HOSTILE_MYSQL: &str =
        "x' UNION SELECT user, authentication_string, 0, 0, 0, 'x' FROM mysql.user -- ";

    /// The Postgres schema filter is a bind placeholder; input never reaches the SQL text.
    #[test]
    fn pg_table_stats_sql_binds_schema_instead_of_interpolating() {
        let sql = pg_table_stats_sql(Some(HOSTILE_PG));
        assert!(
            sql.contains("schemaname = $1"),
            "schema filter must use a bind placeholder, got: {sql}"
        );
        assert!(
            !sql.contains(HOSTILE_PG) && !sql.contains("UNION"),
            "client input must never land in the SQL text, got: {sql}"
        );
        assert_eq!(sql, pg_table_stats_sql(Some("public")));
    }

    /// Without a schema the Postgres statement takes no bind parameters.
    #[test]
    fn pg_table_stats_sql_no_schema_is_static_with_no_placeholder() {
        let sql = pg_table_stats_sql(None);
        assert!(sql.contains("schemaname NOT IN"));
        assert!(!sql.contains("$1"), "fallback takes no bind params: {sql}");
    }

    /// The MySQL schema filter is a bind placeholder; input never reaches the SQL text.
    #[test]
    fn mysql_table_stats_sql_binds_schema_instead_of_interpolating() {
        let sql = mysql_table_stats_sql(Some(HOSTILE_MYSQL));
        assert!(
            sql.contains("table_schema = ?"),
            "schema filter must use a bind placeholder, got: {sql}"
        );
        assert!(
            !sql.contains(HOSTILE_MYSQL) && !sql.contains("UNION"),
            "client input must never land in the SQL text, got: {sql}"
        );
        assert_eq!(sql, mysql_table_stats_sql(Some("appdb")));
    }

    /// Without a schema the MySQL statement takes no bind parameters.
    #[test]
    fn mysql_table_stats_sql_no_schema_is_static_with_no_placeholder() {
        let sql = mysql_table_stats_sql(None);
        assert!(sql.contains("table_schema NOT IN"));
        assert!(!sql.contains('?'), "fallback takes no bind params: {sql}");
    }

    /// `require`, `verify-ca` and `verify-full` each produce an enforced TLS mode.
    #[test]
    fn tls_config_from_url_enforces_when_sslmode_requested() {
        for (url, want) in [
            (
                "postgresql://u:p@db.prod:5432/d?sslmode=require",
                TlsMode::Require,
            ),
            (
                "postgresql://u:p@db.prod/d?sslmode=verify-ca",
                TlsMode::VerifyCa,
            ),
            (
                "mysql://u:p@db.prod:3306/d?sslmode=verify-full",
                TlsMode::VerifyFull,
            ),
        ] {
            let cfg = tls_config_from_url(url)
                .unwrap_or_else(|| panic!("expected enforced TLS for {url}"));
            assert_eq!(cfg.mode, want, "url {url}");
            assert!(cfg.mode.is_enforced(), "url {url} must enforce TLS");
        }
    }

    /// Missing, non-enforcing, wrongly-cased, unknown or empty `sslmode` values yield `None`.
    #[test]
    fn tls_config_from_url_none_for_plaintext_or_missing() {
        for url in [
            "postgresql://u:p@localhost/d",
            "mysql://u:p@127.0.0.1:3306/d",
            "postgresql://u:p@db/d?sslmode=disable",
            "postgresql://u:p@db/d?sslmode=prefer",
            "postgresql://u:p@db/d?sslmode=allow",
            "postgresql://u:p@db/d?sslmode=REQUIRE",
            "postgresql://u:p@db/d?sslmode=garbage",
            "postgresql://u:p@db/d?sslmode",
            "postgresql://u:p@db/d?sslmode=",
        ] {
            assert!(tls_config_from_url(url).is_none(), "url {url} must be None");
        }
    }

    /// Only the exact key `sslmode` counts, and a later occurrence overrides an earlier one.
    #[test]
    fn tls_config_from_url_exact_key_and_last_occurrence_wins() {
        assert!(tls_config_from_url("postgresql://u:p@db/d?xsslmode=require").is_none());
        let cfg = tls_config_from_url(
            "postgresql://u:p@db/d?connect_timeout=10&sslmode=require&application_name=x",
        )
        .expect("enforced");
        assert_eq!(cfg.mode, TlsMode::Require);
        assert!(
            tls_config_from_url("postgresql://u:p@db/d?sslmode=require&sslmode=disable").is_none()
        );
    }

    /// Credentials embedded in an error message must not reach the client.
    #[test]
    fn sec_mcp_error_is_redacted() {
        let err = anyhow::anyhow!(
            "could not connect to postgresql://rivet:s3cret@db.prod:5432/orders: timeout"
        );
        let value = text(Err(err)).expect("text() always returns Ok envelope");
        let body = value["content"][0]["text"]
            .as_str()
            .expect("text content present");
        assert!(
            !body.contains("s3cret"),
            "password must be redacted in MCP error output: {body}"
        );
        assert!(
            body.contains("postgresql://REDACTED@db.prod:5432/orders"),
            "host/path retained, userinfo redacted: {body}"
        );
    }

    /// Column widths are measured in bytes, so a 2-byte character widens the column to 2.
    #[test]
    fn ascii_table_handles_unicode_byte_width() {
        let headers = ["x"];
        // "ы" is one character but two bytes in UTF-8.
        let rows = vec![vec!["ы".into()]];
        let out = ascii_table(&headers, &rows);
        assert!(out.contains("x "), "header padded to byte-width 2");
    }
}