Skip to main content

faucet_core/
dlq.rs

1//! Dead-letter queue (DLQ) wiring shared by the pipeline runner.
2//!
3//! The types defined here are config-shaped: they describe *what* the
4//! pipeline should do with row-level failures, not *how* the routing is
5//! executed. The execution lives in [`run_stream`](crate::run_stream).
6//!
7//! See `docs/superpowers/specs/2026-05-24-dlq-design.md`.
8
9use crate::FaucetError;
10use crate::traits::Sink;
11use schemars::JsonSchema;
12use serde::{Deserialize, Serialize};
13use serde_json::{Value, json};
14use std::fmt;
15use std::sync::Arc;
16use std::time::{SystemTime, UNIX_EPOCH};
17
18/// Policy applied when a sink reports an outer failure (the whole batch
19/// failed, no per-row info).
20#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
21#[serde(rename_all = "snake_case")]
22pub enum OnBatchError {
23    /// Surface the underlying [`FaucetError`] and fail the pipeline (default).
24    #[default]
25    Propagate,
26    /// Treat every row in the failed page as a DLQ candidate. Unsafe with
27    /// best-effort APIs that haven't overridden
28    /// [`Sink::write_batch_partial`] — already-committed rows would land in
29    /// the DLQ as duplicates. Use with atomic sinks (single-statement
30    /// INSERT, file writes) where the failure mode is "nothing landed".
31    DlqAll,
32}
33
34/// Pipeline-level DLQ wiring.
35#[derive(Clone)]
36pub struct DlqConfig {
37    /// Sink that receives DLQ envelopes.
38    pub sink: Arc<dyn Sink>,
39    /// What to do when the main sink fails wholesale.
40    pub on_batch_error: OnBatchError,
41    /// Per-page failure budget. `None` = unlimited.
42    ///
43    /// This budget is **shared across both sink-side row failures and
44    /// quality-check quarantines**: a record routed to the DLQ by a
45    /// `quarantine` quality check counts against it just as a sink-side
46    /// row failure does.
47    pub max_failures_per_page: Option<usize>,
48    /// Cumulative failure budget across the run. `None` = unlimited.
49    ///
50    /// This budget is **shared across both sink-side row failures and
51    /// quality-check quarantines**: records quarantined by the quality pass
52    /// accumulate in this counter alongside sink-side failures.
53    pub max_failures_total: Option<usize>,
54    /// Always `true` in v1. Reserved for a future "headers-only" mode.
55    pub include_original_payload: bool,
56}
57
58impl DlqConfig {
59    /// Convenience constructor: `propagate` policy, no budgets, payload
60    /// included.
61    pub fn new(sink: Arc<dyn Sink>) -> Self {
62        Self {
63            sink,
64            on_batch_error: OnBatchError::Propagate,
65            max_failures_per_page: None,
66            max_failures_total: None,
67            include_original_payload: true,
68        }
69    }
70}
71
72impl fmt::Debug for DlqConfig {
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        f.debug_struct("DlqConfig")
75            .field("sink", &self.sink.connector_name())
76            .field("on_batch_error", &self.on_batch_error)
77            .field("max_failures_per_page", &self.max_failures_per_page)
78            .field("max_failures_total", &self.max_failures_total)
79            .field("include_original_payload", &self.include_original_payload)
80            .finish()
81    }
82}
83
/// Per-run DLQ counters, reported next to
/// [`PipelineResult`](crate::PipelineResult) whenever a DLQ is configured.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DlqStats {
    /// Number of rows sent to the DLQ over the whole run.
    pub records_dlq: usize,
    /// Number of pages that sent one or more rows to the DLQ.
    pub pages_with_failures: usize,
}
93
/// Why a page emitted DLQ traffic.
///
/// Surfaced as a metric label and as a span attribute. The set of variants
/// is closed, so the label's cardinality stays bounded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DlqReason {
    /// An overriding [`Sink::write_batch_partial`] reported at least one
    /// per-row `Err` outcome.
    Partial,
    /// The entire batch failed while [`OnBatchError::DlqAll`] was the
    /// configured policy.
    DlqAll,
    /// A data-quality check quarantined the record, either individually or
    /// as part of a batch quarantine.
    Quality,
}

impl DlqReason {
    /// Stable Prometheus label value for this reason.
    ///
    /// The result is always one of `"partial"`, `"dlq_all"` or `"quality"`.
    /// These values are documented as stable, so keep them fixed.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Partial => "partial",
            Self::DlqAll => "dlq_all",
            Self::Quality => "quality",
        }
    }
}
119
120/// Build a single DLQ envelope.
121///
122/// The schema is fixed; see the design spec for the rationale. `payload`
123/// is included verbatim — no truncation, no transformation.
124pub fn build_envelope(
125    payload: &Value,
126    error: &FaucetError,
127    sink_name: &str,
128    pipeline_name: &str,
129    row: &str,
130    record_index: usize,
131) -> Value {
132    let kind = crate::observability::decorator::error_kind(error);
133    let message = error.to_string();
134    // `as_millis()` returns u128. Convert via TryFrom so we saturate at
135    // i64::MAX instead of silently wrapping to a negative number. The
136    // saturation ceiling (year ~292,000,000) is impossible in practice,
137    // so this only ever fires on a corrupt clock. `unwrap_or(0)` covers
138    // the (also impossible on modern systems) clock-before-epoch case.
139    let ts_ms = SystemTime::now()
140        .duration_since(UNIX_EPOCH)
141        .map(|d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX))
142        .unwrap_or(0);
143    json!({
144        "error": { "kind": kind, "message": message },
145        "payload": payload,
146        "ts_ms": ts_ms,
147        "sink": sink_name,
148        "pipeline": pipeline_name,
149        "row": row,
150        "record_index": record_index,
151    })
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the generic sink-side error that the envelope tests attach.
    fn sink_err(msg: &'static str) -> FaucetError {
        FaucetError::Sink(msg.into())
    }

    #[test]
    fn envelope_has_all_required_fields() {
        let payload = json!({"user_id": 7, "name": "Alice"});
        let env = build_envelope(
            &payload,
            &sink_err("row rejected: bad timestamp"),
            "bigquery",
            "users_etl",
            "us",
            3,
        );

        assert_eq!(env["error"]["kind"], "Sink");
        let message = env["error"]["message"]
            .as_str()
            .expect("error.message should be a string");
        assert!(message.contains("row rejected"));
        assert_eq!(env["payload"], payload);
        let ts = env["ts_ms"].as_i64().expect("ts_ms should be an i64");
        assert!(ts > 0);
        for (key, want) in [("sink", "bigquery"), ("pipeline", "users_etl"), ("row", "us")] {
            assert_eq!(env[key], want, "field `{key}`");
        }
        assert_eq!(env["record_index"], 3);
    }

    #[test]
    fn envelope_preserves_payload_byte_for_byte() {
        let original = json!({
            "nested": { "a": [1, 2, 3], "b": null, "c": true },
            "unicode": "café — résumé"
        });
        let env = build_envelope(&original, &sink_err("x"), "s", "p", "", 0);
        assert_eq!(env["payload"], original);
    }

    #[test]
    fn envelope_empty_row_serializes_as_empty_string() {
        let env = build_envelope(&json!({}), &sink_err("x"), "s", "", "", 0);
        for key in ["row", "pipeline"] {
            assert_eq!(env[key], "", "field `{key}`");
        }
    }

    #[test]
    fn on_batch_error_defaults_to_propagate() {
        let policy: OnBatchError = Default::default();
        assert_eq!(policy, OnBatchError::Propagate);
    }

    #[test]
    fn on_batch_error_serializes_snake_case() {
        let cases = [
            (OnBatchError::Propagate, "\"propagate\""),
            (OnBatchError::DlqAll, "\"dlq_all\""),
        ];
        for (policy, expected) in cases {
            assert_eq!(serde_json::to_string(&policy).unwrap(), expected);
        }
    }

    #[test]
    fn on_batch_error_deserializes_snake_case() {
        let cases = [
            ("\"propagate\"", OnBatchError::Propagate),
            ("\"dlq_all\"", OnBatchError::DlqAll),
        ];
        for (text, expected) in cases {
            let parsed: OnBatchError = serde_json::from_str(text).unwrap();
            assert_eq!(parsed, expected);
        }
    }

    #[test]
    fn dlq_reason_strings() {
        let labels = [DlqReason::Partial, DlqReason::DlqAll].map(DlqReason::as_str);
        assert_eq!(labels, ["partial", "dlq_all"]);
    }

    #[test]
    fn dlq_reason_quality_string() {
        let label = DlqReason::Quality.as_str();
        assert_eq!(label, "quality");
    }
}