pub mod discord;
pub mod smtp;
pub mod telegram;
pub mod webhook;
use std::collections::HashSet;
use std::sync::Arc;
use serde::Serialize;
use tracing::warn;
use crate::config::{NotificationConfig, NotificationTarget};
use crate::format::short_digest;
use crate::rollback::RollbackReason;
use discord::DiscordNotifier;
use smtp::{SmtpNotifier, SmtpParams};
use telegram::TelegramNotifier;
use webhook::WebhookNotifier;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Trigger {
Available,
Succeeded,
Failed,
}
impl Trigger {
pub fn as_str(self) -> &'static str {
match self {
Trigger::Available => "available",
Trigger::Succeeded => "succeeded",
Trigger::Failed => "failed",
}
}
pub fn parse(token: &str) -> Result<Self, String> {
match token.trim().to_ascii_lowercase().as_str() {
"available" => Ok(Trigger::Available),
"succeeded" => Ok(Trigger::Succeeded),
"failed" => Ok(Trigger::Failed),
other => Err(other.to_string()),
}
}
pub fn all() -> HashSet<Trigger> {
[Trigger::Available, Trigger::Succeeded, Trigger::Failed]
.into_iter()
.collect()
}
}
#[derive(Debug, Clone)]
pub enum NotifyEvent {
UpdateAvailable {
container: String,
image: String,
latest_digest: String,
},
UpdateSucceeded {
container: String,
image: String,
new_id: String,
},
UpdateFailed {
container: String,
reason: RollbackReason,
old_image_ref: String,
new_image_ref: String,
restored_from: String,
},
}
impl NotifyEvent {
pub fn trigger(&self) -> Trigger {
match self {
NotifyEvent::UpdateAvailable { .. } => Trigger::Available,
NotifyEvent::UpdateSucceeded { .. } => Trigger::Succeeded,
NotifyEvent::UpdateFailed { .. } => Trigger::Failed,
}
}
pub fn container(&self) -> &str {
match self {
NotifyEvent::UpdateAvailable { container, .. }
| NotifyEvent::UpdateSucceeded { container, .. }
| NotifyEvent::UpdateFailed { container, .. } => container,
}
}
pub fn render(&self) -> RenderedMessage {
let (title, body) = match self {
NotifyEvent::UpdateAvailable {
container,
image,
latest_digest,
} => (
format!("Update available: {container}"),
format!(
"A newer image is available for {image} ({}). \
Not applied — this container is in watch mode.",
short_digest(latest_digest)
),
),
NotifyEvent::UpdateSucceeded {
container,
image,
new_id,
} => (
format!("Updated: {container}"),
format!(
"{container} was updated to {image} and passed its health check \
(new container {}).",
short_digest(new_id)
),
),
NotifyEvent::UpdateFailed {
container,
reason,
old_image_ref,
new_image_ref,
restored_from,
} => (
format!("Update failed: {container}"),
format!(
"Updating {container} from {old_image_ref} to {new_image_ref} failed \
the health gate ({}); rolled back to the previous container ({restored_from}).",
reason_text(*reason)
),
),
};
RenderedMessage {
title,
body,
trigger: self.trigger(),
container: self.container().to_string(),
}
}
}
fn parse_triggers(
name: &str,
triggers: Option<Vec<String>>,
) -> Result<HashSet<Trigger>, NotifyError> {
match triggers {
None => Ok(Trigger::all()),
Some(list) => list
.iter()
.map(|t| {
Trigger::parse(t).map_err(|bad| NotifyError::Config {
name: name.to_string(),
reason: format!(
"unknown trigger `{bad}` (expected available, succeeded, or failed)"
),
})
})
.collect(),
}
}
fn build_target(
name: &str,
target: NotificationTarget,
http: &reqwest::Client,
) -> Result<Target, NotifyError> {
let (raw_triggers, notifier): (Option<Vec<String>>, Box<dyn Notifier>) = match target {
NotificationTarget::Webhook { url, triggers } => (
triggers,
Box::new(WebhookNotifier::new(name, url.expose(), http.clone())),
),
NotificationTarget::Discord {
webhook_url,
triggers,
} => (
triggers,
Box::new(DiscordNotifier::new(
name,
webhook_url.expose(),
http.clone(),
)),
),
NotificationTarget::Telegram {
bot_token,
chat_id,
triggers,
} => (
triggers,
Box::new(TelegramNotifier::new(
name,
bot_token,
chat_id,
http.clone(),
)),
),
NotificationTarget::Smtp {
host,
port,
username,
password,
from,
to,
starttls,
triggers,
} => (
triggers,
Box::new(SmtpNotifier::new(SmtpParams {
name: name.to_string(),
host,
port,
username,
password,
from,
to,
starttls,
})?),
),
};
let triggers = parse_triggers(name, raw_triggers)?;
Ok(Target { triggers, notifier })
}
fn reason_text(reason: RollbackReason) -> &'static str {
match reason {
RollbackReason::HealthTimeout => "health check timed out",
RollbackReason::Crashed => "the new container crashed",
}
}
#[derive(Debug, Clone)]
pub struct RenderedMessage {
pub title: String,
pub body: String,
pub trigger: Trigger,
pub container: String,
}
#[derive(Debug, thiserror::Error)]
pub enum NotifyError {
#[error("notification request failed: {0}")]
Http(#[from] reqwest::Error),
#[error("notification target returned HTTP {0}")]
Status(reqwest::StatusCode),
#[error("smtp send failed: {0}")]
Smtp(String),
#[error("invalid notification config for `{name}`: {reason}")]
Config { name: String, reason: String },
}
async fn post_json<B: Serialize + ?Sized>(
client: &reqwest::Client,
url: &str,
body: &B,
) -> Result<(), NotifyError> {
let resp = client
.post(url)
.json(body)
.send()
.await
.map_err(reqwest::Error::without_url)?;
let status = resp.status();
if status.is_success() {
Ok(())
} else {
Err(NotifyError::Status(status))
}
}
#[async_trait::async_trait]
pub trait Notifier: Send + Sync {
fn name(&self) -> &str;
async fn send(&self, msg: &RenderedMessage) -> Result<(), NotifyError>;
}
struct Target {
triggers: HashSet<Trigger>,
notifier: Box<dyn Notifier>,
}
#[derive(Clone)]
pub struct Dispatcher {
targets: Arc<Vec<Target>>,
}
impl Dispatcher {
pub fn noop() -> Self {
Self {
targets: Arc::new(Vec::new()),
}
}
pub fn from_config(config: NotificationConfig, http: reqwest::Client) -> Self {
let mut targets = Vec::with_capacity(config.targets.len());
for (name, target) in config.targets {
match build_target(&name, target, &http) {
Ok(t) => targets.push(t),
Err(e) => {
warn!(target = %name, error = %e, "skipping invalid notification target")
}
}
}
Self {
targets: Arc::new(targets),
}
}
#[cfg(test)]
fn from_targets(targets: Vec<Target>) -> Self {
Self {
targets: Arc::new(targets),
}
}
pub fn is_empty(&self) -> bool {
self.targets.is_empty()
}
pub async fn dispatch(&self, event: &NotifyEvent) {
if self.targets.is_empty() {
return;
}
let trigger = event.trigger();
let msg = event.render();
for target in self.targets.iter() {
if !target.triggers.contains(&trigger) {
continue;
}
if let Err(e) = target.notifier.send(&msg).await {
warn!(target = %target.notifier.name(), error = %e, "notification failed; continuing");
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
const DIG: &str = "sha256:abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
fn available() -> NotifyEvent {
NotifyEvent::UpdateAvailable {
container: "web".into(),
image: "nginx:latest".into(),
latest_digest: DIG.into(),
}
}
fn succeeded() -> NotifyEvent {
NotifyEvent::UpdateSucceeded {
container: "web".into(),
image: "nginx:latest".into(),
new_id: DIG.into(),
}
}
fn failed() -> NotifyEvent {
NotifyEvent::UpdateFailed {
container: "web".into(),
reason: RollbackReason::HealthTimeout,
old_image_ref: "nginx:1.0".into(),
new_image_ref: "nginx:1.1".into(),
restored_from: "web-old-1700000000".into(),
}
}
struct RecordingNotifier {
name: String,
fail: bool,
seen: Arc<Mutex<Vec<RenderedMessage>>>,
calls: Arc<AtomicUsize>,
}
impl RecordingNotifier {
fn new(name: &str) -> (Self, Arc<Mutex<Vec<RenderedMessage>>>, Arc<AtomicUsize>) {
let seen = Arc::new(Mutex::new(Vec::new()));
let calls = Arc::new(AtomicUsize::new(0));
(
Self {
name: name.into(),
fail: false,
seen: seen.clone(),
calls: calls.clone(),
},
seen,
calls,
)
}
fn failing(name: &str) -> (Self, Arc<AtomicUsize>) {
let calls = Arc::new(AtomicUsize::new(0));
(
Self {
name: name.into(),
fail: true,
seen: Arc::new(Mutex::new(Vec::new())),
calls: calls.clone(),
},
calls,
)
}
}
#[async_trait::async_trait]
impl Notifier for RecordingNotifier {
fn name(&self) -> &str {
&self.name
}
async fn send(&self, msg: &RenderedMessage) -> Result<(), NotifyError> {
self.calls.fetch_add(1, Ordering::SeqCst);
if self.fail {
return Err(NotifyError::Status(
reqwest::StatusCode::INTERNAL_SERVER_ERROR,
));
}
self.seen.lock().unwrap().push(msg.clone());
Ok(())
}
}
fn target(triggers: HashSet<Trigger>, notifier: impl Notifier + 'static) -> Target {
Target {
triggers,
notifier: Box::new(notifier),
}
}
#[test]
fn trigger_parse_roundtrips_and_rejects_junk() {
for t in [Trigger::Available, Trigger::Succeeded, Trigger::Failed] {
assert_eq!(Trigger::parse(t.as_str()), Ok(t));
}
assert_eq!(Trigger::parse(" FAILED "), Ok(Trigger::Failed));
assert_eq!(Trigger::parse("nope"), Err("nope".to_string()));
}
#[test]
fn render_maps_each_event_to_its_trigger_and_wording() {
let a = available().render();
assert_eq!(a.trigger, Trigger::Available);
assert!(a.title.contains("Update available"));
assert!(a.body.contains("watch mode"));
assert!(a.body.contains("sha256:abcdef012345…"));
assert!(!a.body.contains(DIG));
let s = succeeded().render();
assert_eq!(s.trigger, Trigger::Succeeded);
assert!(s.title.contains("Updated"));
let f = failed().render();
assert_eq!(f.trigger, Trigger::Failed);
assert!(f.title.contains("Update failed"));
assert!(f.body.contains("health check timed out"));
assert!(f.body.contains("web-old-1700000000"));
}
#[tokio::test]
async fn empty_dispatcher_is_a_noop() {
Dispatcher::noop().dispatch(&succeeded()).await; assert!(Dispatcher::noop().is_empty());
}
#[tokio::test]
async fn only_subscribed_targets_receive_an_event() {
let (failures_only, seen_f, calls_f) = RecordingNotifier::new("failures");
let (all, seen_a, calls_a) = RecordingNotifier::new("all");
let d = Dispatcher::from_targets(vec![
target([Trigger::Failed].into_iter().collect(), failures_only),
target(Trigger::all(), all),
]);
d.dispatch(&succeeded()).await;
assert_eq!(
calls_f.load(Ordering::SeqCst),
0,
"failures-only skips success"
);
assert_eq!(
calls_a.load(Ordering::SeqCst),
1,
"all-subscriber gets success"
);
assert!(seen_f.lock().unwrap().is_empty());
assert_eq!(seen_a.lock().unwrap().len(), 1);
d.dispatch(&failed()).await;
assert_eq!(
calls_f.load(Ordering::SeqCst),
1,
"failures-only gets failure"
);
assert_eq!(
calls_a.load(Ordering::SeqCst),
2,
"all-subscriber also gets failure"
);
}
#[tokio::test]
async fn a_failing_target_does_not_block_a_later_one() {
let (boom, boom_calls) = RecordingNotifier::failing("boom");
let (ok, seen_ok, ok_calls) = RecordingNotifier::new("ok");
let d = Dispatcher::from_targets(vec![
target(Trigger::all(), boom),
target(Trigger::all(), ok),
]);
d.dispatch(&succeeded()).await; assert_eq!(boom_calls.load(Ordering::SeqCst), 1);
assert_eq!(ok_calls.load(Ordering::SeqCst), 1);
assert_eq!(seen_ok.lock().unwrap().len(), 1);
}
#[test]
fn omitted_triggers_subscribe_to_all_three() {
assert_eq!(parse_triggers("x", None).unwrap(), Trigger::all());
}
#[test]
fn empty_triggers_list_subscribes_to_nothing() {
assert!(parse_triggers("x", Some(vec![])).unwrap().is_empty());
}
#[test]
fn from_config_skips_a_target_with_an_unknown_trigger() {
use crate::config::{NotificationConfig, NotificationTarget, Secret};
let mut targets = std::collections::HashMap::new();
targets.insert(
"hook".to_string(),
NotificationTarget::Webhook {
url: Secret::new("https://example.com"),
triggers: Some(vec!["bogus".to_string()]),
},
);
let d = Dispatcher::from_config(NotificationConfig { targets }, crate::http::client());
assert!(d.is_empty());
}
}