faucet_sink_sqlite/
config.rs1use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9#[serde(rename_all = "snake_case")]
10pub enum SqliteColumnMapping {
11 Json { column: String },
14 AutoMap,
17}
18
19impl Default for SqliteColumnMapping {
20 fn default() -> Self {
21 Self::Json {
22 column: "data".into(),
23 }
24 }
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
29pub struct SqliteSinkConfig {
30 pub database_url: String,
32 pub table_name: String,
34 pub column_mapping: SqliteColumnMapping,
36 #[serde(default = "default_batch_size")]
50 pub batch_size: usize,
51 #[serde(default = "default_max_connections")]
61 pub max_connections: u32,
62}
63
64fn default_batch_size() -> usize {
65 DEFAULT_BATCH_SIZE
66}
67
/// Serde fallback for `SqliteSinkConfig::max_connections`: a single
/// connection.
fn default_max_connections() -> u32 {
    1
}
71
72impl SqliteSinkConfig {
73 pub fn new(database_url: impl Into<String>, table_name: impl Into<String>) -> Self {
75 Self {
76 database_url: database_url.into(),
77 table_name: table_name.into(),
78 column_mapping: SqliteColumnMapping::default(),
79 batch_size: DEFAULT_BATCH_SIZE,
80 max_connections: default_max_connections(),
81 }
82 }
83
84 pub fn column_mapping(mut self, mapping: SqliteColumnMapping) -> Self {
86 self.column_mapping = mapping;
87 self
88 }
89
90 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
96 self.batch_size = batch_size;
97 self
98 }
99
100 pub fn max_connections(mut self, n: u32) -> Self {
102 self.max_connections = n;
103 self
104 }
105}
106
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline in-memory configuration shared by most tests.
    fn memory_config() -> SqliteSinkConfig {
        SqliteSinkConfig::new("sqlite::memory:", "events")
    }

    #[test]
    fn default_config() {
        let config = memory_config();
        assert_eq!(config.database_url, "sqlite::memory:");
        assert_eq!(config.table_name, "events");
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
        assert!(matches!(
            config.column_mapping,
            SqliteColumnMapping::Json { ref column } if column == "data"
        ));
    }

    #[test]
    fn default_max_connections_is_one() {
        let config = memory_config();
        assert_eq!(config.max_connections, 1);
    }

    #[test]
    fn default_max_connections_deserializes_to_one_when_absent() {
        let json = r#"{
            "database_url": "sqlite::memory:",
            "table_name": "events",
            "column_mapping": {"json": {"column": "data"}}
        }"#;
        let config: SqliteSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.max_connections, 1);
    }

    // Previously uncovered: the `max_connections` builder was never exercised.
    #[test]
    fn max_connections_builder_overrides_default() {
        let config = memory_config().max_connections(4);
        assert_eq!(config.max_connections, 4);
    }

    // Previously uncovered: an explicit JSON value was supplied but never asserted.
    #[test]
    fn max_connections_deserializes_from_json() {
        let json = r#"{
            "database_url": "sqlite::memory:",
            "table_name": "events",
            "column_mapping": {"json": {"column": "data"}},
            "max_connections": 5
        }"#;
        let config: SqliteSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.max_connections, 5);
    }

    #[test]
    fn builder_methods() {
        let config = memory_config()
            .column_mapping(SqliteColumnMapping::AutoMap)
            .with_batch_size(100);
        assert_eq!(config.batch_size, 100);
        assert!(matches!(
            config.column_mapping,
            SqliteColumnMapping::AutoMap
        ));
    }

    #[test]
    fn json_custom_column() {
        let config = memory_config().column_mapping(SqliteColumnMapping::Json {
            column: "payload".into(),
        });
        assert!(matches!(
            config.column_mapping,
            SqliteColumnMapping::Json { ref column } if column == "payload"
        ));
    }

    // Pins the wire form of the unit variant under `rename_all = "snake_case"`.
    #[test]
    fn auto_map_deserializes_from_snake_case_string() {
        let json = r#"{
            "database_url": "sqlite::memory:",
            "table_name": "events",
            "column_mapping": "auto_map"
        }"#;
        let config: SqliteSinkConfig = serde_json::from_str(json).unwrap();
        assert!(matches!(
            config.column_mapping,
            SqliteColumnMapping::AutoMap
        ));
    }

    #[test]
    fn config_with_file_path() {
        let config = SqliteSinkConfig::new("/tmp/test.db", "events");
        assert_eq!(config.database_url, "/tmp/test.db");
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = memory_config();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = memory_config().with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = memory_config().with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = memory_config().with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "database_url": "sqlite::memory:",
            "table_name": "events",
            "column_mapping": {"json": {"column": "data"}},
            "batch_size": 250,
            "max_connections": 5
        }"#;
        let config: SqliteSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_in_json() {
        let json = r#"{
            "database_url": "sqlite::memory:",
            "table_name": "events",
            "column_mapping": {"json": {"column": "data"}},
            "max_connections": 5
        }"#;
        let config: SqliteSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }
}