Skip to main content

cellos_sink_jsonl/
lib.rs

1//! Append [`CloudEventV1`](cellos_core::CloudEventV1) as one JSON object per line.
2
3use std::io;
4use std::path::{Path, PathBuf};
5
6use async_trait::async_trait;
7use cellos_core::ports::EventSink;
8use cellos_core::{CellosError, CloudEventV1};
9use tokio::io::AsyncWriteExt;
10use tokio::sync::Mutex;
11
12/// POSIX `ENOSPC` — "No space left on device".
13///
14/// Hard-coded rather than pulling in the `libc` crate so the sink stays
15/// dependency-light and works on any Unix the workspace targets. The value is
16/// fixed by the POSIX/Linux ABI.
17#[cfg(unix)]
18const ENOSPC: i32 = 28;
19
20/// Maximum serialized size, in bytes, accepted by [`JsonlEventSink::emit`].
21///
22/// A single event larger than this cap is rejected with a typed
23/// [`CellosError::EventSink`] *before* any bytes are written. The cap
24/// exists to bound memory use and prevent a runaway producer from
25/// committing arbitrarily large lines to the sink (which would block the
26/// async writer and inflate downstream JSONL parsers).
27///
28/// The value (1 MiB) is generous for normal CloudEvents — typical event
29/// payloads are < 4 KiB — yet small enough that an obviously oversized
30/// event is rejected fast.
31pub const MAX_EVENT_BYTES: usize = 1024 * 1024;
32
33/// Returns `true` when the [`io::Error`] represents a "disk full" / `ENOSPC`
34/// condition, regardless of which `ErrorKind` the platform mapped it to.
35///
36/// `io::ErrorKind::StorageFull` (stable since 1.83) is the preferred signal,
37/// but older or non-Linux platforms may still surface the raw OS code.
38fn is_storage_full(err: &io::Error) -> bool {
39    if matches!(err.kind(), io::ErrorKind::StorageFull) {
40        return true;
41    }
42    #[cfg(unix)]
43    {
44        if err.raw_os_error() == Some(ENOSPC) {
45            return true;
46        }
47    }
48    false
49}
50
51/// Build the actionable "sink full" error message expected by callers.
52///
53/// The roadmap criterion says messages must start with `"sink full: "` so that
54/// log scrapers / supervisors can recognise and back-off without parsing the
55/// underlying `io::Error`.
56fn sink_full_error(path: &Path, err: &io::Error) -> CellosError {
57    CellosError::EventSink(format!("sink full: {} ({err})", path.display()))
58}
59
60/// Lazily opens the file on first [`EventSink::emit`].
61pub struct JsonlEventSink {
62    path: PathBuf,
63    state: Mutex<Option<tokio::fs::File>>,
64}
65
66impl JsonlEventSink {
67    pub fn new(path: impl Into<PathBuf>) -> Self {
68        Self {
69            path: path.into(),
70            state: Mutex::new(None),
71        }
72    }
73}
74
75#[async_trait]
76impl EventSink for JsonlEventSink {
77    async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
78        // Serialize first so we can enforce the size cap *before* opening or
79        // touching the file. This keeps oversized events from costing us a
80        // lazy `open(O_CREAT)` syscall (or worse, a partial write).
81        let line = serde_json::to_string(event)
82            .map_err(|e| CellosError::EventSink(format!("jsonl serialize: {e}")))?;
83        if line.len() > MAX_EVENT_BYTES {
84            return Err(CellosError::EventSink(format!(
85                "jsonl event too large: {} bytes exceeds MAX_EVENT_BYTES ({} bytes)",
86                line.len(),
87                MAX_EVENT_BYTES
88            )));
89        }
90        let mut guard = self.state.lock().await;
91        if guard.is_none() {
92            let f = tokio::fs::OpenOptions::new()
93                .create(true)
94                .append(true)
95                .open(&self.path)
96                .await
97                .map_err(|e| {
98                    if is_storage_full(&e) {
99                        sink_full_error(&self.path, &e)
100                    } else {
101                        CellosError::EventSink(format!("jsonl open {}: {e}", self.path.display()))
102                    }
103                })?;
104            *guard = Some(f);
105        }
106        let file = guard.as_mut().expect("just opened");
107        file.write_all(line.as_bytes()).await.map_err(|e| {
108            if is_storage_full(&e) {
109                sink_full_error(&self.path, &e)
110            } else {
111                CellosError::EventSink(format!("jsonl write: {e}"))
112            }
113        })?;
114        file.write_all(b"\n").await.map_err(|e| {
115            if is_storage_full(&e) {
116                sink_full_error(&self.path, &e)
117            } else {
118                CellosError::EventSink(format!("jsonl newline: {e}"))
119            }
120        })?;
121        // Flushing surfaces ENOSPC on filesystems that buffer writes (notably
122        // tmpfs with size limits), since `write_all` may succeed against the
123        // page cache but the underlying commit fails.
124        file.flush().await.map_err(|e| {
125            if is_storage_full(&e) {
126                sink_full_error(&self.path, &e)
127            } else {
128                CellosError::EventSink(format!("jsonl flush: {e}"))
129            }
130        })?;
131        Ok(())
132    }
133}
134
135#[cfg(test)]
136mod tests {
137    use super::*;
138    use cellos_core::CloudEventV1;
139
140    fn test_event(id: &str) -> CloudEventV1 {
141        CloudEventV1 {
142            specversion: "1.0".into(),
143            id: id.into(),
144            source: "test".into(),
145            ty: "test.event".into(),
146            datacontenttype: None,
147            data: Some(serde_json::json!({ "msg": id })),
148            time: None,
149            traceparent: None,
150        }
151    }
152
153    #[tokio::test]
154    async fn writes_one_json_line_per_event() {
155        let tmp = tempfile::tempdir().unwrap();
156        let path = tmp.path().join("events.jsonl");
157        let sink = JsonlEventSink::new(&path);
158
159        sink.emit(&test_event("evt-1")).await.unwrap();
160        sink.emit(&test_event("evt-2")).await.unwrap();
161
162        let content = tokio::fs::read_to_string(&path).await.unwrap();
163        let lines: Vec<&str> = content.lines().collect();
164        assert_eq!(lines.len(), 2, "expected 2 lines");
165
166        let obj1: serde_json::Value = serde_json::from_str(lines[0]).expect("valid JSON line 1");
167        let obj2: serde_json::Value = serde_json::from_str(lines[1]).expect("valid JSON line 2");
168        assert_eq!(obj1["id"], "evt-1");
169        assert_eq!(obj2["id"], "evt-2");
170    }
171
172    #[tokio::test]
173    async fn appends_across_separate_sink_instances() {
174        let tmp = tempfile::tempdir().unwrap();
175        let path = tmp.path().join("append.jsonl");
176
177        let sink1 = JsonlEventSink::new(&path);
178        sink1.emit(&test_event("first")).await.unwrap();
179        drop(sink1);
180
181        let sink2 = JsonlEventSink::new(&path);
182        sink2.emit(&test_event("second")).await.unwrap();
183        drop(sink2);
184
185        let content = tokio::fs::read_to_string(&path).await.unwrap();
186        let lines: Vec<&str> = content.lines().collect();
187        assert_eq!(lines.len(), 2, "both events must be present");
188        assert!(lines[0].contains("first"));
189        assert!(lines[1].contains("second"));
190    }
191
192    #[tokio::test]
193    async fn each_line_is_valid_cloud_event() {
194        let tmp = tempfile::tempdir().unwrap();
195        let path = tmp.path().join("ce.jsonl");
196        let sink = JsonlEventSink::new(&path);
197        sink.emit(&test_event("round-trip")).await.unwrap();
198
199        let content = tokio::fs::read_to_string(&path).await.unwrap();
200        let parsed: CloudEventV1 =
201            serde_json::from_str(content.trim_end()).expect("valid CloudEventV1");
202        assert_eq!(parsed.id, "round-trip");
203        assert_eq!(parsed.specversion, "1.0");
204    }
205
206    #[test]
207    fn classifies_storage_full_kind_as_full() {
208        // The preferred signal: `io::ErrorKind::StorageFull` (stable 1.83+).
209        let e = io::Error::from(io::ErrorKind::StorageFull);
210        assert!(is_storage_full(&e));
211    }
212
213    #[cfg(unix)]
214    #[test]
215    fn classifies_raw_enospc_as_full() {
216        // Older / non-Linux platforms may surface ENOSPC as `Other` with a
217        // raw OS code. The fallback path should still recognise it.
218        let e = io::Error::from_raw_os_error(ENOSPC);
219        assert!(is_storage_full(&e));
220    }
221
222    #[test]
223    fn unrelated_errors_are_not_classified_as_full() {
224        // EACCES / permission denied must NOT trip the sink-full classifier
225        // — that would mis-route ordinary I/O failures into the back-off
226        // path supervisors reserve for genuine ENOSPC.
227        let e = io::Error::from(io::ErrorKind::PermissionDenied);
228        assert!(!is_storage_full(&e));
229    }
230
231    #[test]
232    fn sink_full_message_starts_with_actionable_prefix() {
233        // The roadmap criterion: error message must start with `sink full:`
234        // so log scrapers can match without parsing the wrapped io::Error.
235        let path = std::path::Path::new("/tmp/test.jsonl");
236        let inner = io::Error::from(io::ErrorKind::StorageFull);
237        let err = sink_full_error(path, &inner);
238        let msg = err.to_string();
239        assert!(
240            msg.contains("sink full:"),
241            "missing actionable prefix in: {msg}"
242        );
243        assert!(msg.contains("/tmp/test.jsonl"), "missing path in: {msg}");
244    }
245}