1use crate::FaucetError;
10use crate::traits::Sink;
11use schemars::JsonSchema;
12use serde::{Deserialize, Serialize};
13use serde_json::{Value, json};
14use std::fmt;
15use std::sync::Arc;
16use std::time::{SystemTime, UNIX_EPOCH};
17
18#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
21#[serde(rename_all = "snake_case")]
22pub enum OnBatchError {
23 #[default]
25 Propagate,
26 DlqAll,
32}
33
34#[derive(Clone)]
36pub struct DlqConfig {
37 pub sink: Arc<dyn Sink>,
39 pub on_batch_error: OnBatchError,
41 pub max_failures_per_page: Option<usize>,
48 pub max_failures_total: Option<usize>,
54 pub include_original_payload: bool,
56}
57
58impl DlqConfig {
59 pub fn new(sink: Arc<dyn Sink>) -> Self {
62 Self {
63 sink,
64 on_batch_error: OnBatchError::Propagate,
65 max_failures_per_page: None,
66 max_failures_total: None,
67 include_original_payload: true,
68 }
69 }
70}
71
72impl fmt::Debug for DlqConfig {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 f.debug_struct("DlqConfig")
75 .field("sink", &self.sink.connector_name())
76 .field("on_batch_error", &self.on_batch_error)
77 .field("max_failures_per_page", &self.max_failures_per_page)
78 .field("max_failures_total", &self.max_failures_total)
79 .field("include_original_payload", &self.include_original_payload)
80 .finish()
81 }
82}
83
/// Pair of DLQ counters; `Default` yields both counters at zero.
#[derive(Clone, Debug, Default)]
#[derive(PartialEq, Eq)]
pub struct DlqStats {
    /// NOTE(review): presumably the number of records sent to the DLQ —
    /// confirm where this counter is incremented.
    pub records_dlq: usize,
    /// NOTE(review): presumably the number of pages that had at least one
    /// failed record — confirm where this counter is incremented.
    pub pages_with_failures: usize,
}
93
/// Reason tag attached to a dead-lettered record.
///
/// Each variant maps to a fixed lowercase label via [`DlqReason::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DlqReason {
    /// Labelled `"partial"`.
    ///
    /// NOTE(review): presumably only some records of a batch failed — confirm.
    Partial,
    /// Labelled `"dlq_all"`.
    ///
    /// NOTE(review): presumably the whole batch was dead-lettered under
    /// `OnBatchError::DlqAll` — confirm.
    DlqAll,
    /// Labelled `"quality"`.
    ///
    /// NOTE(review): presumably the record failed a data-quality check —
    /// confirm.
    Quality,
}

impl DlqReason {
    /// Returns the fixed label for this reason: `"partial"`, `"dlq_all"` or
    /// `"quality"`.
    ///
    /// The function is `const`, so the label can also be used to initialise
    /// `const`/`static` items. The match is exhaustive on purpose: adding a
    /// variant without a label fails to compile.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Partial => "partial",
            Self::DlqAll => "dlq_all",
            Self::Quality => "quality",
        }
    }
}
119
120pub fn build_envelope(
125 payload: &Value,
126 error: &FaucetError,
127 sink_name: &str,
128 pipeline_name: &str,
129 row: &str,
130 record_index: usize,
131) -> Value {
132 let kind = crate::observability::decorator::error_kind(error);
133 let message = error.to_string();
134 let ts_ms = SystemTime::now()
140 .duration_since(UNIX_EPOCH)
141 .map(|d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX))
142 .unwrap_or(0);
143 json!({
144 "error": { "kind": kind, "message": message },
145 "payload": payload,
146 "ts_ms": ts_ms,
147 "sink": sink_name,
148 "pipeline": pipeline_name,
149 "row": row,
150 "record_index": record_index,
151 })
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// Every documented envelope key is present and carries the given value.
    #[test]
    fn envelope_has_all_required_fields() {
        let record = json!({"user_id": 7, "name": "Alice"});
        let failure = FaucetError::Sink("row rejected: bad timestamp".into());
        let envelope = build_envelope(&record, &failure, "bigquery", "users_etl", "us", 3);

        let error = &envelope["error"];
        assert_eq!(error["kind"], "Sink");
        let message = error["message"].as_str().unwrap();
        assert!(message.contains("row rejected"));

        assert_eq!(envelope["payload"], record);

        let ts_ms = envelope["ts_ms"].as_i64().unwrap();
        assert!(ts_ms > 0);

        let string_fields = [
            ("sink", "bigquery"),
            ("pipeline", "users_etl"),
            ("row", "us"),
        ];
        for (key, expected) in string_fields {
            assert_eq!(envelope[key], expected);
        }
        assert_eq!(envelope["record_index"], 3);
    }

    /// Nested values, nulls and non-ASCII text survive the round trip into
    /// the envelope unchanged (compared as JSON values).
    #[test]
    fn envelope_preserves_payload_byte_for_byte() {
        let record = json!({
            "nested": { "a": [1, 2, 3], "b": null, "c": true },
            "unicode": "café — résumé"
        });
        let failure = FaucetError::Sink("x".into());
        let envelope = build_envelope(&record, &failure, "s", "p", "", 0);
        assert_eq!(envelope["payload"], record);
    }

    /// Empty `row` / `pipeline` arguments come out as empty JSON strings.
    #[test]
    fn envelope_empty_row_serializes_as_empty_string() {
        let failure = FaucetError::Sink("x".into());
        let envelope = build_envelope(&json!({}), &failure, "s", "", "", 0);
        for key in ["row", "pipeline"] {
            assert_eq!(envelope[key], "");
        }
    }

    #[test]
    fn on_batch_error_defaults_to_propagate() {
        let fallback = OnBatchError::default();
        assert_eq!(fallback, OnBatchError::Propagate);
    }

    #[test]
    fn on_batch_error_serializes_snake_case() {
        let cases = [
            (OnBatchError::Propagate, "\"propagate\""),
            (OnBatchError::DlqAll, "\"dlq_all\""),
        ];
        for (variant, wire) in cases {
            let encoded = serde_json::to_string(&variant).unwrap();
            assert_eq!(encoded, wire);
        }
    }

    #[test]
    fn on_batch_error_deserializes_snake_case() {
        let cases = [
            ("\"propagate\"", OnBatchError::Propagate),
            ("\"dlq_all\"", OnBatchError::DlqAll),
        ];
        for (wire, expected) in cases {
            let decoded: OnBatchError = serde_json::from_str(wire).unwrap();
            assert_eq!(decoded, expected);
        }
    }

    #[test]
    fn dlq_reason_strings() {
        let cases = [
            (DlqReason::Partial, "partial"),
            (DlqReason::DlqAll, "dlq_all"),
        ];
        for (reason, label) in cases {
            assert_eq!(reason.as_str(), label);
        }
    }

    #[test]
    fn dlq_reason_quality_string() {
        let label = DlqReason::Quality.as_str();
        assert_eq!(label, "quality");
    }
}