Skip to main content

cellos_sink_dlq/
lib.rs

1//! Dead Letter Queue (DLQ) [`EventSink`] wrapper — P3-03.
2//!
3//! Wraps any `Arc<dyn EventSink>` and, on a primary `emit()` failure, persists
4//! the failed [`CloudEventV1`](cellos_core::CloudEventV1) as a JSON line to a
5//! file under an operator-nominated directory. The wrapper is a no-op unless
6//! the `CELLOS_DLQ_DIR` environment variable is set to a non-empty,
7//! existing-and-writable directory at composition time.
8//!
9//! # Wire format
10//!
11//! One file is written per failed event. The file name is:
12//!
13//! ```text
14//! <timestamp-millis>-<event-id-sanitized>.jsonl
15//! ```
16//!
17//! where `<timestamp-millis>` is the Unix epoch milliseconds at the moment of
18//! the failure (monotonically increasing within a single process barring clock
19//! skew) and `<event-id-sanitized>` replaces any character outside
20//! `[A-Za-z0-9._-]` with `_` so attacker-controlled or unusual CloudEvent IDs
21//! cannot escape the DLQ directory or collide with shell globs.
22//!
23//! Each file contains exactly one JSON object on a single line:
24//!
25//! ```json
26//! {"event": <CloudEventV1>, "error": "<primary sink error message>"}
27//! ```
28//!
29//! Using one file per event (rather than a rolling JSONL) keeps writes append-
30//! free and tolerates concurrent emitters without locking, at the cost of one
31//! `inode` per failure. Operators expecting high failure rates should rotate
32//! or compact `${CELLOS_DLQ_DIR}` out of band.
33//!
34//! # Failure semantics
35//!
36//! On primary `emit()` failure, the wrapper:
37//!
38//! 1. Best-effort writes the failed event to the DLQ.
39//! 2. Logs a warning with the event id and primary error (and DLQ-write error
40//!    if the persistence itself failed).
41//! 3. Returns `Ok(())` to the caller — the DLQ has assumed responsibility for
42//!    the event, so callers should treat the emit as "delivered to the
43//!    operator's recovery channel". If the DLQ write *also* fails, the
44//!    original primary error is propagated unchanged so the supervisor still
45//!    sees a failure rather than silently dropping the event.
46//!
47//! # Activation
48//!
49//! ```text
50//! CELLOS_DLQ_DIR=/var/lib/cellos/dlq
51//! ```
52//!
53//! Disabled by default. When unset, empty, or pointing at a path that does
54//! not exist or is not writable, [`DlqSink::from_env`] returns the inner
55//! sink unwrapped (identity).
56
57use std::path::{Path, PathBuf};
58use std::sync::Arc;
59use std::time::{SystemTime, UNIX_EPOCH};
60
61use async_trait::async_trait;
62use cellos_core::ports::EventSink;
63use cellos_core::{CellosError, CloudEventV1};
64use tokio::io::AsyncWriteExt;
65
66/// EventSink wrapper that persists failed CloudEvents to a DLQ directory.
67pub struct DlqSink {
68    inner: Arc<dyn EventSink>,
69    dir: PathBuf,
70}
71
72impl DlqSink {
73    /// Wrap `inner` with a DLQ persisting to `dir`. The caller is responsible
74    /// for ensuring `dir` exists and is writable; [`DlqSink::from_env`]
75    /// performs that pre-flight check before invoking this constructor.
76    pub fn wrap(inner: Arc<dyn EventSink>, dir: impl Into<PathBuf>) -> Arc<dyn EventSink> {
77        Arc::new(Self {
78            inner,
79            dir: dir.into(),
80        })
81    }
82
83    /// Read `CELLOS_DLQ_DIR` from the environment. Returns the inner sink
84    /// unwrapped (identity) when:
85    ///
86    /// - the variable is unset, or
87    /// - the variable is empty / whitespace-only, or
88    /// - the path does not exist, is not a directory, or is not writable.
89    ///
90    /// Otherwise returns `inner` wrapped in a [`DlqSink`].
91    pub fn from_env(inner: Arc<dyn EventSink>) -> Arc<dyn EventSink> {
92        let Ok(raw) = std::env::var("CELLOS_DLQ_DIR") else {
93            return inner;
94        };
95        let trimmed = raw.trim();
96        if trimmed.is_empty() {
97            return inner;
98        }
99        let dir = PathBuf::from(trimmed);
100        match dir_is_writable(&dir) {
101            Ok(()) => {
102                tracing::info!(
103                    target: "cellos.supervisor.observability",
104                    dlq_dir = %dir.display(),
105                    "DLQ enabled — failed events will be persisted"
106                );
107                Self::wrap(inner, dir)
108            }
109            Err(e) => {
110                tracing::warn!(
111                    target: "cellos.supervisor.observability",
112                    dlq_dir = %dir.display(),
113                    error = %e,
114                    "CELLOS_DLQ_DIR set but directory is not usable; DLQ disabled"
115                );
116                inner
117            }
118        }
119    }
120
121    async fn write_to_dlq(
122        &self,
123        event: &CloudEventV1,
124        primary_error: &CellosError,
125    ) -> Result<(), CellosError> {
126        let filename = dlq_filename(&event.id);
127        let path = self.dir.join(filename);
128
129        let envelope = serde_json::json!({
130            "event": event,
131            "error": primary_error.to_string(),
132        });
133        let line = serde_json::to_string(&envelope)
134            .map_err(|e| CellosError::EventSink(format!("dlq serialize: {e}")))?;
135
136        // O_CREATE_NEW + O_WRONLY (no O_TRUNC) — one file per failure means
137        // collisions on the timestamp-millis + event-id pair are vanishingly
138        // unlikely, but if they do happen we want the write to fail loudly
139        // rather than silently truncate a prior DLQ entry.
140        let mut f = tokio::fs::OpenOptions::new()
141            .create_new(true)
142            .write(true)
143            .open(&path)
144            .await
145            .map_err(|e| CellosError::EventSink(format!("dlq open {}: {e}", path.display())))?;
146        f.write_all(line.as_bytes())
147            .await
148            .map_err(|e| CellosError::EventSink(format!("dlq write: {e}")))?;
149        f.write_all(b"\n")
150            .await
151            .map_err(|e| CellosError::EventSink(format!("dlq newline: {e}")))?;
152        f.flush()
153            .await
154            .map_err(|e| CellosError::EventSink(format!("dlq flush: {e}")))?;
155        Ok(())
156    }
157}
158
159#[async_trait]
160impl EventSink for DlqSink {
161    async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
162        match self.inner.emit(event).await {
163            Ok(()) => Ok(()),
164            Err(primary_err) => match self.write_to_dlq(event, &primary_err).await {
165                Ok(()) => {
166                    tracing::warn!(
167                        target: "cellos.supervisor.observability",
168                        event_id = %event.id,
169                        event_type = %event.ty,
170                        primary_error = %primary_err,
171                        "primary event sink failed; event captured to DLQ"
172                    );
173                    // DLQ has taken custody of the event — caller should
174                    // treat the emit as delivered.
175                    Ok(())
176                }
177                Err(dlq_err) => {
178                    tracing::error!(
179                        target: "cellos.supervisor.observability",
180                        event_id = %event.id,
181                        event_type = %event.ty,
182                        primary_error = %primary_err,
183                        dlq_error = %dlq_err,
184                        "primary event sink failed AND DLQ persistence failed; event lost"
185                    );
186                    // Both paths failed — surface the original primary error
187                    // so the supervisor still sees a sink failure.
188                    Err(primary_err)
189                }
190            },
191        }
192    }
193}
194
/// Longest sanitized event id (in bytes) embedded in a DLQ filename.
///
/// Common filesystems cap a single path component at 255 bytes (NAME_MAX).
/// CloudEvent ids have no length limit, so an uncapped id could make the DLQ
/// `open` fail and lose the event. 200 leaves room for the millisecond
/// timestamp, the `-` separator, and the `.jsonl` suffix.
const MAX_FILENAME_ID_LEN: usize = 200;

/// Build the DLQ filename for an event id: `<unix-millis>-<sanitized-id>.jsonl`.
///
/// The sanitized id is truncated to [`MAX_FILENAME_ID_LEN`] bytes. A clock set
/// before the Unix epoch yields a `0` timestamp rather than failing.
fn dlq_filename(event_id: &str) -> String {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis())
        .unwrap_or(0);
    let mut id = sanitize_id(event_id);
    // `sanitize_id` only emits ASCII, so every byte offset is a char boundary
    // and `truncate` cannot panic.
    id.truncate(MAX_FILENAME_ID_LEN);
    format!("{millis}-{id}.jsonl")
}

/// Replace any character outside `[A-Za-z0-9._-]` with `_`.
///
/// The output is always pure ASCII, one byte per input `char`. Empty input
/// collapses to `"_"` so the filename never ends with a stray trailing hyphen
/// from the timestamp separator.
fn sanitize_id(id: &str) -> String {
    if id.is_empty() {
        return "_".into();
    }
    id.chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '.' || c == '_' || c == '-' {
                c
            } else {
                '_'
            }
        })
        .collect()
}
222
/// Pre-flight check: returns `Ok(())` if `dir` exists, is a directory, and a
/// throwaway probe file can be created (and then removed) inside it.
///
/// # Errors
///
/// - the I/O error from `metadata` if `dir` cannot be stat'ed (e.g. it does
///   not exist);
/// - `NotADirectory` if `dir` exists but is not a directory;
/// - the I/O error from creating the probe file if `dir` is not writable.
fn dir_is_writable(dir: &Path) -> std::io::Result<()> {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Per-process sequence number. Relaxed is enough: we only need each
    // fetch_add to return a distinct value, not to order other memory.
    static PROBE_SEQ: AtomicU64 = AtomicU64::new(0);

    let meta = std::fs::metadata(dir)?;
    if !meta.is_dir() {
        return Err(std::io::Error::new(
            std::io::ErrorKind::NotADirectory,
            "CELLOS_DLQ_DIR is not a directory",
        ));
    }

    // The probe name must be unique, or `create_new` spuriously fails and the
    // DLQ gets disabled:
    // - the pid separates concurrent supervisors;
    // - the counter separates concurrent calls within this process;
    // - wall-clock nanos separate this run from a stale probe left by a
    //   crashed process that had the same pid (e.g. PID 1 in a container).
    let seq = PROBE_SEQ.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let probe = dir.join(format!(
        ".cellos-dlq-probe-{}-{seq}-{nanos}",
        std::process::id()
    ));

    // create_new never clobbers an existing file. The returned handle is a
    // temporary, dropped (closed) at the end of this statement, before the
    // unlink below.
    std::fs::OpenOptions::new()
        .create_new(true)
        .write(true)
        .open(&probe)?;
    // Best-effort cleanup: with unique names a leftover probe is harmless.
    let _ = std::fs::remove_file(&probe);
    Ok(())
}
246
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes every test that reads or mutates `CELLOS_DLQ_DIR`.
    ///
    /// The environment is process-global and the harness runs tests in
    /// parallel, so all env-touching tests must share ONE lock. A `static`
    /// declared inside each test function would be a separate mutex per test
    /// and would serialize nothing.
    static ENV_GUARD: Mutex<()> = Mutex::new(());

    /// Acquire [`ENV_GUARD`], recovering from poisoning so one failing test
    /// does not cascade into spurious failures in every other env test.
    fn env_lock() -> MutexGuard<'static, ()> {
        ENV_GUARD.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// A sink that always succeeds and records the last event seen.
    struct CaptureSink(Mutex<Option<CloudEventV1>>);

    impl CaptureSink {
        fn new() -> Arc<Self> {
            Arc::new(Self(Mutex::new(None)))
        }
        fn last(&self) -> Option<CloudEventV1> {
            self.0.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl EventSink for CaptureSink {
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            *self.0.lock().unwrap() = Some(event.clone());
            Ok(())
        }
    }

    /// A sink that always fails with a fixed error message.
    struct FailingSink;

    #[async_trait]
    impl EventSink for FailingSink {
        async fn emit(&self, _event: &CloudEventV1) -> Result<(), CellosError> {
            Err(CellosError::EventSink("primary boom".into()))
        }
    }

    fn test_event(id: &str) -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: id.into(),
            source: "test".into(),
            ty: "test.event".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(serde_json::json!({ "k": "v" })),
            time: None,
            traceparent: None,
        }
    }

    #[tokio::test]
    async fn success_path_passes_through_to_primary() {
        // (a) When the primary emit succeeds, DlqSink must forward and write
        // nothing to the DLQ directory.
        let tmp = tempfile::tempdir().unwrap();
        let capture = CaptureSink::new();
        let sink = DlqSink::wrap(capture.clone() as Arc<dyn EventSink>, tmp.path());

        let event = test_event("evt-success");
        sink.emit(&event).await.unwrap();

        assert_eq!(
            capture.last().expect("primary saw the event").id,
            "evt-success",
        );

        let entries: Vec<_> = std::fs::read_dir(tmp.path())
            .unwrap()
            .filter_map(Result::ok)
            .collect();
        assert!(
            entries.is_empty(),
            "DLQ dir must be empty on success path, found: {:?}",
            entries.iter().map(|e| e.path()).collect::<Vec<_>>()
        );
    }

    #[tokio::test]
    async fn primary_error_lands_in_dlq_file() {
        // (b) On primary failure the failed CloudEvent must appear as a JSON
        // line under the DLQ directory and the call must return Ok (DLQ
        // assumes custody).
        let tmp = tempfile::tempdir().unwrap();
        let sink = DlqSink::wrap(Arc::new(FailingSink) as Arc<dyn EventSink>, tmp.path());

        let event = test_event("evt-failure-1");
        sink.emit(&event)
            .await
            .expect("DLQ should swallow primary error");

        let entries: Vec<_> = std::fs::read_dir(tmp.path())
            .unwrap()
            .filter_map(Result::ok)
            .collect();
        assert_eq!(entries.len(), 1, "exactly one DLQ file expected");

        let path = entries[0].path();
        let name = path.file_name().unwrap().to_string_lossy().into_owned();
        assert!(
            name.ends_with("-evt-failure-1.jsonl"),
            "filename should encode the event id: {name}"
        );

        let body = std::fs::read_to_string(&path).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(body.trim_end())
            .expect("DLQ file must contain a single JSON line");
        assert_eq!(parsed["event"]["id"], "evt-failure-1");
        assert_eq!(parsed["event"]["type"], "test.event");
        assert_eq!(parsed["error"], "event sink: primary boom");
    }

    #[tokio::test]
    async fn dlq_write_failure_propagates_primary_error() {
        // When the DLQ itself cannot persist (directory missing), the primary
        // error must surface unchanged instead of being swallowed.
        let tmp = tempfile::tempdir().unwrap();
        let missing = tmp.path().join("does-not-exist");
        let sink = DlqSink::wrap(Arc::new(FailingSink) as Arc<dyn EventSink>, missing.clone());

        let err = sink
            .emit(&test_event("evt-lost"))
            .await
            .expect_err("both primary and DLQ failed; emit must report failure");
        assert_eq!(err.to_string(), "event sink: primary boom");
        assert!(!missing.exists(), "DLQ must not create its directory");
    }

    #[tokio::test]
    async fn disabled_mode_is_identity() {
        // (c) With CELLOS_DLQ_DIR unset, from_env must return the inner sink
        // unmodified — no DLQ wrapping, no behavioural change.
        // We verify by Arc-pointer-equality: the returned sink must be the
        // same Arc instance we passed in.
        let _g = env_lock();
        std::env::remove_var("CELLOS_DLQ_DIR");

        let inner: Arc<dyn EventSink> = CaptureSink::new();
        let returned = DlqSink::from_env(inner.clone());

        assert!(
            Arc::ptr_eq(&inner, &returned),
            "from_env with CELLOS_DLQ_DIR unset must return the inner Arc unchanged"
        );
    }

    #[tokio::test]
    async fn from_env_empty_value_is_identity() {
        let _g = env_lock();
        std::env::set_var("CELLOS_DLQ_DIR", "   ");

        let inner: Arc<dyn EventSink> = CaptureSink::new();
        let returned = DlqSink::from_env(inner.clone());
        std::env::remove_var("CELLOS_DLQ_DIR");

        assert!(
            Arc::ptr_eq(&inner, &returned),
            "whitespace-only CELLOS_DLQ_DIR must be treated as unset"
        );
    }

    #[tokio::test]
    async fn from_env_nonexistent_dir_is_identity() {
        let _g = env_lock();
        let bogus =
            std::env::temp_dir().join(format!("cellos-dlq-does-not-exist-{}", std::process::id()));
        // Make sure it really doesn't exist.
        let _ = std::fs::remove_dir_all(&bogus);
        std::env::set_var("CELLOS_DLQ_DIR", &bogus);

        let inner: Arc<dyn EventSink> = CaptureSink::new();
        let returned = DlqSink::from_env(inner.clone());
        std::env::remove_var("CELLOS_DLQ_DIR");

        assert!(
            Arc::ptr_eq(&inner, &returned),
            "non-existent CELLOS_DLQ_DIR must degrade to identity (DLQ disabled)"
        );
    }

    #[tokio::test]
    async fn from_env_writable_dir_wraps_inner() {
        let tmp = tempfile::tempdir().unwrap();

        // Only the env manipulation needs the lock. The guard lives in this
        // block so the std mutex is released before any `.await` below.
        let (inner, wrapped) = {
            let _g = env_lock();
            std::env::set_var("CELLOS_DLQ_DIR", tmp.path());
            let inner: Arc<dyn EventSink> = Arc::new(FailingSink);
            let wrapped = DlqSink::from_env(inner.clone());
            std::env::remove_var("CELLOS_DLQ_DIR");
            (inner, wrapped)
        };

        // Must NOT be the identity Arc — wrapping should have happened.
        assert!(
            !Arc::ptr_eq(&inner, &wrapped),
            "writable CELLOS_DLQ_DIR must wrap the inner sink"
        );

        // And a failing emit should now succeed at the boundary while
        // landing the event in the DLQ.
        wrapped.emit(&test_event("env-wrap")).await.unwrap();
        let count = std::fs::read_dir(tmp.path()).unwrap().count();
        assert_eq!(count, 1, "wrapped sink should have written one DLQ file");
    }

    #[test]
    fn sanitize_id_replaces_unsafe_chars() {
        assert_eq!(sanitize_id("evt.id-123_ok"), "evt.id-123_ok");
        assert_eq!(sanitize_id("../escape"), ".._escape"); // dots allowed, only / replaced
        assert_eq!(sanitize_id("evt/with/slash"), "evt_with_slash");
        assert_eq!(sanitize_id(""), "_");
    }

    #[test]
    fn dlq_filename_has_expected_shape() {
        let name = dlq_filename("abc");
        assert!(name.ends_with("-abc.jsonl"));
        let prefix = name.trim_end_matches("-abc.jsonl");
        assert!(
            prefix.chars().all(|c| c.is_ascii_digit()),
            "timestamp prefix must be all digits: {prefix}"
        );
    }
}