1use std::path::{Path, PathBuf};
2
3use anyhow::{Result, anyhow};
4use async_trait::async_trait;
5use serde::Deserialize;
6use serde_json::Value;
7use tokio::fs::File;
8use tokio::io::{AsyncWriteExt, BufWriter};
9use tokio::sync::Mutex;
10
11use crate::config::{parse_config, redact_secret_path};
12use crate::envelope::Envelope;
13use crate::pipeline::ErrorPolicy;
14use crate::retry::RetryPolicy;
15use crate::sinks::{ManagedSink, Sink, WriteOne};
16
/// A sink that appends each envelope to a local file, as JSON Lines or CSV.
///
/// The file handle is not opened at construction time; it is opened on the
/// first write and kept in `state`, whose tokio `Mutex` serializes writers.
pub struct FileSink {
    /// Identifier returned by `WriteOne::id`.
    id: String,
    /// Destination file; opened in create + append mode.
    path: PathBuf,
    /// How each envelope is rendered into bytes.
    format: Format,
    /// Lazily-opened writer plus CSV header bookkeeping.
    state: Mutex<WriterState>,
}
31
/// Output encoding used by a [`FileSink`].
#[derive(Debug, Clone)]
pub enum Format {
    /// One JSON document per line, each terminated by `\n`.
    Jsonl {
        /// Whether each line holds only the payload or the whole envelope.
        body: BodyFormat,
    },
    /// Comma-separated values; a header row of the column names is written
    /// only when the target file is empty.
    Csv {
        /// Dotted paths (e.g. `payload.id`, `meta.key`) resolved against the
        /// serialized envelope; the same strings are written as the header row.
        columns: Vec<String>,
    },
}
45
/// What each JSONL line contains.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Deserialize)]
// Config values are `"payload"` and `"envelope"`.
#[serde(rename_all = "snake_case")]
pub enum BodyFormat {
    /// Only `env.payload` (the default).
    #[default]
    Payload,
    /// The full envelope, including its `meta`.
    Envelope,
}
54
/// Mutable per-sink state guarded by `FileSink::state`.
struct WriterState {
    /// Buffered handle to the output file; `None` until `ensure_open` opens it.
    writer: Option<BufWriter<File>>,
    /// CSV only: whether the header row must still be emitted before the next data row.
    needs_header: bool,
}
62
63impl FileSink {
64 pub fn new(id: impl Into<String>, path: impl Into<PathBuf>, format: Format) -> Result<Self> {
65 let path = path.into();
66 Ok(Self {
67 id: id.into(),
68 path,
69 state: Mutex::new(WriterState {
70 writer: None,
71 needs_header: matches!(&format, Format::Csv { .. }),
72 }),
73 format,
74 })
75 }
76
77 fn ensure_open(&self, state: &mut WriterState) -> Result<()> {
78 if state.writer.is_some() {
79 return Ok(());
80 }
81
82 if let Some(parent) = self.path.parent()
83 && !parent.as_os_str().is_empty()
84 {
85 std::fs::create_dir_all(parent).map_err(|e| {
86 anyhow!(
87 "failed to create parent dir for {}: {e}",
88 redact_secret_path(&self.path)
89 )
90 })?;
91 }
92
93 if matches!(&self.format, Format::Csv { .. }) {
94 state.needs_header = std::fs::metadata(&self.path)
95 .map(|m| m.len() == 0)
96 .unwrap_or(true);
97 }
98
99 let std_file = std::fs::OpenOptions::new()
100 .create(true)
101 .append(true)
102 .open(&self.path)
103 .map_err(|e| anyhow!("failed to open {}: {e}", redact_secret_path(&self.path)))?;
104 state.writer = Some(BufWriter::new(File::from_std(std_file)));
105 Ok(())
106 }
107}
108
109#[async_trait]
110impl WriteOne for FileSink {
111 fn id(&self) -> &str {
112 &self.id
113 }
114
115 async fn write(&self, env: &Envelope) -> Result<()> {
116 let mut buf = String::new();
117
118 match &self.format {
119 Format::Jsonl { body } => {
120 let value = match body {
121 BodyFormat::Payload => serde_json::to_string(&env.payload)?,
122 BodyFormat::Envelope => serde_json::to_string(env)?,
123 };
124 buf.push_str(&value);
125 buf.push('\n');
126 }
127 Format::Csv { columns } => {
128 let env_value = serde_json::to_value(env)?;
129 let mut state = self.state.lock().await;
130 self.ensure_open(&mut state)?;
131 let wrote_header = state.needs_header;
132 if state.needs_header {
133 write_csv_row(&mut buf, columns.iter().map(String::as_str));
134 }
135 let row = columns.iter().map(|col| extract_csv_cell(&env_value, col));
136 let row_strings: Vec<String> = row.collect();
137 write_csv_row(&mut buf, row_strings.iter().map(String::as_str));
138
139 state
140 .writer
141 .as_mut()
142 .expect("writer is opened above")
143 .write_all(buf.as_bytes())
144 .await
145 .map_err(|e| {
146 anyhow!("write to {} failed: {e}", redact_secret_path(&self.path))
147 })?;
148 state
149 .writer
150 .as_mut()
151 .expect("writer is opened above")
152 .flush()
153 .await
154 .map_err(|e| {
155 anyhow!("flush of {} failed: {e}", redact_secret_path(&self.path))
156 })?;
157 if wrote_header {
158 state.needs_header = false;
159 }
160 return Ok(());
161 }
162 }
163
164 let mut state = self.state.lock().await;
165 self.ensure_open(&mut state)?;
166 state
167 .writer
168 .as_mut()
169 .expect("writer is opened above")
170 .write_all(buf.as_bytes())
171 .await
172 .map_err(|e| anyhow!("write to {} failed: {e}", redact_secret_path(&self.path)))?;
173 state
174 .writer
175 .as_mut()
176 .expect("writer is opened above")
177 .flush()
178 .await
179 .map_err(|e| anyhow!("flush of {} failed: {e}", redact_secret_path(&self.path)))?;
180 Ok(())
181 }
182}
183
184fn extract_csv_cell(env: &Value, dotted: &str) -> String {
188 let mut current = env;
189 for segment in dotted.split('.') {
190 match current.get(segment) {
191 Some(v) => current = v,
192 None => return String::new(),
193 }
194 }
195 match current {
196 Value::Null => String::new(),
197 Value::String(s) => s.clone(),
198 Value::Bool(b) => b.to_string(),
199 Value::Number(n) => n.to_string(),
200 other => other.to_string(),
202 }
203}
204
205fn write_csv_row<'a>(buf: &mut String, fields: impl Iterator<Item = &'a str>) {
206 let mut first = true;
207 for field in fields {
208 if !first {
209 buf.push(',');
210 }
211 first = false;
212 write_csv_field(buf, field);
213 }
214 buf.push('\n');
215}
216
/// Appends a single CSV field to `buf`, escaping it per RFC 4180.
///
/// A field containing a comma, double quote, CR or LF is wrapped in double
/// quotes, with embedded quotes doubled. Any other field is appended as-is.
fn write_csv_field(buf: &mut String, field: &str) {
    let is_special = |c: char| matches!(c, ',' | '"' | '\n' | '\r');
    if field.contains(is_special) {
        buf.push('"');
        buf.push_str(&field.replace('"', "\"\""));
        buf.push('"');
    } else {
        buf.push_str(field);
    }
}
233
/// Raw configuration for the `file` component, as deserialized by `parse_config`.
#[derive(Debug, Deserialize)]
struct FileSinkConfig {
    /// Output file path (required; no serde default).
    path: PathBuf,
    /// `"jsonl"` (default) or `"csv"`.
    #[serde(default)]
    format: FormatConfig,
    /// JSONL body mode, defaulting to `payload`; ignored when `format` is `csv`.
    #[serde(default)]
    body: BodyFormat,
    /// CSV column paths; must be non-empty when `format` is `csv`, ignored for JSONL.
    #[serde(default)]
    columns: Vec<String>,
}
244
/// Config-level format selector; `file_sink_factory` combines it with `body` or
/// `columns` to build a [`Format`].
#[derive(Debug, Clone, Copy, Default, Deserialize)]
#[serde(rename_all = "snake_case")]
enum FormatConfig {
    /// JSON Lines (the default).
    #[default]
    Jsonl,
    /// CSV; requires a non-empty `columns` list.
    Csv,
}
252
253pub fn file_sink_factory(
259 id: &str,
260 config: Value,
261 on_error: ErrorPolicy,
262 retry: Option<RetryPolicy>,
263) -> Result<Box<dyn Sink>> {
264 let config: FileSinkConfig = parse_config("file", config)?;
265
266 let format = match config.format {
267 FormatConfig::Jsonl => Format::Jsonl { body: config.body },
268 FormatConfig::Csv => {
269 if config.columns.is_empty() {
270 return Err(anyhow!(
271 "invalid config for component type 'file': csv format requires a non-empty 'columns' list"
272 ));
273 }
274 Format::Csv {
275 columns: config.columns,
276 }
277 }
278 };
279
280 let file = FileSink::new(id, config.path, format)?;
281 let mut sink = ManagedSink::new(file).with_error_policy(on_error);
282 if let Some(policy) = retry {
283 sink = sink.with_retry(policy);
284 }
285 Ok(Box::new(sink))
286}
287
288pub fn jsonl(id: impl Into<String>, path: impl AsRef<Path>) -> Result<FileSink> {
291 FileSink::new(
292 id,
293 path.as_ref().to_path_buf(),
294 Format::Jsonl {
295 body: BodyFormat::Payload,
296 },
297 )
298}
299
// Unit tests for `FileSink` (JSONL and CSV output) and `file_sink_factory`.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Reads the whole output file, panicking if it is missing or not UTF-8.
    fn read(path: &Path) -> String {
        std::fs::read_to_string(path).unwrap()
    }

    // Payload mode writes only `env.payload`, one JSON document per line.
    #[tokio::test]
    async fn jsonl_writes_one_payload_per_line() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.jsonl");
        let sink = FileSink::new(
            "file",
            &path,
            Format::Jsonl {
                body: BodyFormat::Payload,
            },
        )
        .unwrap();

        sink.write(&Envelope::new("src", json!({ "n": 1 })))
            .await
            .unwrap();
        sink.write(&Envelope::new("src", json!({ "n": 2 })))
            .await
            .unwrap();

        let contents = read(&path);
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines.len(), 2);
        let first: Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first, json!({ "n": 1 }));
        let second: Value = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(second, json!({ "n": 2 }));
    }

    // Envelope mode serializes the whole envelope, so `meta` fields survive.
    #[tokio::test]
    async fn jsonl_envelope_mode_persists_meta() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.jsonl");
        let sink = FileSink::new(
            "file",
            &path,
            Format::Jsonl {
                body: BodyFormat::Envelope,
            },
        )
        .unwrap();

        let mut env = Envelope::new("src", json!({ "id": 7 }));
        env.meta.key = Some("k1".into());
        sink.write(&env).await.unwrap();

        let line = read(&path);
        let parsed: Value = serde_json::from_str(line.trim()).unwrap();
        assert_eq!(parsed["payload"], json!({ "id": 7 }));
        assert_eq!(parsed["meta"]["source_id"], "src");
        assert_eq!(parsed["meta"]["key"], "k1");
    }

    #[tokio::test]
    async fn jsonl_appends_across_sink_instances() {
        let dir = tempfile::tempdir().unwrap();
        // Two sinks target the same path one after another; the second must
        // append to the first one's output rather than truncate it.
        let path = dir.path().join("out.jsonl");

        let sink = FileSink::new(
            "file",
            &path,
            Format::Jsonl {
                body: BodyFormat::Payload,
            },
        )
        .unwrap();
        sink.write(&Envelope::new("src", json!({ "n": 1 })))
            .await
            .unwrap();
        drop(sink);

        let sink = FileSink::new(
            "file",
            &path,
            Format::Jsonl {
                body: BodyFormat::Payload,
            },
        )
        .unwrap();
        sink.write(&Envelope::new("src", json!({ "n": 2 })))
            .await
            .unwrap();
        drop(sink);

        let contents = read(&path);
        assert_eq!(contents.lines().count(), 2);
    }

    // A fresh CSV file gets exactly one header row followed by the data rows.
    #[tokio::test]
    async fn csv_writes_header_then_rows() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.csv");
        let sink = FileSink::new(
            "file",
            &path,
            Format::Csv {
                columns: vec!["payload.id".into(), "payload.name".into()],
            },
        )
        .unwrap();

        sink.write(&Envelope::new("src", json!({ "id": 1, "name": "alice" })))
            .await
            .unwrap();
        sink.write(&Envelope::new("src", json!({ "id": 2, "name": "bob" })))
            .await
            .unwrap();

        let contents = read(&path);
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines, vec!["payload.id,payload.name", "1,alice", "2,bob"]);
    }

    // Writing to /dev/full fails with ENOSPC; the header must not be marked as
    // written when the write/flush did not succeed.
    #[tokio::test]
    #[cfg(target_os = "linux")]
    async fn csv_keeps_header_pending_after_write_failure() {
        let dev_full = Path::new("/dev/full");
        if !dev_full.exists() {
            return;
        }

        let sink = FileSink::new(
            "file",
            dev_full,
            Format::Csv {
                columns: vec!["payload.id".into()],
            },
        )
        .unwrap();

        sink.write(&Envelope::new("src", json!({ "id": 1 })))
            .await
            .expect_err("expected write to /dev/full to fail");

        let state = sink.state.lock().await;
        assert!(
            state.needs_header,
            "header should remain pending until write and flush succeed"
        );
    }

    // A non-empty existing file is treated as already having a header.
    #[tokio::test]
    async fn csv_skips_header_when_appending_to_existing_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.csv");
        std::fs::write(&path, "id,name\n0,seed\n").unwrap();

        let sink = FileSink::new(
            "file",
            &path,
            Format::Csv {
                columns: vec!["payload.id".into(), "payload.name".into()],
            },
        )
        .unwrap();
        sink.write(&Envelope::new("src", json!({ "id": 1, "name": "alice" })))
            .await
            .unwrap();
        drop(sink);

        let contents = read(&path);
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines, vec!["id,name", "0,seed", "1,alice"]);
    }

    // Commas, quotes and newlines force RFC 4180 quoting with doubled quotes.
    #[tokio::test]
    async fn csv_quotes_fields_with_special_characters() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.csv");
        let sink = FileSink::new(
            "file",
            &path,
            Format::Csv {
                columns: vec!["payload.text".into(), "payload.note".into()],
            },
        )
        .unwrap();

        sink.write(&Envelope::new(
            "src",
            json!({
                "text": "hello, world",
                "note": "She said \"hi\"\nto me",
            }),
        ))
        .await
        .unwrap();

        let contents = read(&path);
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines[0], "payload.text,payload.note");
        assert!(contents.contains("\"hello, world\""));
        // The quoted field contains a raw newline, so check the whole file
        // contents instead of individual `lines()`.
        assert!(contents.contains("\"She said \"\"hi\"\"\nto me\""));
    }

    // Columns can reach into `meta`; a missing path yields an empty cell.
    #[tokio::test]
    async fn csv_pulls_from_meta_via_dotted_paths() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.csv");
        let sink = FileSink::new(
            "file",
            &path,
            Format::Csv {
                columns: vec![
                    "meta.source_id".into(),
                    "meta.key".into(),
                    "payload.v".into(),
                    "payload.missing".into(),
                ],
            },
        )
        .unwrap();

        let mut env = Envelope::new("src", json!({ "v": 42 }));
        env.meta.key = Some("k".into());
        sink.write(&env).await.unwrap();

        let contents = read(&path);
        let last = contents.lines().last().unwrap();
        assert_eq!(last, "src,k,42,");
    }

    // Missing parent directories are created on first write.
    #[tokio::test]
    async fn creates_parent_directories() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("nested/sub/out.jsonl");
        let sink = jsonl("file", &path).unwrap();
        sink.write(&Envelope::new("src", json!({ "n": 1 })))
            .await
            .unwrap();
        assert!(path.exists());
    }

    // With only `path` configured, the factory builds a JSONL payload sink;
    // this drives it end-to-end through `Sink::run` until the channel closes.
    #[tokio::test]
    async fn factory_defaults_to_jsonl_payload() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.jsonl");

        let sink = file_sink_factory(
            "file",
            json!({ "path": path.to_str().unwrap() }),
            ErrorPolicy::Drop,
            None,
        )
        .unwrap();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let cancel = tokio_util::sync::CancellationToken::new();
        let handle = tokio::spawn(async move { sink.run(rx, cancel).await });

        tx.send(Envelope::new("src", json!({ "v": 1 })))
            .await
            .unwrap();
        drop(tx);
        handle.await.unwrap();

        let contents = read(&path);
        let parsed: Value = serde_json::from_str(contents.trim()).unwrap();
        assert_eq!(parsed, json!({ "v": 1 }));
    }

    // CSV without columns is rejected at construction time, not at write time.
    #[test]
    fn factory_rejects_csv_without_columns() {
        let dir = tempfile::tempdir().unwrap();
        let err = file_sink_factory(
            "file",
            json!({
                "path": dir.path().join("x.csv").to_str().unwrap(),
                "format": "csv",
            }),
            ErrorPolicy::Drop,
            None,
        )
        .err()
        .expect("expected csv-without-columns error");
        let msg = format!("{err:#}");
        assert!(msg.contains("csv format requires"), "{msg}");
    }

    // Parse errors from `parse_config` carry the shared component-type prefix.
    #[test]
    fn factory_reports_missing_path_with_uniform_prefix() {
        let err = file_sink_factory("file", json!({}), ErrorPolicy::Drop, None)
            .err()
            .expect("expected missing-path error");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid config for component type 'file'"),
            "{msg}"
        );
    }
}