Skip to main content

faucet_source_sqlite/
config.rs

1//! SQLite source configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// Configuration for the SQLite query source.
8#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9pub struct SqliteSourceConfig {
10    /// SQLite database URL (file path or `sqlite::memory:`).
11    pub database_url: String,
12    /// SQL query to execute.
13    pub query: String,
14    /// Maximum number of connections in the pool. Defaults to 10.
15    #[serde(default = "default_max_connections")]
16    pub max_connections: u32,
17    /// Records per emitted [`StreamPage`](faucet_core::StreamPage). Rows are
18    /// drained from the sqlx cursor and yielded whenever the buffer reaches
19    /// this size. Defaults to [`DEFAULT_BATCH_SIZE`].
20    ///
21    /// `batch_size = 0` is the "no batching" sentinel: the cursor is fully
22    /// drained and the entire result set is emitted in a single page. Useful
23    /// for small lookup tables or for sinks (e.g. SQL `COPY`, BigQuery load
24    /// jobs) that prefer one large request to many small ones.
25    #[serde(default = "default_batch_size")]
26    pub batch_size: usize,
27}
28
29fn default_max_connections() -> u32 {
30    10
31}
32
33fn default_batch_size() -> usize {
34    DEFAULT_BATCH_SIZE
35}
36
37impl SqliteSourceConfig {
38    /// Create a new config with the required database URL and query.
39    pub fn new(database_url: impl Into<String>, query: impl Into<String>) -> Self {
40        Self {
41            database_url: database_url.into(),
42            query: query.into(),
43            max_connections: 10,
44            batch_size: DEFAULT_BATCH_SIZE,
45        }
46    }
47
48    /// Set the maximum number of connections in the pool.
49    pub fn with_max_connections(mut self, max_connections: u32) -> Self {
50        self.max_connections = max_connections;
51        self
52    }
53
54    /// Set the per-page row count for [`Source::stream_pages`](faucet_core::Source::stream_pages).
55    ///
56    /// Pass `0` to opt out of batching — the entire result set is emitted in
57    /// a single [`StreamPage`](faucet_core::StreamPage).
58    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
59        self.batch_size = batch_size;
60        self
61    }
62}
63
#[cfg(test)]
mod tests {
    //! Unit tests for [`SqliteSourceConfig`]: constructor defaults, builder
    //! overrides, batch-size validation, and JSON deserialization (including
    //! the serde fallbacks for omitted optional fields).

    use super::*;

    #[test]
    fn default_config() {
        let config = SqliteSourceConfig::new("sqlite:test.db", "SELECT * FROM events");
        assert_eq!(config.database_url, "sqlite:test.db");
        assert_eq!(config.query, "SELECT * FROM events");
        // Documented default on the `max_connections` field.
        assert_eq!(config.max_connections, 10);
    }

    #[test]
    fn memory_database() {
        let config = SqliteSourceConfig::new("sqlite::memory:", "SELECT 1");
        assert_eq!(config.database_url, "sqlite::memory:");
    }

    #[test]
    fn with_max_connections_overrides_default() {
        let config = SqliteSourceConfig::new("sqlite::memory:", "SELECT 1").with_max_connections(3);
        assert_eq!(config.max_connections, 3);
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = SqliteSourceConfig::new("sqlite::memory:", "SELECT 1");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = SqliteSourceConfig::new("sqlite::memory:", "SELECT 1").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = SqliteSourceConfig::new("sqlite::memory:", "SELECT 1").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = SqliteSourceConfig::new("sqlite::memory:", "SELECT 1")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "database_url": "sqlite::memory:",
            "query": "SELECT 1",
            "batch_size": 250
        }"#;
        let config: SqliteSourceConfig =
            serde_json::from_str(json).expect("well-formed config JSON must deserialize");
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn optional_fields_fall_back_to_defaults_when_omitted_from_json() {
        let json = r#"{
            "database_url": "sqlite::memory:",
            "query": "SELECT 1"
        }"#;
        let from_json: SqliteSourceConfig =
            serde_json::from_str(json).expect("optional fields may be omitted");
        let from_new = SqliteSourceConfig::new("sqlite::memory:", "SELECT 1");

        // The serde fallbacks and the constructor must agree on every default.
        assert_eq!(from_json.max_connections, from_new.max_connections);
        assert_eq!(from_json.batch_size, from_new.batch_size);
        assert_eq!(from_json.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn max_connections_deserializes_from_json() {
        let json = r#"{
            "database_url": "sqlite::memory:",
            "query": "SELECT 1",
            "max_connections": 4
        }"#;
        let config: SqliteSourceConfig =
            serde_json::from_str(json).expect("well-formed config JSON must deserialize");
        assert_eq!(config.max_connections, 4);
    }

    #[test]
    fn missing_required_field_is_rejected() {
        // `query` has no serde default, so omitting it must be an error.
        let json = r#"{ "database_url": "sqlite::memory:" }"#;
        assert!(serde_json::from_str::<SqliteSourceConfig>(json).is_err());
    }
}