Skip to main content

faucet_sink_sqlite/
config.rs

1//! SQLite sink configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// How to map JSON records to table columns.
8#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9#[serde(rename_all = "snake_case")]
10pub enum SqliteColumnMapping {
11    /// Insert each record as a single JSON text column. The column name
12    /// defaults to `"data"` but can be overridden.
13    Json { column: String },
14    /// Map top-level JSON keys directly to table columns.
15    /// Only keys that match existing columns are inserted; extra keys are ignored.
16    AutoMap,
17}
18
19impl Default for SqliteColumnMapping {
20    fn default() -> Self {
21        Self::Json {
22            column: "data".into(),
23        }
24    }
25}
26
27/// Configuration for the SQLite sink.
28#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
29pub struct SqliteSinkConfig {
30    /// SQLite database URL (file path or `:memory:`).
31    pub database_url: String,
32    /// Target table name.
33    pub table_name: String,
34    /// How to map JSON records to columns.
35    pub column_mapping: SqliteColumnMapping,
36    /// Maximum number of rows per multi-row INSERT. Defaults to
37    /// [`DEFAULT_BATCH_SIZE`] (1000); the recommended value for SQLite.
38    ///
39    /// When the upstream `StreamPage` carries more records than `batch_size`,
40    /// the sink slices the page into `batch_size`-row chunks and issues one
41    /// multi-row INSERT per chunk, each wrapped in its own `BEGIN`/`COMMIT`
42    /// transaction. When `batch_size = 0`, the entire slice is written as a
43    /// single multi-row INSERT inside one transaction — useful when the
44    /// source already emits pages sized for SQLite (e.g. a Postgres source
45    /// configured with `batch_size: 1000`).
46    ///
47    /// `batch_size = 0` is the "no batching" sentinel: upstream framing is
48    /// preserved exactly, no internal re-chunking is performed.
49    #[serde(default = "default_batch_size")]
50    pub batch_size: usize,
51    /// Maximum number of connections in the pool. Defaults to `1`.
52    ///
53    /// SQLite serializes writers at the file level, so a single SQLite file
54    /// can only ever have one writer at a time. A multi-connection pool against
55    /// one file therefore races for the write lock and risks `SQLITE_BUSY`
56    /// errors under concurrency; one writer is the safe default. Raise this only
57    /// if your workload is read-heavy against a WAL database (the sink opens the
58    /// pool in WAL mode with a `busy_timeout`, so extra connections can still
59    /// read concurrently with the single writer).
60    #[serde(default = "default_max_connections")]
61    pub max_connections: u32,
62}
63
64fn default_batch_size() -> usize {
65    DEFAULT_BATCH_SIZE
66}
67
/// Serde fallback for `SqliteSinkConfig::max_connections` when the key is
/// absent: a pool of exactly one connection.
fn default_max_connections() -> u32 {
    // A single connection means a single writer, which is the safe default
    // described on the `max_connections` field.
    const SINGLE_CONNECTION: u32 = 1;
    SINGLE_CONNECTION
}
71
72impl SqliteSinkConfig {
73    /// Create a new config with required fields and sensible defaults.
74    pub fn new(database_url: impl Into<String>, table_name: impl Into<String>) -> Self {
75        Self {
76            database_url: database_url.into(),
77            table_name: table_name.into(),
78            column_mapping: SqliteColumnMapping::default(),
79            batch_size: DEFAULT_BATCH_SIZE,
80            max_connections: default_max_connections(),
81        }
82    }
83
84    /// Set the column mapping strategy.
85    pub fn column_mapping(mut self, mapping: SqliteColumnMapping) -> Self {
86        self.column_mapping = mapping;
87        self
88    }
89
90    /// Set the maximum number of rows per multi-row INSERT.
91    ///
92    /// Pass `0` to opt out of re-chunking — the entire records slice handed
93    /// to `write_batch` is written in a single multi-row INSERT inside one
94    /// transaction, preserving upstream `StreamPage` framing.
95    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
96        self.batch_size = batch_size;
97        self
98    }
99
100    /// Set the maximum number of connections in the pool.
101    pub fn max_connections(mut self, n: u32) -> Self {
102        self.max_connections = n;
103        self
104    }
105}
106
#[cfg(test)]
mod tests {
    //! Unit tests for the SQLite sink configuration: constructor defaults,
    //! builder methods, and serde (de)serialization behavior.

    use super::*;

    #[test]
    fn default_config() {
        let config = SqliteSinkConfig::new("sqlite::memory:", "events");
        assert_eq!(config.database_url, "sqlite::memory:");
        assert_eq!(config.table_name, "events");
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
        assert!(matches!(
            config.column_mapping,
            SqliteColumnMapping::Json { ref column } if column == "data"
        ));
    }

    #[test]
    fn default_max_connections_is_one() {
        // SQLite serializes writers at the file level, so a single-writer pool
        // is the safe default — a multi-connection pool against one SQLite file
        // races for the write lock and risks SQLITE_BUSY (audit #146 LOW).
        let config = SqliteSinkConfig::new("sqlite::memory:", "events");
        assert_eq!(config.max_connections, 1);
    }

    #[test]
    fn default_max_connections_deserializes_to_one_when_absent() {
        let json = r#"{
            "database_url": "sqlite::memory:",
            "table_name": "events",
            "column_mapping": {"json": {"column": "data"}}
        }"#;
        let config: SqliteSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.max_connections, 1);
    }

    #[test]
    fn max_connections_builder_overrides_default() {
        let config = SqliteSinkConfig::new("sqlite::memory:", "events").max_connections(4);
        assert_eq!(config.max_connections, 4);
    }

    #[test]
    fn builder_methods() {
        let config = SqliteSinkConfig::new("sqlite::memory:", "events")
            .column_mapping(SqliteColumnMapping::AutoMap)
            .with_batch_size(100);
        assert_eq!(config.batch_size, 100);
        assert!(matches!(
            config.column_mapping,
            SqliteColumnMapping::AutoMap
        ));
    }

    #[test]
    fn json_custom_column() {
        let config = SqliteSinkConfig::new("sqlite::memory:", "events").column_mapping(
            SqliteColumnMapping::Json {
                column: "payload".into(),
            },
        );
        assert!(matches!(
            config.column_mapping,
            SqliteColumnMapping::Json { ref column } if column == "payload"
        ));
    }

    #[test]
    fn config_with_file_path() {
        let config = SqliteSinkConfig::new("/tmp/test.db", "events");
        assert_eq!(config.database_url, "/tmp/test.db");
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = SqliteSinkConfig::new("sqlite::memory:", "events");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = SqliteSinkConfig::new("sqlite::memory:", "events").with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = SqliteSinkConfig::new("sqlite::memory:", "events").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = SqliteSinkConfig::new("sqlite::memory:", "events")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "database_url": "sqlite::memory:",
            "table_name": "events",
            "column_mapping": {"json": {"column": "data"}},
            "batch_size": 250,
            "max_connections": 5
        }"#;
        let config: SqliteSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
        // The explicit pool size in the payload must be honored too.
        assert_eq!(config.max_connections, 5);
    }

    #[test]
    fn batch_size_defaults_when_absent_in_json() {
        let json = r#"{
            "database_url": "sqlite::memory:",
            "table_name": "events",
            "column_mapping": {"json": {"column": "data"}},
            "max_connections": 5
        }"#;
        let config: SqliteSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
        // Defaulting one field must not disturb the others.
        assert_eq!(config.max_connections, 5);
    }

    #[test]
    fn config_round_trips_through_json() {
        // Serialize a fully customized config and read it back to pin the
        // wire format against accidental attribute changes.
        let original = SqliteSinkConfig::new("/tmp/test.db", "events")
            .column_mapping(SqliteColumnMapping::AutoMap)
            .with_batch_size(42)
            .max_connections(3);
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: SqliteSinkConfig = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.database_url, "/tmp/test.db");
        assert_eq!(decoded.table_name, "events");
        assert_eq!(decoded.batch_size, 42);
        assert_eq!(decoded.max_connections, 3);
        assert!(matches!(
            decoded.column_mapping,
            SqliteColumnMapping::AutoMap
        ));
    }
}