1use std::io;
4use std::path::{Path, PathBuf};
5
6use async_trait::async_trait;
7use cellos_core::ports::EventSink;
8use cellos_core::{CellosError, CloudEventV1};
9use tokio::io::AsyncWriteExt;
10use tokio::sync::Mutex;
11
12#[cfg(unix)]
18const ENOSPC: i32 = 28;
19
20pub const MAX_EVENT_BYTES: usize = 1024 * 1024;
32
33fn is_storage_full(err: &io::Error) -> bool {
39 if matches!(err.kind(), io::ErrorKind::StorageFull) {
40 return true;
41 }
42 #[cfg(unix)]
43 {
44 if err.raw_os_error() == Some(ENOSPC) {
45 return true;
46 }
47 }
48 false
49}
50
51fn sink_full_error(path: &Path, err: &io::Error) -> CellosError {
57 CellosError::EventSink(format!("sink full: {} ({err})", path.display()))
58}
59
60pub struct JsonlEventSink {
62 path: PathBuf,
63 state: Mutex<Option<tokio::fs::File>>,
64}
65
66impl JsonlEventSink {
67 pub fn new(path: impl Into<PathBuf>) -> Self {
68 Self {
69 path: path.into(),
70 state: Mutex::new(None),
71 }
72 }
73}
74
75#[async_trait]
76impl EventSink for JsonlEventSink {
77 async fn emit(&self, event: &CloudEventV1) -> Result<(), CellosError> {
78 let line = serde_json::to_string(event)
82 .map_err(|e| CellosError::EventSink(format!("jsonl serialize: {e}")))?;
83 if line.len() > MAX_EVENT_BYTES {
84 return Err(CellosError::EventSink(format!(
85 "jsonl event too large: {} bytes exceeds MAX_EVENT_BYTES ({} bytes)",
86 line.len(),
87 MAX_EVENT_BYTES
88 )));
89 }
90 let mut guard = self.state.lock().await;
91 if guard.is_none() {
92 let f = tokio::fs::OpenOptions::new()
93 .create(true)
94 .append(true)
95 .open(&self.path)
96 .await
97 .map_err(|e| {
98 if is_storage_full(&e) {
99 sink_full_error(&self.path, &e)
100 } else {
101 CellosError::EventSink(format!("jsonl open {}: {e}", self.path.display()))
102 }
103 })?;
104 *guard = Some(f);
105 }
106 let file = guard.as_mut().expect("just opened");
107 file.write_all(line.as_bytes()).await.map_err(|e| {
108 if is_storage_full(&e) {
109 sink_full_error(&self.path, &e)
110 } else {
111 CellosError::EventSink(format!("jsonl write: {e}"))
112 }
113 })?;
114 file.write_all(b"\n").await.map_err(|e| {
115 if is_storage_full(&e) {
116 sink_full_error(&self.path, &e)
117 } else {
118 CellosError::EventSink(format!("jsonl newline: {e}"))
119 }
120 })?;
121 file.flush().await.map_err(|e| {
125 if is_storage_full(&e) {
126 sink_full_error(&self.path, &e)
127 } else {
128 CellosError::EventSink(format!("jsonl flush: {e}"))
129 }
130 })?;
131 Ok(())
132 }
133}
134
135#[cfg(test)]
136mod tests {
137 use super::*;
138 use cellos_core::CloudEventV1;
139
140 fn test_event(id: &str) -> CloudEventV1 {
141 CloudEventV1 {
142 specversion: "1.0".into(),
143 id: id.into(),
144 source: "test".into(),
145 ty: "test.event".into(),
146 datacontenttype: None,
147 data: Some(serde_json::json!({ "msg": id })),
148 time: None,
149 traceparent: None,
150 }
151 }
152
153 #[tokio::test]
154 async fn writes_one_json_line_per_event() {
155 let tmp = tempfile::tempdir().unwrap();
156 let path = tmp.path().join("events.jsonl");
157 let sink = JsonlEventSink::new(&path);
158
159 sink.emit(&test_event("evt-1")).await.unwrap();
160 sink.emit(&test_event("evt-2")).await.unwrap();
161
162 let content = tokio::fs::read_to_string(&path).await.unwrap();
163 let lines: Vec<&str> = content.lines().collect();
164 assert_eq!(lines.len(), 2, "expected 2 lines");
165
166 let obj1: serde_json::Value = serde_json::from_str(lines[0]).expect("valid JSON line 1");
167 let obj2: serde_json::Value = serde_json::from_str(lines[1]).expect("valid JSON line 2");
168 assert_eq!(obj1["id"], "evt-1");
169 assert_eq!(obj2["id"], "evt-2");
170 }
171
172 #[tokio::test]
173 async fn appends_across_separate_sink_instances() {
174 let tmp = tempfile::tempdir().unwrap();
175 let path = tmp.path().join("append.jsonl");
176
177 let sink1 = JsonlEventSink::new(&path);
178 sink1.emit(&test_event("first")).await.unwrap();
179 drop(sink1);
180
181 let sink2 = JsonlEventSink::new(&path);
182 sink2.emit(&test_event("second")).await.unwrap();
183 drop(sink2);
184
185 let content = tokio::fs::read_to_string(&path).await.unwrap();
186 let lines: Vec<&str> = content.lines().collect();
187 assert_eq!(lines.len(), 2, "both events must be present");
188 assert!(lines[0].contains("first"));
189 assert!(lines[1].contains("second"));
190 }
191
192 #[tokio::test]
193 async fn each_line_is_valid_cloud_event() {
194 let tmp = tempfile::tempdir().unwrap();
195 let path = tmp.path().join("ce.jsonl");
196 let sink = JsonlEventSink::new(&path);
197 sink.emit(&test_event("round-trip")).await.unwrap();
198
199 let content = tokio::fs::read_to_string(&path).await.unwrap();
200 let parsed: CloudEventV1 =
201 serde_json::from_str(content.trim_end()).expect("valid CloudEventV1");
202 assert_eq!(parsed.id, "round-trip");
203 assert_eq!(parsed.specversion, "1.0");
204 }
205
206 #[test]
207 fn classifies_storage_full_kind_as_full() {
208 let e = io::Error::from(io::ErrorKind::StorageFull);
210 assert!(is_storage_full(&e));
211 }
212
213 #[cfg(unix)]
214 #[test]
215 fn classifies_raw_enospc_as_full() {
216 let e = io::Error::from_raw_os_error(ENOSPC);
219 assert!(is_storage_full(&e));
220 }
221
222 #[test]
223 fn unrelated_errors_are_not_classified_as_full() {
224 let e = io::Error::from(io::ErrorKind::PermissionDenied);
228 assert!(!is_storage_full(&e));
229 }
230
231 #[test]
232 fn sink_full_message_starts_with_actionable_prefix() {
233 let path = std::path::Path::new("/tmp/test.jsonl");
236 let inner = io::Error::from(io::ErrorKind::StorageFull);
237 let err = sink_full_error(path, &inner);
238 let msg = err.to_string();
239 assert!(
240 msg.contains("sink full:"),
241 "missing actionable prefix in: {msg}"
242 );
243 assert!(msg.contains("/tmp/test.jsonl"), "missing path in: {msg}");
244 }
245}