1use std::path::{Path, PathBuf};
58use std::sync::Arc;
59use std::time::{SystemTime, UNIX_EPOCH};
60
61use async_trait::async_trait;
62use cellos_core::ports::EventSink;
63use cellos_core::{CellosError, CloudEventV1};
64use tokio::io::AsyncWriteExt;
65
66pub struct DlqSink {
68 inner: Arc<dyn EventSink>,
69 dir: PathBuf,
70}
71
72impl DlqSink {
73 pub fn wrap(inner: Arc<dyn EventSink>, dir: impl Into<PathBuf>) -> Arc<dyn EventSink> {
77 Arc::new(Self {
78 inner,
79 dir: dir.into(),
80 })
81 }
82
83 pub fn from_env(inner: Arc<dyn EventSink>) -> Arc<dyn EventSink> {
92 let Ok(raw) = std::env::var("CELLOS_DLQ_DIR") else {
93 return inner;
94 };
95 let trimmed = raw.trim();
96 if trimmed.is_empty() {
97 return inner;
98 }
99 let dir = PathBuf::from(trimmed);
100 match dir_is_writable(&dir) {
101 Ok(()) => {
102 tracing::info!(
103 target: "cellos.supervisor.observability",
104 dlq_dir = %dir.display(),
105 "DLQ enabled — failed events will be persisted"
106 );
107 Self::wrap(inner, dir)
108 }
109 Err(e) => {
110 tracing::warn!(
111 target: "cellos.supervisor.observability",
112 dlq_dir = %dir.display(),
113 error = %e,
114 "CELLOS_DLQ_DIR set but directory is not usable; DLQ disabled"
115 );
116 inner
117 }
118 }
119 }
120
121 async fn write_to_dlq(
122 &self,
123 event: &CloudEventV1,
124 primary_error: &CellosError,
125 ) -> Result<(), CellosError> {
126 let filename = dlq_filename(&event.id);
127 let path = self.dir.join(filename);
128
129 let envelope = serde_json::json!({
130 "event": event,
131 "error": primary_error.to_string(),
132 });
133 let line = serde_json::to_string(&envelope)
134 .map_err(|e| CellosError::EventSink(format!("dlq serialize: {e}")))?;
135
136 let mut f = tokio::fs::OpenOptions::new()
141 .create_new(true)
142 .write(true)
143 .open(&path)
144 .await
145 .map_err(|e| CellosError::EventSink(format!("dlq open {}: {e}", path.display())))?;
146 f.write_all(line.as_bytes())
147 .await
148 .map_err(|e| CellosError::EventSink(format!("dlq write: {e}")))?;
149 f.write_all(b"\n")
150 .await
151 .map_err(|e| CellosError::EventSink(format!("dlq newline: {e}")))?;
152 f.flush()
153 .await
154 .map_err(|e| CellosError::EventSink(format!("dlq flush: {e}")))?;
155 Ok(())
156 }
157}
158
159#[async_trait]
160impl EventSink for DlqSink {
161 async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
162 match self.inner.emit(event).await {
163 Ok(()) => Ok(()),
164 Err(primary_err) => match self.write_to_dlq(event, &primary_err).await {
165 Ok(()) => {
166 tracing::warn!(
167 target: "cellos.supervisor.observability",
168 event_id = %event.id,
169 event_type = %event.ty,
170 primary_error = %primary_err,
171 "primary event sink failed; event captured to DLQ"
172 );
173 Ok(())
176 }
177 Err(dlq_err) => {
178 tracing::error!(
179 target: "cellos.supervisor.observability",
180 event_id = %event.id,
181 event_type = %event.ty,
182 primary_error = %primary_err,
183 dlq_error = %dlq_err,
184 "primary event sink failed AND DLQ persistence failed; event lost"
185 );
186 Err(primary_err)
189 }
190 },
191 }
192 }
193}
194
195fn dlq_filename(event_id: &str) -> String {
197 let millis = SystemTime::now()
198 .duration_since(UNIX_EPOCH)
199 .map(|d| d.as_millis())
200 .unwrap_or(0);
201 format!("{millis}-{}.jsonl", sanitize_id(event_id))
202}
203
/// Maps an event id onto a string that is safe to embed in a file name.
///
/// - Every character outside `[A-Za-z0-9._-]` becomes `_`, so the result
///   never contains a path separator. This includes `/`, `\` and non-ASCII
///   characters; each non-ASCII character maps to a single `_`.
/// - An empty id maps to `"_"`.
/// - Output is capped at `MAX_ID_CHARS` characters. Event ids are
///   unbounded, producer-chosen strings, and an over-long id would otherwise
///   push the final DLQ file name past common filesystem limits (255 bytes).
///   The `open` would then fail and the event would be lost.
fn sanitize_id(id: &str) -> String {
    // Leaves room for the millisecond prefix, a collision discriminator and
    // the `.jsonl` extension well under 255 bytes. Output is pure ASCII, so
    // characters == bytes.
    const MAX_ID_CHARS: usize = 128;

    if id.is_empty() {
        return "_".into();
    }
    id.chars()
        .take(MAX_ID_CHARS)
        .map(|c| {
            if c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-') {
                c
            } else {
                '_'
            }
        })
        .collect()
}
222
/// Checks that `dir` exists, is a directory, and that this process can create
/// files in it.
///
/// The check creates a probe file with `create_new` and then removes it on a
/// best-effort basis. The probe name combines the PID, the wall-clock nanos
/// and a per-process counter. With only the PID, two things would fail with
/// `AlreadyExists` and disable the DLQ:
/// - concurrent checks of the same directory within one process;
/// - a probe left behind by an earlier process that had the same PID, e.g. a
///   restarted container init.
///
/// # Errors
/// - Any error from `std::fs::metadata` (e.g. the path does not exist).
/// - `ErrorKind::NotADirectory` if the path is not a directory.
/// - Any error from creating the probe file (e.g. permission denied).
fn dir_is_writable(dir: &Path) -> std::io::Result<()> {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Uniqueness counter only; it publishes no other data, so Relaxed suffices.
    static PROBE_SEQ: AtomicU64 = AtomicU64::new(0);

    let meta = std::fs::metadata(dir)?;
    if !meta.is_dir() {
        return Err(std::io::Error::new(
            std::io::ErrorKind::NotADirectory,
            "CELLOS_DLQ_DIR is not a directory",
        ));
    }

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let seq = PROBE_SEQ.fetch_add(1, Ordering::Relaxed);
    let probe = dir.join(format!(
        ".cellos-dlq-probe-{}-{nanos}-{seq}",
        std::process::id()
    ));

    // The handle is a temporary, so it is closed before the removal below.
    std::fs::OpenOptions::new()
        .create_new(true)
        .write(true)
        .open(&probe)?;
    // Failing to clean up does not make the directory unwritable.
    let _ = std::fs::remove_file(&probe);
    Ok(())
}
246
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes every test that reads or mutates `CELLOS_DLQ_DIR`.
    ///
    /// The environment is process-global and the test harness runs tests on
    /// parallel threads, so all env-touching tests must share ONE lock. A
    /// `static` declared inside each test only excludes that test from itself.
    static ENV_GUARD: Mutex<()> = Mutex::new(());

    /// Acquires [`ENV_GUARD`], recovering from poisoning so that one failing
    /// test does not cascade into spurious failures in the others.
    fn env_lock() -> MutexGuard<'static, ()> {
        ENV_GUARD
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Test double that records the most recent event it was given.
    struct CaptureSink(Mutex<Option<CloudEventV1>>);

    impl CaptureSink {
        fn new() -> Arc<Self> {
            Arc::new(Self(Mutex::new(None)))
        }
        fn last(&self) -> Option<CloudEventV1> {
            self.0.lock().unwrap().clone()
        }
    }

    #[async_trait]
    impl EventSink for CaptureSink {
        async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
            *self.0.lock().unwrap() = Some(event.clone());
            Ok(())
        }
    }

    /// Test double whose `emit` always fails.
    struct FailingSink;

    #[async_trait]
    impl EventSink for FailingSink {
        async fn emit(&self, _event: &CloudEventV1) -> Result<(), CellosError> {
            Err(CellosError::EventSink("primary boom".into()))
        }
    }

    fn test_event(id: &str) -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: id.into(),
            source: "test".into(),
            ty: "test.event".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(serde_json::json!({ "k": "v" })),
            time: None,
            traceparent: None,
        }
    }

    #[tokio::test]
    async fn success_path_passes_through_to_primary() {
        let tmp = tempfile::tempdir().unwrap();
        let capture = CaptureSink::new();
        let sink = DlqSink::wrap(capture.clone() as Arc<dyn EventSink>, tmp.path());

        let event = test_event("evt-success");
        sink.emit(&event).await.unwrap();

        assert_eq!(
            capture.last().expect("primary saw the event").id,
            "evt-success",
        );

        let entries: Vec<_> = std::fs::read_dir(tmp.path())
            .unwrap()
            .filter_map(Result::ok)
            .collect();
        assert!(
            entries.is_empty(),
            "DLQ dir must be empty on success path, found: {:?}",
            entries.iter().map(|e| e.path()).collect::<Vec<_>>()
        );
    }

    #[tokio::test]
    async fn primary_error_lands_in_dlq_file() {
        let tmp = tempfile::tempdir().unwrap();
        let sink = DlqSink::wrap(Arc::new(FailingSink) as Arc<dyn EventSink>, tmp.path());

        let event = test_event("evt-failure-1");
        sink.emit(&event)
            .await
            .expect("DLQ should swallow primary error");

        let entries: Vec<_> = std::fs::read_dir(tmp.path())
            .unwrap()
            .filter_map(Result::ok)
            .collect();
        assert_eq!(entries.len(), 1, "exactly one DLQ file expected");

        let path = entries[0].path();
        let name = path.file_name().unwrap().to_string_lossy().into_owned();
        assert!(
            name.ends_with("-evt-failure-1.jsonl"),
            "filename should encode the event id: {name}"
        );

        let body = std::fs::read_to_string(&path).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(body.trim_end())
            .expect("DLQ file must contain a single JSON line");
        assert_eq!(parsed["event"]["id"], "evt-failure-1");
        assert_eq!(parsed["event"]["type"], "test.event");
        assert_eq!(parsed["error"], "event sink: primary boom");
    }

    #[tokio::test]
    async fn disabled_mode_is_identity() {
        let _g = env_lock();
        std::env::remove_var("CELLOS_DLQ_DIR");

        let inner: Arc<dyn EventSink> = CaptureSink::new();
        let returned = DlqSink::from_env(inner.clone());

        assert!(
            Arc::ptr_eq(&inner, &returned),
            "from_env with CELLOS_DLQ_DIR unset must return the inner Arc unchanged"
        );
    }

    #[tokio::test]
    async fn from_env_empty_value_is_identity() {
        let _g = env_lock();
        std::env::set_var("CELLOS_DLQ_DIR", "   ");

        let inner: Arc<dyn EventSink> = CaptureSink::new();
        let returned = DlqSink::from_env(inner.clone());
        std::env::remove_var("CELLOS_DLQ_DIR");

        assert!(
            Arc::ptr_eq(&inner, &returned),
            "whitespace-only CELLOS_DLQ_DIR must be treated as unset"
        );
    }

    #[tokio::test]
    async fn from_env_nonexistent_dir_is_identity() {
        let _g = env_lock();
        let bogus =
            std::env::temp_dir().join(format!("cellos-dlq-does-not-exist-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&bogus);
        std::env::set_var("CELLOS_DLQ_DIR", &bogus);

        let inner: Arc<dyn EventSink> = CaptureSink::new();
        let returned = DlqSink::from_env(inner.clone());
        std::env::remove_var("CELLOS_DLQ_DIR");

        assert!(
            Arc::ptr_eq(&inner, &returned),
            "non-existent CELLOS_DLQ_DIR must degrade to identity (DLQ disabled)"
        );
    }

    #[tokio::test]
    async fn from_env_writable_dir_wraps_inner() {
        let tmp = tempfile::tempdir().unwrap();
        let inner: Arc<dyn EventSink> = Arc::new(FailingSink);

        // Hold the env lock only while the environment is touched, so no
        // std mutex guard lives across the `.await` below.
        let wrapped = {
            let _g = env_lock();
            std::env::set_var("CELLOS_DLQ_DIR", tmp.path());
            let wrapped = DlqSink::from_env(inner.clone());
            std::env::remove_var("CELLOS_DLQ_DIR");
            wrapped
        };

        assert!(
            !Arc::ptr_eq(&inner, &wrapped),
            "writable CELLOS_DLQ_DIR must wrap the inner sink"
        );

        wrapped.emit(&test_event("env-wrap")).await.unwrap();
        let count = std::fs::read_dir(tmp.path()).unwrap().count();
        assert_eq!(count, 1, "wrapped sink should have written one DLQ file");
    }

    #[test]
    fn sanitize_id_replaces_unsafe_chars() {
        assert_eq!(sanitize_id("evt.id-123_ok"), "evt.id-123_ok");
        assert_eq!(sanitize_id("../escape"), ".._escape");
        assert_eq!(sanitize_id("evt/with/slash"), "evt_with_slash");
        assert_eq!(sanitize_id(""), "_");
    }

    #[test]
    fn dlq_filename_has_expected_shape() {
        let name = dlq_filename("abc");
        assert!(name.ends_with("-abc.jsonl"));
        let prefix = name.trim_end_matches("-abc.jsonl");
        assert!(
            prefix.chars().all(|c| c.is_ascii_digit()),
            "timestamp prefix must be all digits: {prefix}"
        );
    }
}