use crate::FaucetError;
use crate::traits::Sink;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::fmt;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum OnBatchError {
#[default]
Propagate,
DlqAll,
}
#[derive(Clone)]
pub struct DlqConfig {
pub sink: Arc<dyn Sink>,
pub on_batch_error: OnBatchError,
pub max_failures_per_page: Option<usize>,
pub max_failures_total: Option<usize>,
pub include_original_payload: bool,
}
impl DlqConfig {
pub fn new(sink: Arc<dyn Sink>) -> Self {
Self {
sink,
on_batch_error: OnBatchError::Propagate,
max_failures_per_page: None,
max_failures_total: None,
include_original_payload: true,
}
}
}
impl fmt::Debug for DlqConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DlqConfig")
.field("sink", &self.sink.connector_name())
.field("on_batch_error", &self.on_batch_error)
.field("max_failures_per_page", &self.max_failures_per_page)
.field("max_failures_total", &self.max_failures_total)
.field("include_original_payload", &self.include_original_payload)
.finish()
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DlqStats {
pub records_dlq: usize,
pub pages_with_failures: usize,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DlqReason {
Partial,
DlqAll,
Quality,
}
impl DlqReason {
pub fn as_str(self) -> &'static str {
match self {
DlqReason::Partial => "partial",
DlqReason::DlqAll => "dlq_all",
DlqReason::Quality => "quality",
}
}
}
pub fn build_envelope(
payload: &Value,
error: &FaucetError,
sink_name: &str,
pipeline_name: &str,
row: &str,
record_index: usize,
) -> Value {
let kind = crate::observability::decorator::error_kind(error);
let message = error.to_string();
let ts_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX))
.unwrap_or(0);
json!({
"error": { "kind": kind, "message": message },
"payload": payload,
"ts_ms": ts_ms,
"sink": sink_name,
"pipeline": pipeline_name,
"row": row,
"record_index": record_index,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn envelope_has_all_required_fields() {
let payload = json!({"user_id": 7, "name": "Alice"});
let err = FaucetError::Sink("row rejected: bad timestamp".into());
let env = build_envelope(&payload, &err, "bigquery", "users_etl", "us", 3);
assert_eq!(env["error"]["kind"], "Sink");
assert!(
env["error"]["message"]
.as_str()
.unwrap()
.contains("row rejected")
);
assert_eq!(env["payload"], payload);
assert!(env["ts_ms"].as_i64().unwrap() > 0);
assert_eq!(env["sink"], "bigquery");
assert_eq!(env["pipeline"], "users_etl");
assert_eq!(env["row"], "us");
assert_eq!(env["record_index"], 3);
}
#[test]
fn envelope_preserves_payload_byte_for_byte() {
let payload = json!({
"nested": { "a": [1, 2, 3], "b": null, "c": true },
"unicode": "café — résumé"
});
let env = build_envelope(&payload, &FaucetError::Sink("x".into()), "s", "p", "", 0);
assert_eq!(env["payload"], payload);
}
#[test]
fn envelope_empty_row_serializes_as_empty_string() {
let env = build_envelope(&json!({}), &FaucetError::Sink("x".into()), "s", "", "", 0);
assert_eq!(env["row"], "");
assert_eq!(env["pipeline"], "");
}
#[test]
fn on_batch_error_defaults_to_propagate() {
assert_eq!(OnBatchError::default(), OnBatchError::Propagate);
}
#[test]
fn on_batch_error_serializes_snake_case() {
let prop = serde_json::to_string(&OnBatchError::Propagate).unwrap();
let all = serde_json::to_string(&OnBatchError::DlqAll).unwrap();
assert_eq!(prop, "\"propagate\"");
assert_eq!(all, "\"dlq_all\"");
}
#[test]
fn on_batch_error_deserializes_snake_case() {
let prop: OnBatchError = serde_json::from_str("\"propagate\"").unwrap();
let all: OnBatchError = serde_json::from_str("\"dlq_all\"").unwrap();
assert_eq!(prop, OnBatchError::Propagate);
assert_eq!(all, OnBatchError::DlqAll);
}
#[test]
fn dlq_reason_strings() {
assert_eq!(DlqReason::Partial.as_str(), "partial");
assert_eq!(DlqReason::DlqAll.as_str(), "dlq_all");
}
#[test]
fn dlq_reason_quality_string() {
assert_eq!(DlqReason::Quality.as_str(), "quality");
}
}