feldera_types/transport/postgres.rs
1use serde::{Deserialize, Serialize};
2use std::fmt::Display;
3use utoipa::ToSchema;
4
5/// PostgreSQL write mode.
6///
7/// Determines how the PostgreSQL output connector writes data to the target table.
8#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema, Default)]
9pub enum PostgresWriteMode {
10 /// Materialized mode: perform direct INSERT, UPDATE, and DELETE operations on the table.
11 /// This is the default behavior and maintains the postgres table as a materialized snapshot of the output view.
12 #[default]
13 #[serde(rename = "materialized")]
14 Materialized,
15
16 /// CDC (Change Data Capture) mode: write all operations as INSERT operations
17 /// into a Postgres table that serves as an append-only event log.
18 /// In this mode, inserts, updates, and deletes are all represented as new rows
19 /// with metadata columns describing the operation type and timestamp.
20 #[serde(rename = "cdc")]
21 Cdc,
22}
23
24impl Display for PostgresWriteMode {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 match self {
27 Self::Materialized => write!(f, "materialized"),
28 Self::Cdc => write!(f, "cdc"),
29 }
30 }
31}
32
/// TLS/SSL configuration for PostgreSQL connectors.
///
/// All fields are optional. The CA certificates, the client certificate, and
/// the client certificate key can each be supplied either inline in PEM format
/// or as a path to a file.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema, Default)]
pub struct PostgresTlsConfig {
    /// A sequence of CA certificates in PEM format.
    pub ssl_ca_pem: Option<String>,

    /// Path to a file containing a sequence of CA certificates in PEM format.
    pub ssl_ca_location: Option<String>,

    /// The client certificate in PEM format.
    pub ssl_client_pem: Option<String>,

    /// Path to the client certificate.
    pub ssl_client_location: Option<String>,

    /// The client certificate key in PEM format.
    pub ssl_client_key: Option<String>,

    /// Path to the client certificate key.
    pub ssl_client_key_location: Option<String>,

    /// The path to the certificate chain file.
    /// The file must contain a sequence of PEM-formatted certificates,
    /// the first being the leaf certificate, and the remainder forming
    /// the chain of certificates up to and including the trusted root certificate.
    pub ssl_certificate_chain_location: Option<String>,

    /// Set to `true` to enable hostname verification when using TLS.
    /// Enabled by default.
    pub verify_hostname: Option<bool>,
}
63
64impl PostgresTlsConfig {
65 pub fn has_tls(&self) -> bool {
66 self.ssl_ca_pem.is_some() || self.ssl_ca_location.is_some()
67 }
68}
69
/// Postgres input connector configuration.
///
/// The TLS settings in `tls` are flattened into this object when it is
/// serialized or deserialized, so they appear alongside `uri` and `query`.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
pub struct PostgresReaderConfig {
    /// Postgres URI.
    /// See: <https://docs.rs/tokio-postgres/0.7.12/tokio_postgres/config/struct.Config.html>
    pub uri: String,

    /// Query that specifies what data to fetch from Postgres.
    pub query: String,

    /// TLS/SSL configuration.
    #[serde(flatten)]
    #[schema(inline)]
    pub tls: PostgresTlsConfig,
}
85
86/// Postgres output connector configuration.
87#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
88pub struct PostgresWriterConfig {
89 /// Postgres URI.
90 /// See: <https://docs.rs/tokio-postgres/0.7.12/tokio_postgres/config/struct.Config.html>
91 pub uri: String,
92
93 /// The table to write the output to.
94 pub table: String,
95
96 /// Write mode for the connector.
97 ///
98 /// - `materialized` (default): Perform direct INSERT, UPDATE, and DELETE operations on the table.
99 /// - `cdc`: Write all operations as INSERT operations into an append-only event log
100 /// with additional metadata columns describing the operation type and timestamp.
101 #[serde(default)]
102 #[schema(default = PostgresWriteMode::default)]
103 pub mode: PostgresWriteMode,
104
105 /// Name of the operation metadata column in CDC mode.
106 ///
107 /// Only used when `mode = "cdc"`. This column will contain:
108 /// - `"i"` for insert operations
109 /// - `"u"` for upsert operations
110 /// - `"d"` for delete operations
111 ///
112 /// Default: `"__feldera_op"`
113 #[serde(default = "default_cdc_op_column")]
114 #[schema(default = default_cdc_op_column)]
115 pub cdc_op_column: String,
116
117 /// Name of the timestamp metadata column in CDC mode.
118 ///
119 /// Only used when `mode = "cdc"`. This column will contain the timestamp
120 /// (in RFC 3339 format) when the batch of updates was output
121 /// by the pipeline.
122 ///
123 /// Default: `"__feldera_ts"`
124 #[serde(default = "default_cdc_ts_column")]
125 #[schema(default = default_cdc_ts_column)]
126 pub cdc_ts_column: String,
127
128 /// TLS/SSL configuration.
129 #[serde(flatten)]
130 #[schema(inline)]
131 pub tls: PostgresTlsConfig,
132
133 /// The maximum number of records in a single buffer.
134 pub max_records_in_buffer: Option<usize>,
135
136 /// The maximum buffer size in for a single operation.
137 /// Note that the buffers of `INSERT`, `UPDATE` and `DELETE` queries are
138 /// separate.
139 /// Default: 1 MiB
140 #[schema(default = default_max_buffer_size)]
141 #[serde(default = "default_max_buffer_size")]
142 pub max_buffer_size_bytes: usize,
143
144 /// Specifies how the connector handles conflicts when executing an `INSERT`
145 /// into a table with a primary key. By default, an existing row with the same
146 /// key is overwritten. Setting this flag to `true` preserves the existing row
147 /// and ignores the new insert.
148 ///
149 /// This setting does not affect `UPDATE` statements, which always replace the
150 /// value associated with the key.
151 ///
152 /// This setting is not supported when `mode = "cdc"`, since all operations
153 /// are performed as append-only `INSERT`s into the target table.
154 /// Any conflict in CDC mode will result in an error.
155 ///
156 /// Default: `false`
157 #[serde(default)]
158 pub on_conflict_do_nothing: bool,
159
160 /// The number of threads to use during encoding.
161 ///
162 /// Default: 1
163 #[serde(default = "default_writer_threads")]
164 #[schema(default = default_writer_threads)]
165 pub threads: usize,
166}
167
/// Default for `max_buffer_size_bytes`: 1 MiB (2^20 bytes).
fn default_max_buffer_size() -> usize {
    1 << 20
}
171
/// Default for `threads`: a single thread.
fn default_writer_threads() -> usize {
    const SINGLE_THREAD: usize = 1;
    SINGLE_THREAD
}
175
/// Default for `cdc_op_column`: `"__feldera_op"`.
fn default_cdc_op_column() -> String {
    String::from("__feldera_op")
}
179
/// Default for `cdc_ts_column`: `"__feldera_ts"`.
fn default_cdc_ts_column() -> String {
    String::from("__feldera_ts")
}
183
184impl PostgresWriterConfig {
185 pub fn validate(&self) -> Result<(), String> {
186 match self.mode {
187 PostgresWriteMode::Cdc => {
188 if self.cdc_op_column.trim().is_empty() {
189 return Err("cdc_op_column cannot be empty in CDC mode".to_string());
190 }
191 if self.cdc_ts_column.trim().is_empty() {
192 return Err("cdc_ts_column cannot be empty in CDC mode".to_string());
193 }
194
195 if !self.cdc_op_column.is_ascii() {
196 return Err("cdc_op_column must contain only ASCII characters".to_string());
197 }
198
199 if !self.cdc_ts_column.is_ascii() {
200 return Err("cdc_ts_column must contain only ASCII characters".to_string());
201 }
202
203 if self.on_conflict_do_nothing {
204 return Err("on_conflict_do_nothing not supported in CDC mode since all operations are performed as append-only INSERTs into the target table".to_string());
205 }
206 }
207 PostgresWriteMode::Materialized => {
208 if self.cdc_ts_column != default_cdc_ts_column()
209 && !self.cdc_ts_column.trim().is_empty()
210 {
211 return Err(
212 "cdc_ts_column must not be set when in MATERIALIZED mode".to_string()
213 );
214 }
215 if self.cdc_op_column != default_cdc_op_column()
216 && !self.cdc_op_column.trim().is_empty()
217 {
218 return Err(
219 "cdc_op_column must not be set when in MATERIALIZED mode".to_string()
220 );
221 }
222 }
223 };
224
225 if self.threads == 0 {
226 return Err("threads must be at least 1".to_string());
227 }
228
229 Ok(())
230 }
231}