Skip to main content

courier/sinks/
file.rs

1use std::path::{Path, PathBuf};
2
3use anyhow::{Result, anyhow};
4use async_trait::async_trait;
5use serde::Deserialize;
6use serde_json::Value;
7use tokio::fs::File;
8use tokio::io::{AsyncWriteExt, BufWriter};
9use tokio::sync::Mutex;
10
11use crate::config::{parse_config, redact_secret_path};
12use crate::envelope::Envelope;
13use crate::pipeline::ErrorPolicy;
14use crate::retry::RetryPolicy;
15use crate::sinks::{ManagedSink, Sink, WriteOne};
16
/// Local file sink. Appends one record per envelope to the configured path
/// in either JSON Lines or CSV format.
///
/// The file is opened lazily on the first write, so constructing the sink
/// (for example during config validation) has no file system side effects.
/// Once open, the handle is kept in append mode and flushed after every
/// write. Streaming writes are not transactional: a retried write after a
/// partial failure may produce a duplicate row — `WriteOne` semantics, same
/// as any other `ManagedSink`-wrapped sink.
pub struct FileSink {
    /// Identifier reported through `WriteOne::id`.
    id: String,
    /// Destination file. Missing parent directories are created when the
    /// file is first opened.
    path: PathBuf,
    /// Record encoding (JSONL or CSV) chosen at construction.
    format: Format,
    /// Lazily opened writer plus CSV header bookkeeping, behind an async
    /// mutex so each write holds it across the write and flush.
    state: Mutex<WriterState>,
}
31
/// Output shape. JSONL emits one serialized JSON value per line; CSV emits
/// a row of configured columns with RFC 4180 quoting. Both formats end each
/// record with a bare `\n`.
#[derive(Debug, Clone)]
pub enum Format {
    /// One JSON document per line; `body` selects whether only the payload
    /// or the whole envelope is serialized.
    Jsonl {
        body: BodyFormat,
    },
    /// CSV columns are dotted paths evaluated against the full envelope —
    /// e.g. `payload.id`, `meta.source_id`, `meta.headers.priority`.
    /// The paths themselves are written as the header row, and a path that
    /// does not resolve renders as an empty cell.
    Csv {
        columns: Vec<String>,
    },
}
45
/// Which slice of the envelope to serialize per JSONL line.
///
/// Deserialized from snake_case names (`payload`, `envelope`); the default
/// is [`BodyFormat::Payload`].
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum BodyFormat {
    /// Serialize only the envelope's `payload`.
    #[default]
    Payload,
    /// Serialize the entire envelope, metadata included.
    Envelope,
}
54
/// Mutable writer state guarded by the mutex in `FileSink::state`.
struct WriterState {
    /// Buffered append handle; `None` until the first write opens the file.
    writer: Option<BufWriter<File>>,
    /// True until the CSV header row has been written. Always false for
    /// JSONL. Refined from file size at lazy-open time so restarts on a
    /// non-empty file resume cleanly without re-emitting headers. Cleared
    /// only after the write carrying the header has also flushed
    /// successfully.
    needs_header: bool,
}
62
63impl FileSink {
64    pub fn new(id: impl Into<String>, path: impl Into<PathBuf>, format: Format) -> Result<Self> {
65        let path = path.into();
66        Ok(Self {
67            id: id.into(),
68            path,
69            state: Mutex::new(WriterState {
70                writer: None,
71                needs_header: matches!(&format, Format::Csv { .. }),
72            }),
73            format,
74        })
75    }
76
77    fn ensure_open(&self, state: &mut WriterState) -> Result<()> {
78        if state.writer.is_some() {
79            return Ok(());
80        }
81
82        if let Some(parent) = self.path.parent()
83            && !parent.as_os_str().is_empty()
84        {
85            std::fs::create_dir_all(parent).map_err(|e| {
86                anyhow!(
87                    "failed to create parent dir for {}: {e}",
88                    redact_secret_path(&self.path)
89                )
90            })?;
91        }
92
93        if matches!(&self.format, Format::Csv { .. }) {
94            state.needs_header = std::fs::metadata(&self.path)
95                .map(|m| m.len() == 0)
96                .unwrap_or(true);
97        }
98
99        let std_file = std::fs::OpenOptions::new()
100            .create(true)
101            .append(true)
102            .open(&self.path)
103            .map_err(|e| anyhow!("failed to open {}: {e}", redact_secret_path(&self.path)))?;
104        state.writer = Some(BufWriter::new(File::from_std(std_file)));
105        Ok(())
106    }
107}
108
109#[async_trait]
110impl WriteOne for FileSink {
111    fn id(&self) -> &str {
112        &self.id
113    }
114
115    async fn write(&self, env: &Envelope) -> Result<()> {
116        let mut buf = String::new();
117
118        match &self.format {
119            Format::Jsonl { body } => {
120                let value = match body {
121                    BodyFormat::Payload => serde_json::to_string(&env.payload)?,
122                    BodyFormat::Envelope => serde_json::to_string(env)?,
123                };
124                buf.push_str(&value);
125                buf.push('\n');
126            }
127            Format::Csv { columns } => {
128                let env_value = serde_json::to_value(env)?;
129                let mut state = self.state.lock().await;
130                self.ensure_open(&mut state)?;
131                let wrote_header = state.needs_header;
132                if state.needs_header {
133                    write_csv_row(&mut buf, columns.iter().map(String::as_str));
134                }
135                let row = columns.iter().map(|col| extract_csv_cell(&env_value, col));
136                let row_strings: Vec<String> = row.collect();
137                write_csv_row(&mut buf, row_strings.iter().map(String::as_str));
138
139                state
140                    .writer
141                    .as_mut()
142                    .expect("writer is opened above")
143                    .write_all(buf.as_bytes())
144                    .await
145                    .map_err(|e| {
146                        anyhow!("write to {} failed: {e}", redact_secret_path(&self.path))
147                    })?;
148                state
149                    .writer
150                    .as_mut()
151                    .expect("writer is opened above")
152                    .flush()
153                    .await
154                    .map_err(|e| {
155                        anyhow!("flush of {} failed: {e}", redact_secret_path(&self.path))
156                    })?;
157                if wrote_header {
158                    state.needs_header = false;
159                }
160                return Ok(());
161            }
162        }
163
164        let mut state = self.state.lock().await;
165        self.ensure_open(&mut state)?;
166        state
167            .writer
168            .as_mut()
169            .expect("writer is opened above")
170            .write_all(buf.as_bytes())
171            .await
172            .map_err(|e| anyhow!("write to {} failed: {e}", redact_secret_path(&self.path)))?;
173        state
174            .writer
175            .as_mut()
176            .expect("writer is opened above")
177            .flush()
178            .await
179            .map_err(|e| anyhow!("flush of {} failed: {e}", redact_secret_path(&self.path)))?;
180        Ok(())
181    }
182}
183
184/// Pull a CSV cell value from the envelope using a dotted path. Missing
185/// fields render as an empty cell; container values (objects, arrays)
186/// are JSON-encoded so they still round-trip.
187fn extract_csv_cell(env: &Value, dotted: &str) -> String {
188    let mut current = env;
189    for segment in dotted.split('.') {
190        match current.get(segment) {
191            Some(v) => current = v,
192            None => return String::new(),
193        }
194    }
195    match current {
196        Value::Null => String::new(),
197        Value::String(s) => s.clone(),
198        Value::Bool(b) => b.to_string(),
199        Value::Number(n) => n.to_string(),
200        // Fall back to JSON for nested structures so they remain machine-readable.
201        other => other.to_string(),
202    }
203}
204
/// Append one CSV record (comma-separated fields plus a trailing `\n`) to
/// `buf`.
///
/// Each field is quoted on demand by [`write_csv_field`]. A record whose
/// only field is empty is written as `""` rather than as a blank line:
/// CSV readers commonly skip blank lines or treat them as zero-field
/// records, which would silently drop the row from a single-column file.
/// An iterator yielding no fields still produces just the line terminator.
fn write_csv_row<'a>(buf: &mut String, fields: impl Iterator<Item = &'a str>) {
    let row_start = buf.len();
    let mut count = 0usize;
    for field in fields {
        if count > 0 {
            buf.push(',');
        }
        count += 1;
        write_csv_field(buf, field);
    }
    // Exactly one field and nothing emitted for it: make the record visible.
    if count == 1 && buf.len() == row_start {
        buf.push_str("\"\"");
    }
    buf.push('\n');
}

/// Append a single CSV field to `buf`, applying RFC 4180 quoting.
///
/// Fields containing a comma, double quote, line feed, or carriage return
/// are wrapped in double quotes with embedded quotes doubled; every other
/// field is copied through unchanged.
fn write_csv_field(buf: &mut String, field: &str) {
    if !field.contains([',', '"', '\n', '\r']) {
        buf.push_str(field);
        return;
    }
    buf.push('"');
    // Splitting on `"` and rejoining with `""` doubles every embedded quote
    // without allocating an intermediate string.
    let mut pieces = field.split('"');
    if let Some(head) = pieces.next() {
        buf.push_str(head);
    }
    for piece in pieces {
        buf.push_str("\"\"");
        buf.push_str(piece);
    }
    buf.push('"');
}
233
/// Raw configuration for the `file` sink, deserialized from the `config`
/// value handed to [`file_sink_factory`].
#[derive(Debug, Deserialize)]
struct FileSinkConfig {
    /// Destination file path. Required: it has no serde default.
    path: PathBuf,
    /// Output format; JSONL when omitted.
    #[serde(default)]
    format: FormatConfig,
    /// JSONL body selection; payload-only when omitted. Not consulted when
    /// the format is CSV.
    #[serde(default)]
    body: BodyFormat,
    /// CSV column paths. The factory rejects an empty list when the format
    /// is CSV; not consulted for JSONL.
    #[serde(default)]
    columns: Vec<String>,
}
244
/// Config-level format selector, deserialized from the snake_case names
/// `jsonl` and `csv`. Mapped to [`Format`] by [`file_sink_factory`], which
/// attaches the body or column settings.
#[derive(Debug, Clone, Copy, Default, Deserialize)]
#[serde(rename_all = "snake_case")]
enum FormatConfig {
    /// JSON Lines output (the default).
    #[default]
    Jsonl,
    /// CSV output; requires a non-empty `columns` list.
    Csv,
}
252
253/// Registry factory for [`FileSink`]. Registered by
254/// `courier::registry::register_builtin` under kind `"file"`.
255///
256/// Retry and error policy are managed centrally by the registry and applied
257/// to every sink uniformly — no per-sink config needed.
258pub fn file_sink_factory(
259    id: &str,
260    config: Value,
261    on_error: ErrorPolicy,
262    retry: Option<RetryPolicy>,
263) -> Result<Box<dyn Sink>> {
264    let config: FileSinkConfig = parse_config("file", config)?;
265
266    let format = match config.format {
267        FormatConfig::Jsonl => Format::Jsonl { body: config.body },
268        FormatConfig::Csv => {
269            if config.columns.is_empty() {
270                return Err(anyhow!(
271                    "invalid config for component type 'file': csv format requires a non-empty 'columns' list"
272                ));
273            }
274            Format::Csv {
275                columns: config.columns,
276            }
277        }
278    };
279
280    let file = FileSink::new(id, config.path, format)?;
281    let mut sink = ManagedSink::new(file).with_error_policy(on_error);
282    if let Some(policy) = retry {
283        sink = sink.with_retry(policy);
284    }
285    Ok(Box::new(sink))
286}
287
288/// Public convenience: open a JSONL file sink. Used by tests and by callers
289/// wiring pipelines without going through the config layer.
290pub fn jsonl(id: impl Into<String>, path: impl AsRef<Path>) -> Result<FileSink> {
291    FileSink::new(
292        id,
293        path.as_ref().to_path_buf(),
294        Format::Jsonl {
295            body: BodyFormat::Payload,
296        },
297    )
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Read the whole output file as a string, panicking if it is missing.
    fn read(path: &Path) -> String {
        std::fs::read_to_string(path).unwrap()
    }

    // Payload mode: each write produces exactly one JSON line holding only
    // the payload.
    #[tokio::test]
    async fn jsonl_writes_one_payload_per_line() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.jsonl");
        let sink = FileSink::new(
            "file",
            &path,
            Format::Jsonl {
                body: BodyFormat::Payload,
            },
        )
        .unwrap();

        sink.write(&Envelope::new("src", json!({ "n": 1 })))
            .await
            .unwrap();
        sink.write(&Envelope::new("src", json!({ "n": 2 })))
            .await
            .unwrap();

        let contents = read(&path);
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines.len(), 2);
        let first: Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first, json!({ "n": 1 }));
        let second: Value = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(second, json!({ "n": 2 }));
    }

    // Envelope mode: the serialized line carries both `payload` and `meta`.
    #[tokio::test]
    async fn jsonl_envelope_mode_persists_meta() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.jsonl");
        let sink = FileSink::new(
            "file",
            &path,
            Format::Jsonl {
                body: BodyFormat::Envelope,
            },
        )
        .unwrap();

        let mut env = Envelope::new("src", json!({ "id": 7 }));
        env.meta.key = Some("k1".into());
        sink.write(&env).await.unwrap();

        let line = read(&path);
        let parsed: Value = serde_json::from_str(line.trim()).unwrap();
        assert_eq!(parsed["payload"], json!({ "id": 7 }));
        assert_eq!(parsed["meta"]["source_id"], "src");
        assert_eq!(parsed["meta"]["key"], "k1");
    }

    #[tokio::test]
    async fn jsonl_appends_across_sink_instances() {
        // Important for restart-friendliness: a fresh FileSink pointed at
        // an existing file must extend it, not truncate.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.jsonl");

        let sink = FileSink::new(
            "file",
            &path,
            Format::Jsonl {
                body: BodyFormat::Payload,
            },
        )
        .unwrap();
        sink.write(&Envelope::new("src", json!({ "n": 1 })))
            .await
            .unwrap();
        drop(sink);

        let sink = FileSink::new(
            "file",
            &path,
            Format::Jsonl {
                body: BodyFormat::Payload,
            },
        )
        .unwrap();
        sink.write(&Envelope::new("src", json!({ "n": 2 })))
            .await
            .unwrap();
        drop(sink);

        let contents = read(&path);
        assert_eq!(contents.lines().count(), 2);
    }

    // On a fresh file the column paths are written once as the header,
    // followed by one row per envelope.
    #[tokio::test]
    async fn csv_writes_header_then_rows() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.csv");
        let sink = FileSink::new(
            "file",
            &path,
            Format::Csv {
                columns: vec!["payload.id".into(), "payload.name".into()],
            },
        )
        .unwrap();

        sink.write(&Envelope::new("src", json!({ "id": 1, "name": "alice" })))
            .await
            .unwrap();
        sink.write(&Envelope::new("src", json!({ "id": 2, "name": "bob" })))
            .await
            .unwrap();

        let contents = read(&path);
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines, vec!["payload.id,payload.name", "1,alice", "2,bob"]);
    }

    // Uses /dev/full (writes to it fail) to check that a failed write leaves
    // the header marked as pending. Skipped silently when the device is
    // absent.
    #[tokio::test]
    #[cfg(target_os = "linux")]
    async fn csv_keeps_header_pending_after_write_failure() {
        let dev_full = Path::new("/dev/full");
        if !dev_full.exists() {
            return;
        }

        let sink = FileSink::new(
            "file",
            dev_full,
            Format::Csv {
                columns: vec!["payload.id".into()],
            },
        )
        .unwrap();

        sink.write(&Envelope::new("src", json!({ "id": 1 })))
            .await
            .expect_err("expected write to /dev/full to fail");

        let state = sink.state.lock().await;
        assert!(
            state.needs_header,
            "header should remain pending until write and flush succeed"
        );
    }

    // A non-empty file at open time suppresses the header, so a restarted
    // sink only appends rows.
    #[tokio::test]
    async fn csv_skips_header_when_appending_to_existing_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.csv");
        std::fs::write(&path, "id,name\n0,seed\n").unwrap();

        let sink = FileSink::new(
            "file",
            &path,
            Format::Csv {
                columns: vec!["payload.id".into(), "payload.name".into()],
            },
        )
        .unwrap();
        sink.write(&Envelope::new("src", json!({ "id": 1, "name": "alice" })))
            .await
            .unwrap();
        drop(sink);

        let contents = read(&path);
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines, vec!["id,name", "0,seed", "1,alice"]);
    }

    // Commas, double quotes and embedded newlines force quoting, with
    // embedded quotes doubled.
    #[tokio::test]
    async fn csv_quotes_fields_with_special_characters() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.csv");
        let sink = FileSink::new(
            "file",
            &path,
            Format::Csv {
                columns: vec!["payload.text".into(), "payload.note".into()],
            },
        )
        .unwrap();

        sink.write(&Envelope::new(
            "src",
            json!({
                "text": "hello, world",
                "note": "She said \"hi\"\nto me",
            }),
        ))
        .await
        .unwrap();

        let contents = read(&path);
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines[0], "payload.text,payload.note");
        // The newline embedded inside the quoted field splits across raw
        // lines — assert via a contains check rather than line index.
        assert!(contents.contains("\"hello, world\""));
        assert!(contents.contains("\"She said \"\"hi\"\"\nto me\""));
    }

    // Column paths may reach into `meta` as well as `payload`; a path that
    // does not resolve yields an empty trailing cell.
    #[tokio::test]
    async fn csv_pulls_from_meta_via_dotted_paths() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.csv");
        let sink = FileSink::new(
            "file",
            &path,
            Format::Csv {
                columns: vec![
                    "meta.source_id".into(),
                    "meta.key".into(),
                    "payload.v".into(),
                    "payload.missing".into(),
                ],
            },
        )
        .unwrap();

        let mut env = Envelope::new("src", json!({ "v": 42 }));
        env.meta.key = Some("k".into());
        sink.write(&env).await.unwrap();

        let contents = read(&path);
        let last = contents.lines().last().unwrap();
        assert_eq!(last, "src,k,42,");
    }

    // The first write creates any missing parent directories of the target.
    #[tokio::test]
    async fn creates_parent_directories() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("nested/sub/out.jsonl");
        let sink = jsonl("file", &path).unwrap();
        sink.write(&Envelope::new("src", json!({ "n": 1 })))
            .await
            .unwrap();
        assert!(path.exists());
    }

    // -----------------------------------------------------------------
    // Factory / config parsing
    // -----------------------------------------------------------------

    // With only `path` configured, the factory-built sink writes JSONL
    // payload lines. Driven through `Sink::run` because the factory returns
    // a boxed `Sink`.
    #[tokio::test]
    async fn factory_defaults_to_jsonl_payload() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("out.jsonl");

        let sink = file_sink_factory(
            "file",
            json!({ "path": path.to_str().unwrap() }),
            ErrorPolicy::Drop,
            None,
        )
        .unwrap();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let cancel = tokio_util::sync::CancellationToken::new();
        let handle = tokio::spawn(async move { sink.run(rx, cancel).await });

        tx.send(Envelope::new("src", json!({ "v": 1 })))
            .await
            .unwrap();
        drop(tx);
        handle.await.unwrap();

        let contents = read(&path);
        let parsed: Value = serde_json::from_str(contents.trim()).unwrap();
        assert_eq!(parsed, json!({ "v": 1 }));
    }

    // Selecting CSV without `columns` is rejected when the sink is built.
    #[test]
    fn factory_rejects_csv_without_columns() {
        let dir = tempfile::tempdir().unwrap();
        let err = file_sink_factory(
            "file",
            json!({
                "path": dir.path().join("x.csv").to_str().unwrap(),
                "format": "csv",
            }),
            ErrorPolicy::Drop,
            None,
        )
        .err()
        .expect("expected csv-without-columns error");
        let msg = format!("{err:#}");
        assert!(msg.contains("csv format requires"), "{msg}");
    }

    // A config without `path` fails, and the error carries the same
    // "invalid config for component type" prefix as the CSV columns error.
    #[test]
    fn factory_reports_missing_path_with_uniform_prefix() {
        let err = file_sink_factory("file", json!({}), ErrorPolicy::Drop, None)
            .err()
            .expect("expected missing-path error");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid config for component type 'file'"),
            "{msg}"
        );
    }
}