Skip to main content

feldera_types/transport/
postgres.rs

1use serde::{Deserialize, Serialize};
2use std::fmt::Display;
3use utoipa::ToSchema;
4
/// PostgreSQL write mode.
///
/// Determines how the PostgreSQL output connector writes data to the target table.
//
// Each variant's serialized name is pinned with `#[serde(rename = ...)]`
// ("materialized" / "cdc"); the `Display` impl in this file prints the same
// strings. `Materialized` is the `Default` variant.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema, Default)]
pub enum PostgresWriteMode {
    /// Materialized mode: perform direct INSERT, UPDATE, and DELETE operations on the table.
    /// This is the default behavior and maintains the Postgres table as a materialized snapshot of the output view.
    #[default]
    #[serde(rename = "materialized")]
    Materialized,

    /// CDC (Change Data Capture) mode: write all operations as INSERT operations
    /// into a Postgres table that serves as an append-only event log.
    /// In this mode, inserts, updates, and deletes are all represented as new rows
    /// with metadata columns describing the operation type and timestamp.
    #[serde(rename = "cdc")]
    Cdc,
}
23
24impl Display for PostgresWriteMode {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        match self {
27            Self::Materialized => write!(f, "materialized"),
28            Self::Cdc => write!(f, "cdc"),
29        }
30    }
31}
32
/// TLS/SSL configuration for PostgreSQL connectors.
//
// Every field is optional; the derived `Default` leaves all of them `None`.
// The reader/writer configs in this file embed this struct with
// `#[serde(flatten)]`, so these keys appear alongside the connector's own keys
// rather than under a nested object.
// Note that `has_tls()` only inspects the two CA fields (`ssl_ca_pem`,
// `ssl_ca_location`); the client-certificate fields do not affect it.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema, Default)]
pub struct PostgresTlsConfig {
    /// A sequence of CA certificates in PEM format.
    pub ssl_ca_pem: Option<String>,

    /// Path to a file containing a sequence of CA certificates in PEM format.
    pub ssl_ca_location: Option<String>,

    /// The client certificate in PEM format.
    pub ssl_client_pem: Option<String>,

    /// Path to the client certificate.
    pub ssl_client_location: Option<String>,

    /// The client certificate key in PEM format.
    pub ssl_client_key: Option<String>,

    /// Path to the client certificate key.
    pub ssl_client_key_location: Option<String>,

    /// The path to the certificate chain file.
    /// The file must contain a sequence of PEM-formatted certificates,
    /// the first being the leaf certificate, and the remainder forming
    /// the chain of certificates up to and including the trusted root certificate.
    pub ssl_certificate_chain_location: Option<String>,

    /// True to enable hostname verification when using TLS. True by default.
    // NOTE(review): the "true by default" behaviour is not applied here (the
    // field is a plain `Option<bool>`); presumably the consumer treats `None`
    // as `true` — confirm against the connector code.
    pub verify_hostname: Option<bool>,
}
63
64impl PostgresTlsConfig {
65    pub fn has_tls(&self) -> bool {
66        self.ssl_ca_pem.is_some() || self.ssl_ca_location.is_some()
67    }
68}
69
/// Postgres CDC input connector configuration.
///
/// Uses logical replication to capture ongoing changes from a Postgres database.
/// Requires a pre-created publication and a user with REPLICATION privilege.
/// Tables must have primary keys and `REPLICA IDENTITY FULL` is recommended
/// for UPDATE/DELETE support.
//
// `uri`, `publication` and `source_table` carry no serde default, so all three
// are required when deserializing.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
pub struct PostgresCdcReaderConfig {
    /// Postgres connection URI. The user must have REPLICATION privilege.
    /// See: <https://docs.rs/tokio-postgres/0.7.12/tokio_postgres/config/struct.Config.html>
    pub uri: String,

    /// Name of the pre-created Postgres publication to replicate from.
    pub publication: String,

    /// Postgres table to replicate (e.g. "public.orders").
    /// Must be included in the publication.
    pub source_table: String,

    /// TLS/SSL configuration.
    // Flattened: the `PostgresTlsConfig` keys (`ssl_ca_pem`, ...) sit at the
    // same level as `uri` in the serialized config, and the schema is inlined
    // to match.
    #[serde(flatten)]
    #[schema(inline)]
    pub tls: PostgresTlsConfig,
}
94
/// Postgres input connector configuration.
//
// `uri` and `query` carry no serde default, so both are required when
// deserializing.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
pub struct PostgresReaderConfig {
    /// Postgres URI.
    /// See: <https://docs.rs/tokio-postgres/0.7.12/tokio_postgres/config/struct.Config.html>
    pub uri: String,

    /// Query that specifies what data to fetch from postgres.
    pub query: String,

    /// TLS/SSL configuration.
    // Flattened: the `PostgresTlsConfig` keys (`ssl_ca_pem`, ...) sit at the
    // same level as `uri` in the serialized config, and the schema is inlined
    // to match.
    #[serde(flatten)]
    #[schema(inline)]
    pub tls: PostgresTlsConfig,
}
110
/// Postgres output connector configuration.
//
// Only `uri` and `table` are required when deserializing; every other field is
// either an `Option`, has a serde default, or is flattened from
// `PostgresTlsConfig`. Cross-field rules (CDC column names, `threads >= 1`,
// `on_conflict_do_nothing` vs. CDC mode) are enforced by `validate()`, not by
// deserialization.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
pub struct PostgresWriterConfig {
    /// Postgres URI.
    /// See: <https://docs.rs/tokio-postgres/0.7.12/tokio_postgres/config/struct.Config.html>
    pub uri: String,

    /// The table to write the output to.
    pub table: String,

    /// Write mode for the connector.
    ///
    /// - `materialized` (default): Perform direct INSERT, UPDATE, and DELETE operations on the table.
    /// - `cdc`: Write all operations as INSERT operations into an append-only event log
    ///   with additional metadata columns describing the operation type and timestamp.
    #[serde(default)]
    #[schema(default = PostgresWriteMode::default)]
    pub mode: PostgresWriteMode,

    /// Name of the operation metadata column in CDC mode.
    ///
    /// Only used when `mode = "cdc"`. This column will contain:
    /// - `"i"` for insert operations
    /// - `"u"` for upsert operations
    /// - `"d"` for delete operations
    ///
    /// Default: `"__feldera_op"`
    // `validate()` rejects a non-default, non-blank value in materialized mode.
    #[serde(default = "default_cdc_op_column")]
    #[schema(default = default_cdc_op_column)]
    pub cdc_op_column: String,

    /// Name of the timestamp metadata column in CDC mode.
    ///
    /// Only used when `mode = "cdc"`. This column will contain the timestamp
    /// (in RFC 3339 format) when the batch of updates was output
    /// by the pipeline.
    ///
    /// Default: `"__feldera_ts"`
    // `validate()` rejects a non-default, non-blank value in materialized mode.
    #[serde(default = "default_cdc_ts_column")]
    #[schema(default = default_cdc_ts_column)]
    pub cdc_ts_column: String,

    /// TLS/SSL configuration.
    // Flattened: the `PostgresTlsConfig` keys (`ssl_ca_pem`, ...) sit at the
    // same level as `uri` in the serialized config, and the schema is inlined
    // to match.
    #[serde(flatten)]
    #[schema(inline)]
    pub tls: PostgresTlsConfig,

    /// The maximum number of records in a single buffer.
    // NOTE(review): no default is declared here; what `None` means (unbounded
    // vs. a connector-side default) is decided by the consumer — confirm.
    pub max_records_in_buffer: Option<usize>,

    /// The maximum buffer size in bytes for a single operation.
    /// Note that the buffers of `INSERT`, `UPDATE` and `DELETE` queries are
    /// separate.
    /// Default: 1 MiB
    #[schema(default = default_max_buffer_size)]
    #[serde(default = "default_max_buffer_size")]
    pub max_buffer_size_bytes: usize,

    /// Specifies how the connector handles conflicts when executing an `INSERT`
    /// into a table with a primary key. By default, an existing row with the same
    /// key is overwritten. Setting this flag to `true` preserves the existing row
    /// and ignores the new insert.
    ///
    /// This setting does not affect `UPDATE` statements, which always replace the
    /// value associated with the key.
    ///
    /// This setting is not supported when `mode = "cdc"`, since all operations
    /// are performed as append-only `INSERT`s into the target table.
    /// Any conflict in CDC mode will result in an error.
    ///
    /// Default: `false`
    #[serde(default)]
    pub on_conflict_do_nothing: bool,

    /// The number of threads to use during encoding.
    ///
    /// Default: 1
    // `validate()` rejects 0.
    #[serde(default = "default_writer_threads")]
    #[schema(default = default_writer_threads)]
    pub threads: usize,

    /// The names of the extra columns in the Postgres table that are not part of the view schema.
    ///
    /// The connector can write user-defined values, configured using the `set_extra_columns` connector command,
    /// to these columns.
    // Omitted from serialized output when empty; defaults to an empty list on input.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub extra_columns: Vec<String>,
}
199
/// Default for `PostgresWriterConfig::max_buffer_size_bytes`: 1 MiB (2^20 bytes).
fn default_max_buffer_size() -> usize {
    1 << 20
}
203
/// Default for `PostgresWriterConfig::threads`: a single encoding thread.
fn default_writer_threads() -> usize {
    const SINGLE_THREAD: usize = 1;
    SINGLE_THREAD
}
207
/// Default for `PostgresWriterConfig::cdc_op_column`.
fn default_cdc_op_column() -> String {
    String::from("__feldera_op")
}
211
/// Default for `PostgresWriterConfig::cdc_ts_column`.
fn default_cdc_ts_column() -> String {
    "__feldera_ts".to_owned()
}
215
216impl PostgresWriterConfig {
217    pub fn validate(&self) -> Result<(), String> {
218        match self.mode {
219            PostgresWriteMode::Cdc => {
220                if self.cdc_op_column.trim().is_empty() {
221                    return Err("cdc_op_column cannot be empty in CDC mode".to_string());
222                }
223                if self.cdc_ts_column.trim().is_empty() {
224                    return Err("cdc_ts_column cannot be empty in CDC mode".to_string());
225                }
226
227                if !self.cdc_op_column.is_ascii() {
228                    return Err("cdc_op_column must contain only ASCII characters".to_string());
229                }
230
231                if !self.cdc_ts_column.is_ascii() {
232                    return Err("cdc_ts_column must contain only ASCII characters".to_string());
233                }
234
235                if self.on_conflict_do_nothing {
236                    return Err("on_conflict_do_nothing not supported in CDC mode since all operations are performed as append-only INSERTs into the target table".to_string());
237                }
238            }
239            PostgresWriteMode::Materialized => {
240                if self.cdc_ts_column != default_cdc_ts_column()
241                    && !self.cdc_ts_column.trim().is_empty()
242                {
243                    return Err(
244                        "cdc_ts_column must not be set when in MATERIALIZED mode".to_string()
245                    );
246                }
247                if self.cdc_op_column != default_cdc_op_column()
248                    && !self.cdc_op_column.trim().is_empty()
249                {
250                    return Err(
251                        "cdc_op_column must not be set when in MATERIALIZED mode".to_string()
252                    );
253                }
254            }
255        };
256
257        if self.threads == 0 {
258            return Err("threads must be at least 1".to_string());
259        }
260
261        Ok(())
262    }
263}