use std::time::Duration;
use crate::Notification;
use super::TriggerError;
use super::http_method::HttpMethod;
use super::template::{
CompiledTemplate, TemplateError, TemplateErrorKind, compile, template_error_to_trigger_error,
};
mod builders;
pub use builders::DEFAULT_WEBHOOK_TIMEOUT;
#[cfg(test)]
mod tests;
const RING_CAP: usize = 4096;
const DEFAULT_CONTENT_TYPE: &str = "application/json";
#[derive(Clone)]
pub(super) struct WebhookConfig {
pub(super) url_template: Result<CompiledTemplate, TemplateError>,
pub(super) method: HttpMethod,
pub(super) headers: Vec<(String, Result<CompiledTemplate, TemplateError>)>,
pub(super) body_template: Option<Result<CompiledTemplate, TemplateError>>,
}
impl std::fmt::Debug for WebhookConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let WebhookConfig {
url_template,
method,
headers,
body_template,
} = self;
let url_state: &dyn std::fmt::Display = match url_template {
Ok(_) => &"<compiled-url-template-redacted>",
Err(_) => &"<bad-url-template-redacted>",
};
let body_state: &dyn std::fmt::Display = match body_template {
None => &"<default-notification-json>",
Some(Ok(_)) => &"<compiled-body-template-redacted>",
Some(Err(_)) => &"<bad-body-template-redacted>",
};
f.debug_struct("WebhookConfig")
.field("url_template", &format_args!("{url_state}"))
.field("method", method)
.field("header_count", &headers.len())
.field("body_template", &format_args!("{body_state}"))
.finish()
}
}
#[derive(Debug)]
pub(super) struct RenderedWebhook {
pub url: String,
pub headers: Vec<(String, String)>,
pub body: String,
}
fn render_webhook_parts(
cfg: &WebhookConfig,
notification: &Notification,
) -> Result<RenderedWebhook, TriggerError> {
render_webhook_parts_with_env(cfg, notification, |name| match std::env::var(name) {
Ok(value) => Ok(value),
Err(std::env::VarError::NotPresent) => Err(TemplateErrorKind::EnvNotSet),
Err(std::env::VarError::NotUnicode(_)) => Err(TemplateErrorKind::EnvNotUnicode),
})
}
pub(super) fn render_webhook_parts_with_env<F>(
cfg: &WebhookConfig,
notification: &Notification,
env_resolver: F,
) -> Result<RenderedWebhook, TriggerError>
where
F: Fn(&str) -> Result<String, TemplateErrorKind>,
{
let url_template = cfg
.url_template
.as_ref()
.map_err(|e| template_error_to_trigger_error(e.clone(), "webhook url"))?;
let url = url_template
.render_with_env(notification, &env_resolver)
.map_err(|e| template_error_to_trigger_error(e, "webhook url"))?;
let mut headers: Vec<(String, String)> = Vec::with_capacity(cfg.headers.len());
for (name, value_template) in &cfg.headers {
let tmpl = value_template
.as_ref()
.map_err(|e| template_error_to_trigger_error(e.clone(), "webhook header"))?;
let value = tmpl
.render_with_env(notification, &env_resolver)
.map_err(|e| template_error_to_trigger_error(e, "webhook header"))?;
headers.push((name.clone(), value));
}
let body = match &cfg.body_template {
Some(template_result) => {
let tmpl = template_result
.as_ref()
.map_err(|e| template_error_to_trigger_error(e.clone(), "webhook body"))?;
tmpl.render_with_env(notification, &env_resolver)
.map_err(|e| template_error_to_trigger_error(e, "webhook body"))?
}
None => serde_json::to_string(notification).map_err(TriggerError::Encode)?,
};
Ok(RenderedWebhook { url, headers, body })
}
pub(super) async fn dispatch_webhook(
cfg: &WebhookConfig,
http: &reqwest::Client,
timeout: Option<Duration>,
notification: &Notification,
) -> Result<(), TriggerError> {
let RenderedWebhook { url, headers, body } = render_webhook_parts(cfg, notification)?;
let mut request = http.request(cfg.method.to_reqwest(), &url);
let user_set_content_type = headers
.iter()
.any(|(k, _)| k.eq_ignore_ascii_case("content-type"));
for (name, value) in headers {
request = request.header(name, value);
}
if !user_set_content_type {
request = request.header(reqwest::header::CONTENT_TYPE, DEFAULT_CONTENT_TYPE);
}
request = request.body(body);
if let Some(t) = timeout {
request = request.timeout(t);
}
let response = match request.send().await {
Ok(r) => r,
Err(e) => {
let is_timeout = e.is_timeout();
let is_builder = e.is_builder();
let is_connect = e.is_connect();
tracing::debug!(
event.name = "client.trigger.webhook.send_failed",
is_timeout,
is_builder,
is_connect,
"webhook request send failed"
);
if is_timeout {
if let Some(t) = timeout {
return Err(TriggerError::Timeout(t));
}
}
if is_builder {
return Err(TriggerError::WebhookBuild {
reason: "request build failed (invalid URL or header value)".to_string(),
});
}
return Err(TriggerError::Webhook {
status: None,
body_tail: String::new(),
});
}
};
let drain = capture_body_tail(response).await;
match drain {
BodyDrain::Complete { status, body_tail }
| BodyDrain::PartialNoTimeout { status, body_tail } => {
if status.is_success() {
Ok(())
} else {
Err(TriggerError::Webhook {
status: Some(status),
body_tail,
})
}
}
BodyDrain::Timeout => {
if let Some(t) = timeout {
Err(TriggerError::Timeout(t))
} else {
Err(TriggerError::Webhook {
status: None,
body_tail: String::new(),
})
}
}
}
}
enum BodyDrain {
Complete {
status: reqwest::StatusCode,
body_tail: String,
},
PartialNoTimeout {
status: reqwest::StatusCode,
body_tail: String,
},
Timeout,
}
async fn capture_body_tail(mut response: reqwest::Response) -> BodyDrain {
let status = response.status();
let mut ring: Vec<u8> = Vec::with_capacity(RING_CAP);
loop {
match response.chunk().await {
Ok(Some(bytes)) => {
let slice = bytes.as_ref();
let n = slice.len();
if ring.len() + n > RING_CAP {
if n >= RING_CAP {
ring.clear();
ring.extend_from_slice(&slice[n - RING_CAP..]);
continue;
}
let overflow = ring.len() + n - RING_CAP;
ring.drain(..overflow);
}
ring.extend_from_slice(slice);
}
Ok(None) => {
return BodyDrain::Complete {
status,
body_tail: String::from_utf8_lossy(&ring).into_owned(),
};
}
Err(e) => {
let timed_out = e.is_timeout();
tracing::debug!(
event.name = "client.trigger.webhook.body_read_error",
timed_out,
"response body read error during chunked drain"
);
if timed_out {
return BodyDrain::Timeout;
}
return BodyDrain::PartialNoTimeout {
status,
body_tail: String::from_utf8_lossy(&ring).into_owned(),
};
}
}
}
}
pub(super) fn build_webhook_config(url: impl Into<String>) -> WebhookConfig {
let url_str = url.into();
let compiled = compile(&url_str);
WebhookConfig {
url_template: compiled,
method: HttpMethod::Post,
headers: Vec::new(),
body_template: None,
}
}
pub(super) fn webhook_add_header(
cfg: &mut WebhookConfig,
name: impl Into<String>,
value: impl Into<String>,
) {
let value_str = value.into();
let compiled = compile(&value_str);
cfg.headers.push((name.into(), compiled));
}
pub(super) fn webhook_set_body_template(cfg: &mut WebhookConfig, body: impl Into<String>) {
let body_str = body.into();
let compiled = compile(&body_str);
cfg.body_template = Some(compiled);
}
pub(super) fn webhook_set_method(cfg: &mut WebhookConfig, method: HttpMethod) {
cfg.method = method;
}