use std::env;
use std::sync::OnceLock;
pub(crate) const DEFAULT_SUBJECT: &str = "spg.wal.sql";
static CONFIG: OnceLock<Option<Config>> = OnceLock::new();
#[derive(Debug, Clone)]
pub(crate) struct Config {
pub target: Target,
pub subject: String,
}
#[derive(Debug, Clone)]
pub(crate) enum Target {
LogStderr,
}
fn config() -> Option<&'static Config> {
CONFIG
.get_or_init(|| {
let target_str = env::var("SPG_PUBSUB_TARGET").ok()?;
let target = match target_str.as_str() {
"log" => Target::LogStderr,
other => {
eprintln!(
"spg-server: unknown SPG_PUBSUB_TARGET={other:?}; \
supported: log. Disabling pubsub."
);
return None;
}
};
let subject =
env::var("SPG_PUBSUB_SUBJECT").unwrap_or_else(|_| DEFAULT_SUBJECT.to_string());
Some(Config { target, subject })
})
.as_ref()
}
pub(crate) fn publish_sql(sql: &str) {
let Some(cfg) = config() else { return };
let frame = encode_nats_pub(&cfg.subject, sql.as_bytes());
match cfg.target {
Target::LogStderr => {
eprint!("{frame}");
}
}
}
pub(crate) fn encode_nats_pub(subject: &str, payload: &[u8]) -> String {
let mut out = String::with_capacity(16 + subject.len() + payload.len());
out.push_str("PUB ");
out.push_str(subject);
out.push(' ');
out.push_str(&payload.len().to_string());
out.push_str("\r\n");
out.push_str(&String::from_utf8_lossy(payload));
out.push_str("\r\n");
out
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn nats_pub_frame_shape() {
let s = encode_nats_pub("spg.wal.sql", b"INSERT INTO t VALUES (1)");
assert!(s.starts_with("PUB spg.wal.sql 24\r\n"));
assert!(s.ends_with("INSERT INTO t VALUES (1)\r\n"));
}
#[test]
fn nats_pub_empty_payload() {
let s = encode_nats_pub("x", b"");
assert_eq!(s, "PUB x 0\r\n\r\n");
}
#[test]
fn nats_pub_handles_newlines_in_sql() {
let sql = "INSERT INTO t VALUES\n(1, 'a')";
let s = encode_nats_pub("x", sql.as_bytes());
assert!(s.starts_with(&format!("PUB x {}\r\n", sql.len())));
assert!(s.contains(sql));
}
}