Skip to main content

faucet_sink_postgres/
config.rs

1//! PostgreSQL sink configuration.
2
3use faucet_core::DEFAULT_BATCH_SIZE;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// How to map JSON records to table columns.
8#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
9#[serde(rename_all = "snake_case")]
10pub enum PostgresColumnMapping {
11    /// Insert each record as a single `jsonb` column. The column name
12    /// defaults to `"data"` but can be overridden.
13    Jsonb { column: String },
14    /// Map top-level JSON keys directly to table columns.
15    /// Only keys that match existing columns are inserted; extra keys are ignored.
16    AutoMap,
17}
18
19impl Default for PostgresColumnMapping {
20    fn default() -> Self {
21        Self::Jsonb {
22            column: "data".into(),
23        }
24    }
25}
26
27/// Configuration for the PostgreSQL sink.
28#[derive(Clone, Serialize, Deserialize, JsonSchema)]
29pub struct PostgresSinkConfig {
30    /// PostgreSQL connection URL (e.g. `postgres://user:pass@host/db`).
31    pub connection_url: String,
32    /// Target table name.
33    pub table_name: String,
34    /// Optional schema (namespace) qualifying [`table_name`](Self::table_name).
35    ///
36    /// When set, both the AutoMap column-discovery probe and the `INSERT`
37    /// target `schema.table_name` explicitly. When unset (the default), the
38    /// table resolves against the connection's `search_path`, and column
39    /// discovery is scoped to whichever schema the `INSERT` actually resolves
40    /// to — so a same-named table in another schema no longer pollutes the
41    /// AutoMap column set (#146 M13).
42    #[serde(default)]
43    pub schema: Option<String>,
44    /// How to map JSON records to columns.
45    pub column_mapping: PostgresColumnMapping,
46    /// Maximum rows per multi-row `INSERT` statement. Defaults to
47    /// [`DEFAULT_BATCH_SIZE`].
48    ///
49    /// When the upstream `StreamPage` carries more records than `batch_size`,
50    /// the sink slices the page into `batch_size`-row chunks and issues one
51    /// multi-row `INSERT` per chunk. When `batch_size = 0`, the entire slice
52    /// is sent in a single `INSERT` — useful when the source already chunks
53    /// to a Postgres-friendly size.
54    ///
55    /// `batch_size = 0` is the "no batching" sentinel: the entire upstream
56    /// page is forwarded in one statement, subject to Postgres' natural
57    /// per-statement bind-parameter limit of 65 535. AutoMap mode binds one
58    /// parameter per column per row, so the safe ceiling is roughly
59    /// `65_535 / num_columns` rows per call; JSONB mode binds a single
60    /// array parameter and has no such ceiling. Keep the default unless the
61    /// upstream page size is already tuned for Postgres.
62    ///
63    /// **Recommended value: ~1000** — Postgres' multi-row `INSERT` sweet
64    /// spot. Larger chunks rarely add throughput and risk hitting the
65    /// 65 535-parameter ceiling in AutoMap mode.
66    #[serde(default = "default_batch_size")]
67    pub batch_size: usize,
68    /// Maximum number of connections in the pool. Defaults to 5.
69    pub max_connections: u32,
70}
71
72fn default_batch_size() -> usize {
73    DEFAULT_BATCH_SIZE
74}
75
76impl std::fmt::Debug for PostgresSinkConfig {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("PostgresSinkConfig")
79            .field("connection_url", &"***")
80            .field("table_name", &self.table_name)
81            .field("schema", &self.schema)
82            .field("column_mapping", &self.column_mapping)
83            .field("batch_size", &self.batch_size)
84            .field("max_connections", &self.max_connections)
85            .finish()
86    }
87}
88
89impl PostgresSinkConfig {
90    /// Create a new config with required fields and sensible defaults.
91    pub fn new(connection_url: impl Into<String>, table_name: impl Into<String>) -> Self {
92        Self {
93            connection_url: connection_url.into(),
94            table_name: table_name.into(),
95            schema: None,
96            column_mapping: PostgresColumnMapping::default(),
97            batch_size: DEFAULT_BATCH_SIZE,
98            max_connections: 5,
99        }
100    }
101
102    /// Set the schema (namespace) that qualifies the table. When unset, the
103    /// table resolves against the connection's `search_path`.
104    pub fn with_schema(mut self, schema: impl Into<String>) -> Self {
105        self.schema = Some(schema.into());
106        self
107    }
108
109    /// Set the column mapping strategy.
110    pub fn column_mapping(mut self, mapping: PostgresColumnMapping) -> Self {
111        self.column_mapping = mapping;
112        self
113    }
114
115    /// Set the per-statement row count for multi-row `INSERT`.
116    ///
117    /// Pass `0` to opt out of re-chunking — the sink forwards each upstream
118    /// [`StreamPage`](faucet_core::StreamPage) as a single `INSERT`
119    /// statement. Postgres' multi-row `INSERT` sweet spot is ~1000 rows.
120    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
121        self.batch_size = batch_size;
122        self
123    }
124
125    /// Set the maximum number of connections in the pool.
126    pub fn max_connections(mut self, n: u32) -> Self {
127        self.max_connections = n;
128        self
129    }
130}
131
#[cfg(test)]
mod tests {
    //! Unit tests for the sink configuration: constructor defaults, builder
    //! setters, JSON deserialization and `Debug` redaction.

    use super::*;

    #[test]
    fn default_config() {
        let config = PostgresSinkConfig::new("postgres://localhost/test", "events");
        assert_eq!(config.connection_url, "postgres://localhost/test");
        assert_eq!(config.table_name, "events");
        assert!(config.schema.is_none());
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
        assert_eq!(config.max_connections, 5);
        assert!(matches!(
            config.column_mapping,
            PostgresColumnMapping::Jsonb { ref column } if column == "data"
        ));
    }

    #[test]
    fn builder_methods() {
        let config = PostgresSinkConfig::new("postgres://localhost/test", "events")
            .column_mapping(PostgresColumnMapping::AutoMap)
            .with_batch_size(100);
        assert_eq!(config.batch_size, 100);
        assert!(matches!(
            config.column_mapping,
            PostgresColumnMapping::AutoMap
        ));
    }

    #[test]
    fn jsonb_custom_column() {
        let config = PostgresSinkConfig::new("postgres://localhost/test", "events").column_mapping(
            PostgresColumnMapping::Jsonb {
                column: "payload".into(),
            },
        );
        assert!(matches!(
            config.column_mapping,
            PostgresColumnMapping::Jsonb { ref column } if column == "payload"
        ));
    }

    #[test]
    fn with_schema_sets_schema() {
        let config =
            PostgresSinkConfig::new("postgres://localhost/test", "events").with_schema("analytics");
        assert_eq!(config.schema.as_deref(), Some("analytics"));
    }

    #[test]
    fn max_connections_overrides_default() {
        let config =
            PostgresSinkConfig::new("postgres://localhost/test", "events").max_connections(12);
        assert_eq!(config.max_connections, 12);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config =
            PostgresSinkConfig::new("postgres://localhost/test", "events").with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config =
            PostgresSinkConfig::new("postgres://localhost/test", "events").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config = PostgresSinkConfig::new("postgres://localhost/test", "events")
            .with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "connection_url": "postgres://localhost/test",
            "table_name": "events",
            "column_mapping": {"jsonb": {"column": "data"}},
            "batch_size": 250,
            "max_connections": 5
        }"#;
        let config: PostgresSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn batch_size_defaults_when_absent_in_json() {
        let json = r#"{
            "connection_url": "postgres://localhost/test",
            "table_name": "events",
            "column_mapping": {"jsonb": {"column": "data"}},
            "max_connections": 5
        }"#;
        let config: PostgresSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, DEFAULT_BATCH_SIZE);
        // `schema` is optional too and must come back unset.
        assert!(config.schema.is_none());
    }

    #[test]
    fn schema_deserializes_from_json() {
        let json = r#"{
            "connection_url": "postgres://localhost/test",
            "table_name": "events",
            "schema": "analytics",
            "column_mapping": "auto_map",
            "max_connections": 5
        }"#;
        let config: PostgresSinkConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.schema.as_deref(), Some("analytics"));
        assert!(matches!(
            config.column_mapping,
            PostgresColumnMapping::AutoMap
        ));
    }

    #[test]
    fn debug_output_redacts_connection_url() {
        // Credentials embedded in the URL must never reach log output.
        let config =
            PostgresSinkConfig::new("postgres://admin:hunter2@db.internal/prod", "events");
        let rendered = format!("{config:?}");
        assert!(!rendered.contains("hunter2"));
        assert!(!rendered.contains("db.internal"));
        assert!(rendered.contains("***"));
        // Non-sensitive fields are still printed.
        assert!(rendered.contains("events"));
    }

    #[test]
    fn config_builder_chaining() {
        // The last setter call wins.
        let config = PostgresSinkConfig::new("postgres://localhost/test", "events")
            .with_batch_size(100)
            .with_batch_size(250);
        assert_eq!(config.batch_size, 250);
    }
}