Skip to main content

faucet_sink_jsonl/
config.rs

1//! JSON Lines sink configuration.
2
3use std::path::PathBuf;
4
5use faucet_core::DEFAULT_BATCH_SIZE;
6use schemars::JsonSchema;
7use serde::{Deserialize, Serialize};
8
9/// Configuration for the JSON Lines file sink.
10#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
11pub struct JsonlSinkConfig {
12    /// Path to the output file.
13    pub path: PathBuf,
14    /// Whether to append to an existing file (default: false, truncates).
15    #[serde(default)]
16    pub append: bool,
17    /// Whether to pretty-print each JSON record (default: false, compact).
18    #[serde(default)]
19    pub pretty: bool,
20    /// Records per upstream [`StreamPage`](faucet_core::StreamPage). The JSONL
21    /// sink writes records to disk one at a time through a buffered async
22    /// writer, so this field has **no behavioural impact** at the sink — it is
23    /// exposed purely for config parity across every sink in the workspace.
24    /// Defaults to [`DEFAULT_BATCH_SIZE`].
25    ///
26    /// `batch_size = 0` (the "no batching" sentinel) and any positive value
27    /// produce byte-for-byte identical output for this sink: each record is
28    /// serialised and appended individually regardless of how upstream chunked
29    /// the page.
30    #[serde(default = "default_batch_size")]
31    pub batch_size: usize,
32    /// Compression codec for the output file. Defaults to
33    /// [`CompressionConfig::Auto`](faucet_core::CompressionConfig::Auto) —
34    /// `.gz` / `.zst` suffix selects gzip / zstd, anything else writes
35    /// uncompressed. Requires the crate-local `compression` feature.
36    #[cfg(feature = "compression")]
37    #[serde(default)]
38    pub compression: faucet_core::CompressionConfig,
39}
40
41fn default_batch_size() -> usize {
42    DEFAULT_BATCH_SIZE
43}
44
45impl JsonlSinkConfig {
46    /// Create a new config with the required file path and sensible defaults.
47    pub fn new(path: impl Into<PathBuf>) -> Self {
48        Self {
49            path: path.into(),
50            append: false,
51            pretty: false,
52            batch_size: DEFAULT_BATCH_SIZE,
53            #[cfg(feature = "compression")]
54            compression: faucet_core::CompressionConfig::Auto,
55        }
56    }
57
58    /// Append to an existing file instead of truncating.
59    pub fn append(mut self, append: bool) -> Self {
60        self.append = append;
61        self
62    }
63
64    /// Pretty-print each JSON record (one record still per logical entry,
65    /// but with indentation). Note: this breaks strict JSONL format.
66    pub fn pretty(mut self, pretty: bool) -> Self {
67        self.pretty = pretty;
68        self
69    }
70
71    /// Set the compression codec. Available only with the `compression` feature.
72    #[cfg(feature = "compression")]
73    pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
74        self.compression = c;
75        self
76    }
77
78    /// Set the per-page record count hint reported alongside other sink
79    /// configs.
80    ///
81    /// This sink writes per-record through a buffered writer, so the value is
82    /// observably a no-op: `0` (the "no batching" sentinel) and any positive
83    /// value produce the same on-disk output. Present for symmetry with sinks
84    /// whose `batch_size` does drive I/O sizing (e.g. SQL multi-row inserts,
85    /// BigQuery streaming inserts).
86    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
87        self.batch_size = batch_size;
88        self
89    }
90}
91
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config() {
        let config = JsonlSinkConfig::new("/tmp/out.jsonl");
        assert_eq!(config.path, PathBuf::from("/tmp/out.jsonl"));
        assert!(!config.append);
        assert!(!config.pretty);
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_defaults_to_auto() {
        let config = JsonlSinkConfig::new("/tmp/out.jsonl");
        assert!(matches!(
            config.compression,
            faucet_core::CompressionConfig::Auto
        ));
    }

    #[test]
    fn builder_methods() {
        let config = JsonlSinkConfig::new("/tmp/out.jsonl")
            .append(true)
            .pretty(true);
        assert!(config.append);
        assert!(config.pretty);
    }

    #[test]
    fn builder_methods_can_reset_flags() {
        // Later calls must override earlier ones, including back to `false`.
        let config = JsonlSinkConfig::new("/tmp/out.jsonl")
            .append(true)
            .pretty(true)
            .append(false)
            .pretty(false);
        assert!(!config.append);
        assert!(!config.pretty);
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = JsonlSinkConfig::new("/tmp/out.jsonl");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = JsonlSinkConfig::new("/tmp/out.jsonl").with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = JsonlSinkConfig::new("/tmp/out.jsonl").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config =
            JsonlSinkConfig::new("/tmp/out.jsonl").with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "path": "/tmp/out.jsonl",
            "append": false,
            "pretty": false,
            "batch_size": 500
        }"#;
        let config: JsonlSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_defaults_when_missing_in_json() {
        let json = r#"{"path": "/tmp/out.jsonl"}"#;
        let config: JsonlSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn flags_default_to_false_when_missing_in_json() {
        // Only `path` is required; `append` and `pretty` carry `#[serde(default)]`.
        let json = r#"{"path": "/tmp/out.jsonl"}"#;
        let config: JsonlSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.path, PathBuf::from("/tmp/out.jsonl"));
        assert!(!config.append);
        assert!(!config.pretty);
    }

    #[test]
    fn serde_round_trip_preserves_fields() {
        let original = JsonlSinkConfig::new("/tmp/out.jsonl")
            .append(true)
            .pretty(true)
            .with_batch_size(42);
        let json = serde_json::to_string(&original).unwrap();
        let restored: JsonlSinkConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.path, original.path);
        assert_eq!(restored.append, original.append);
        assert_eq!(restored.pretty, original.pretty);
        assert_eq!(restored.batch_size, original.batch_size);
    }
}