use std::time::Duration;
use serde::Serialize;
use tracing::warn;
use zeph_config::NotificationsConfig;
use crate::redact::scrub_content;
/// Final outcome of a turn, carried in [`TurnSummary::exit_status`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TurnExitStatus {
/// The turn finished without error; whether a notification is sent is decided
/// by the gates in `Notifier::should_fire`.
Success,
/// The turn failed; `Notifier::should_fire` returns `true` for it whenever
/// notifications are enabled.
Error,
}
/// Facts about a finished turn that drive the notification decision and the
/// notification text.
#[derive(Debug, Clone)]
pub struct TurnSummary {
/// Turn duration in milliseconds; compared against
/// `min_turn_duration_ms` for successful turns and printed in the message.
pub duration_ms: u64,
/// Text shown after the status line; it is passed through `scrub_content`
/// before being placed into a notification.
pub preview: String,
/// Number of tool calls in the turn. Not read anywhere in this file.
pub tool_calls: u32,
/// Number of LLM requests in the turn; a successful turn with zero requests
/// never triggers a notification.
pub llm_requests: u32,
/// Whether the turn succeeded or failed.
pub exit_status: TurnExitStatus,
}
/// Sends turn-completion notifications over the channels enabled in its
/// configuration (macOS native and/or webhook).
#[derive(Clone)]
pub struct Notifier {
// Configuration captured at construction; `Notifier::new` clears
// `webhook_url` when it fails validation.
cfg: NotificationsConfig,
// HTTP client used for webhook delivery.
http: reqwest::Client,
}
impl Notifier {
    /// Creates a notifier from `cfg`.
    ///
    /// The HTTP client uses a 5-second connect timeout and a 5-second total
    /// request timeout; if the builder fails, a default client is used instead.
    /// A configured `webhook_url` that does not pass `validate_webhook_url` is
    /// dropped, which disables the webhook channel for this notifier.
    #[must_use]
    pub fn new(cfg: NotificationsConfig) -> Self {
        let http = reqwest::Client::builder()
            .connect_timeout(Duration::from_secs(5))
            .timeout(Duration::from_secs(5))
            .build()
            .unwrap_or_default();
        let mut cfg = cfg;
        if cfg
            .webhook_url
            .as_deref()
            .is_some_and(|url| !validate_webhook_url(url, cfg.webhook_allow_insecure))
        {
            cfg.webhook_url = None;
        }
        Self { cfg, http }
    }

    /// Decides whether `summary` warrants a notification.
    ///
    /// Returns `false` when notifications are disabled, or when a successful
    /// turn made no LLM requests. Failed turns always fire. Successful turns
    /// fire only when `only_on_error` is off and the turn lasted at least
    /// `min_turn_duration_ms`.
    #[must_use]
    pub fn should_fire(&self, summary: &TurnSummary) -> bool {
        if !self.cfg.enabled {
            return false;
        }
        if summary.llm_requests == 0 && summary.exit_status == TurnExitStatus::Success {
            return false;
        }
        match summary.exit_status {
            TurnExitStatus::Error => true,
            TurnExitStatus::Success => {
                if self.cfg.only_on_error {
                    return false;
                }
                summary.duration_ms >= self.cfg.min_turn_duration_ms
            }
        }
    }

    /// Sends the notification for `summary` on a spawned Tokio task.
    ///
    /// The task handle is dropped, so delivery is fire-and-forget; channel
    /// failures are logged as warnings by `fire_all_channels`. This method does
    /// not consult [`Notifier::should_fire`]; callers gate on it themselves.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside a Tokio runtime.
    pub fn fire(&self, summary: &TurnSummary) {
        let cfg = self.cfg.clone();
        let http = self.http.clone();
        let summary = summary.clone();
        tokio::spawn(async move {
            fire_all_channels(&cfg, &http, &summary).await;
        });
    }

    /// Sends a test notification on every active channel and reports the first
    /// failure.
    ///
    /// # Errors
    ///
    /// * [`NotifyTestError::MasterSwitchDisabled`] when `enabled` is off.
    /// * [`NotifyTestError::AllDisabled`] when no channel can actually deliver
    ///   on this platform.
    /// * [`NotifyTestError::MacOsFailed`] / [`NotifyTestError::WebhookFailed`]
    ///   when the respective channel returns an error.
    pub async fn fire_test(&self) -> Result<(), NotifyTestError> {
        if !self.cfg.enabled {
            return Err(NotifyTestError::MasterSwitchDisabled);
        }
        // The macOS channel is compiled in only on macOS. Counting it as active
        // elsewhere would let this method return `Ok(())` without having sent
        // anything when no webhook is configured.
        let macos_enabled = cfg!(target_os = "macos") && self.cfg.macos_native;
        let webhook_enabled = self.cfg.webhook_url.is_some() && self.cfg.webhook_topic.is_some();
        if !macos_enabled && !webhook_enabled {
            return Err(NotifyTestError::AllDisabled);
        }
        let summary = TurnSummary {
            duration_ms: 0,
            preview: "Zeph is working".to_owned(),
            tool_calls: 0,
            llm_requests: 1,
            exit_status: TurnExitStatus::Success,
        };
        #[cfg(target_os = "macos")]
        if macos_enabled {
            fire_macos_native(&self.cfg.title, "Zeph is working")
                .await
                .map_err(|e| NotifyTestError::MacOsFailed(e.to_string()))?;
        }
        if let (Some(url), Some(topic)) = (&self.cfg.webhook_url, &self.cfg.webhook_topic) {
            fire_webhook(&self.http, url, &self.cfg.title, topic, &summary)
                .await
                .map_err(|e| NotifyTestError::WebhookFailed(e.to_string()))?;
        }
        Ok(())
    }
}
/// Reasons `Notifier::fire_test` can fail.
#[derive(Debug, thiserror::Error)]
pub enum NotifyTestError {
/// The `enabled` master switch is off.
#[error("notifications are disabled (set notifications.enabled = true to enable)")]
MasterSwitchDisabled,
/// Neither the macOS channel nor the webhook channel is configured.
#[error("all notification channels are disabled")]
AllDisabled,
/// The macOS channel returned an error; the payload is that error's text.
#[error("macOS notification failed: {0}")]
MacOsFailed(String),
/// The webhook channel returned an error; the payload is that error's text.
#[error("webhook notification failed: {0}")]
WebhookFailed(String),
}
/// Delivers `summary` on every configured channel, logging (not returning)
/// per-channel failures.
///
/// The macOS channel is compiled in only on macOS and runs when
/// `cfg.macos_native` is set. The webhook channel runs when both
/// `webhook_url` and `webhook_topic` are present.
async fn fire_all_channels(
    cfg: &NotificationsConfig,
    http: &reqwest::Client,
    summary: &TurnSummary,
) {
    let title = &cfg.title;

    #[cfg(target_os = "macos")]
    {
        let message = build_notification_message(summary);
        let outcome = if cfg.macos_native {
            fire_macos_native(title, &message).await
        } else {
            Ok(())
        };
        if let Err(e) = outcome {
            warn!(error = %e, "macOS notification failed");
        }
    }

    // Without both a URL and a topic there is no webhook channel to use.
    let (Some(url), Some(topic)) = (&cfg.webhook_url, &cfg.webhook_topic) else {
        return;
    };
    if let Err(e) = fire_webhook(http, url, title, topic, summary).await {
        warn!(error = %e, "webhook notification failed");
    }
}
fn build_notification_message(summary: &TurnSummary) -> String {
let status = if summary.exit_status == TurnExitStatus::Error {
"Error"
} else {
"Done"
};
let safe_preview = scrub_content(&summary.preview);
if safe_preview.is_empty() {
format!("{status} — {dur}ms", dur = summary.duration_ms)
} else {
format!(
"{status} — {dur}ms\n{preview}",
dur = summary.duration_ms,
preview = safe_preview,
)
}
}
/// Makes `s` safe to embed inside a double-quoted AppleScript string literal.
///
/// Processing, in effect:
/// 1. control characters and U+2028 / U+2029 become a single space;
/// 2. only the first `max` characters are kept, and `…` is appended when
///    anything was cut (so the result can be `max + 1` characters long);
/// 3. backslashes and double quotes are removed from what remains.
///
/// Stripping happens after the cut, so removed characters still count
/// towards `max`.
#[must_use]
pub fn sanitize_applescript_payload(s: &str, max: usize) -> String {
    let mut out = String::new();
    let mut seen = 0usize;
    let mut cut = false;

    for ch in s.chars() {
        if seen == max {
            // At least one character lies beyond the budget.
            cut = true;
            break;
        }
        seen += 1;

        let is_line_break = matches!(ch, '\u{2028}' | '\u{2029}');
        let ch = if ch.is_control() || is_line_break { ' ' } else { ch };
        if ch != '\\' && ch != '"' {
            out.push(ch);
        }
    }

    if cut {
        out.push('…');
    }
    out
}
/// Checks whether `url` may be used as the webhook endpoint.
///
/// Returns `true` for any parseable `https` URL, and for `http` URLs only when
/// `allow_insecure` is set (a warning is logged in that case). Every rejection
/// logs a warning naming the reason and returns `false`.
fn validate_webhook_url(url: &str, allow_insecure: bool) -> bool {
    let parsed = match url.parse::<reqwest::Url>() {
        Ok(parsed) => parsed,
        Err(e) => {
            warn!(error = %e, "webhook_url is not a valid URL — channel disabled");
            return false;
        }
    };

    match parsed.scheme() {
        "https" => true,
        "http" if allow_insecure => {
            warn!(
                "webhook_url uses insecure HTTP scheme; set webhook_allow_insecure=false for production"
            );
            true
        }
        // Plain HTTP without the opt-in gets its own message: reporting it as
        // a "non-HTTP(S) scheme" would point the operator at the wrong problem.
        "http" => {
            warn!(
                "webhook_url uses insecure HTTP scheme and webhook_allow_insecure is false — channel disabled"
            );
            false
        }
        other => {
            warn!(
                scheme = other,
                "webhook_url has non-HTTP(S) scheme — channel disabled"
            );
            false
        }
    }
}
/// Shows a macOS notification by piping an AppleScript `display notification`
/// command to `osascript` over stdin.
///
/// `title` and `message` are passed through `sanitize_applescript_payload`
/// (120 and 240 characters respectively) before being embedded in the script.
///
/// # Errors
///
/// Returns an error when `osascript` cannot be spawned, when writing the
/// script to its stdin fails, when it does not exit within 5 seconds, or when
/// it exits with a non-success status.
#[cfg(target_os = "macos")]
async fn fire_macos_native(
    title: &str,
    message: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use tokio::io::AsyncWriteExt as _;
    use tokio::process::Command;

    let safe_title = sanitize_applescript_payload(title, 120);
    let safe_message = sanitize_applescript_payload(message, 240);
    let script = format!(r#"display notification "{safe_message}" with title "{safe_title}""#);

    let mut child = Command::new("osascript")
        .stdin(std::process::Stdio::piped())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        // Ensures the process is killed if we bail out early (write error or
        // timeout) instead of leaving it running in the background.
        .kill_on_drop(true)
        .spawn()?;

    if let Some(mut stdin) = child.stdin.take() {
        stdin.write_all(script.as_bytes()).await?;
        // Closing stdin signals end-of-script to osascript.
        stdin.shutdown().await?;
    }

    // Surface both a timeout and a failing exit status so callers (notably
    // `fire_test`) do not report success for a notification that never showed.
    let status = tokio::time::timeout(Duration::from_secs(5), child.wait())
        .await
        .map_err(|_| "osascript timed out after 5s")??;
    if !status.success() {
        return Err(format!("osascript exited with {status}").into());
    }
    Ok(())
}
/// JSON body that `fire_webhook` posts to the webhook URL. Fields are
/// serialized under their Rust names.
// NOTE(review): the name suggests the ntfy JSON publish format — confirm against the ntfy docs.
#[derive(Serialize)]
struct NtfyWebhookBody<'a> {
// Topic taken from `webhook_topic` in the configuration.
topic: &'a str,
// Notification title taken from the configuration.
title: &'a str,
// Text produced by `build_notification_message`.
message: &'a str,
// `["zeph", "error"]` for failed turns, `["zeph", "turn-complete"]` otherwise.
tags: Vec<&'a str>,
// 4 for failed turns, 3 otherwise.
priority: u8,
}
async fn fire_webhook(
client: &reqwest::Client,
url: &str,
title: &str,
topic: &str,
summary: &TurnSummary,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let message = build_notification_message(summary);
let (tags, priority) = if summary.exit_status == TurnExitStatus::Error {
(vec!["zeph", "error"], 4u8)
} else {
(vec!["zeph", "turn-complete"], 3u8)
};
let body = NtfyWebhookBody {
topic,
title,
message: &message,
tags,
priority,
};
tokio::time::timeout(Duration::from_secs(5), client.post(url).json(&body).send())
.await??
.error_for_status()?;
Ok(())
}
// Unit tests for the notification gate (`should_fire`), the AppleScript
// sanitizer, and the notification message builder.
#[cfg(test)]
mod tests {
use super::*;
use zeph_config::NotificationsConfig;
// Builds a `Notifier` from `cfg` via the public constructor.
fn make_notifier(cfg: NotificationsConfig) -> Notifier {
Notifier::new(cfg)
}
// Successful turn with the given duration and LLM request count.
fn success_summary(duration_ms: u64, llm_requests: u32) -> TurnSummary {
TurnSummary {
duration_ms,
preview: "All done.".to_owned(),
tool_calls: 0,
llm_requests,
exit_status: TurnExitStatus::Success,
}
}
// Failed turn with the given duration and LLM request count.
fn error_summary(duration_ms: u64, llm_requests: u32) -> TurnSummary {
TurnSummary {
duration_ms,
preview: "Error occurred.".to_owned(),
tool_calls: 0,
llm_requests,
exit_status: TurnExitStatus::Error,
}
}
// With the master switch off nothing fires, even for a long successful turn.
#[test]
fn should_fire_disabled_master_switch() {
let n = make_notifier(NotificationsConfig {
enabled: false,
..Default::default()
});
assert!(!n.should_fire(&success_summary(5000, 1)));
}
// A successful turn that made no LLM requests is never notified.
#[test]
fn should_fire_zero_llm_success_skipped() {
let n = make_notifier(NotificationsConfig {
enabled: true,
..Default::default()
});
assert!(!n.should_fire(&success_summary(0, 0)));
}
// The zero-LLM-request skip applies to successes only; errors still fire.
#[test]
fn should_fire_zero_llm_error_fires() {
let n = make_notifier(NotificationsConfig {
enabled: true,
..Default::default()
});
assert!(n.should_fire(&error_summary(0, 0)));
}
// `only_on_error` suppresses notifications for successful turns.
#[test]
fn should_fire_only_on_error_skips_success() {
let n = make_notifier(NotificationsConfig {
enabled: true,
only_on_error: true,
..Default::default()
});
assert!(!n.should_fire(&success_summary(5000, 1)));
}
// `only_on_error` still lets failed turns through.
#[test]
fn should_fire_only_on_error_fires_on_error() {
let n = make_notifier(NotificationsConfig {
enabled: true,
only_on_error: true,
..Default::default()
});
assert!(n.should_fire(&error_summary(100, 1)));
}
// A success one millisecond under the duration threshold does not fire.
#[test]
fn should_fire_duration_gate_success_below_threshold() {
let n = make_notifier(NotificationsConfig {
enabled: true,
min_turn_duration_ms: 3000,
..Default::default()
});
assert!(!n.should_fire(&success_summary(2999, 1)));
}
// The duration threshold is inclusive.
#[test]
fn should_fire_duration_gate_success_at_threshold() {
let n = make_notifier(NotificationsConfig {
enabled: true,
min_turn_duration_ms: 3000,
..Default::default()
});
assert!(n.should_fire(&success_summary(3000, 1)));
}
// Failed turns ignore the duration threshold.
#[test]
fn should_fire_error_bypasses_duration_gate() {
let n = make_notifier(NotificationsConfig {
enabled: true,
min_turn_duration_ms: 3000,
..Default::default()
});
assert!(n.should_fire(&error_summary(100, 1)));
}
// Newlines are control characters and become spaces.
#[test]
fn sanitize_control_chars_replaced_with_space() {
let result = sanitize_applescript_payload("Hello\nWorld", 200);
assert!(!result.contains('\n'));
assert!(result.contains("Hello World"));
}
// Double quotes are removed outright, not escaped.
#[test]
fn sanitize_quotes_stripped() {
let result = sanitize_applescript_payload(r#"say "hi""#, 200);
assert!(!result.contains('"'));
assert_eq!(result, "say hi");
}
// Backslashes are removed outright, not escaped.
#[test]
fn sanitize_backslash_stripped() {
let result = sanitize_applescript_payload(r"C:\Users\foo", 200);
assert_eq!(result, "C:Usersfoo");
}
// Over-long input is cut to `max` characters plus a trailing ellipsis.
#[test]
fn sanitize_truncation_appends_ellipsis() {
let long = "a".repeat(300);
let result = sanitize_applescript_payload(&long, 200);
assert!(result.ends_with('…'));
assert_eq!(result.chars().count(), 201);
}
// Input within the limit is returned unchanged.
#[test]
fn sanitize_no_truncation_when_short() {
let result = sanitize_applescript_payload("short", 200);
assert_eq!(result, "short");
}
// A payload trying to close the string literal ends up with no quotes left.
#[test]
fn sanitize_injection_attempt() {
let payload = r#""; display dialog "gotcha"; ""#;
let result = sanitize_applescript_payload(payload, 200);
assert!(!result.contains('"'));
}
// Empty input yields empty output.
#[test]
fn sanitize_applescript_payload_empty() {
assert_eq!(sanitize_applescript_payload("", 200), "");
}
// Tabs are control characters and become spaces.
#[test]
fn sanitize_tab_replaced() {
let result = sanitize_applescript_payload("a\tb", 200);
assert_eq!(result, "a b");
}
// U+2028 / U+2029 are not `is_control()` but are replaced explicitly.
#[test]
fn sanitize_line_separators() {
let s = "hello\u{2028}world\u{2029}end";
let result = sanitize_applescript_payload(s, 200);
assert!(!result.contains('\u{2028}'));
assert!(!result.contains('\u{2029}'));
assert_eq!(result, "hello world end");
}
// Successful turns render as "Done" with the duration in milliseconds.
#[test]
fn notification_message_success() {
let summary = success_summary(1234, 1);
let msg = build_notification_message(&summary);
assert!(msg.starts_with("Done"));
assert!(msg.contains("1234ms"));
}
// Failed turns render with an "Error" prefix.
#[test]
fn notification_message_error() {
let summary = error_summary(500, 1);
let msg = build_notification_message(&summary);
assert!(msg.starts_with("Error"));
}
// The preview goes through `scrub_content`, so key-like tokens are replaced
// by the redaction marker.
#[test]
fn notification_message_redacts_secrets() {
let summary = TurnSummary {
duration_ms: 100,
preview: "Done. Key: sk-abc123xyz".to_owned(),
tool_calls: 0,
llm_requests: 1,
exit_status: TurnExitStatus::Success,
};
let msg = build_notification_message(&summary);
assert!(!msg.contains("sk-abc123xyz"), "secret must be redacted");
assert!(
msg.contains("[REDACTED]"),
"should contain redaction marker"
);
}
}