Skip to main content

feldera_types/transport/
postgres.rs

1use serde::{Deserialize, Serialize};
2use std::fmt::Display;
3use utoipa::ToSchema;
4
5/// PostgreSQL write mode.
6///
7/// Determines how the PostgreSQL output connector writes data to the target table.
8#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema, Default)]
9pub enum PostgresWriteMode {
10    /// Materialized mode: perform direct INSERT, UPDATE, and DELETE operations on the table.
11    /// This is the default behavior and maintains the postgres table as a materialized snapshot of the output view.
12    #[default]
13    #[serde(rename = "materialized")]
14    Materialized,
15
16    /// CDC (Change Data Capture) mode: write all operations as INSERT operations
17    /// into a Postgres table that serves as an append-only event log.
18    /// In this mode, inserts, updates, and deletes are all represented as new rows
19    /// with metadata columns describing the operation type and timestamp.
20    #[serde(rename = "cdc")]
21    Cdc,
22}
23
24impl Display for PostgresWriteMode {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        match self {
27            Self::Materialized => write!(f, "materialized"),
28            Self::Cdc => write!(f, "cdc"),
29        }
30    }
31}
32
33/// Postgres input connector configuration.
34#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
35pub struct PostgresReaderConfig {
36    /// Postgres URI.
37    /// See: <https://docs.rs/tokio-postgres/0.7.12/tokio_postgres/config/struct.Config.html>
38    pub uri: String,
39
40    /// Query that specifies what data to fetch from postgres.
41    pub query: String,
42}
43
44/// Postgres output connector configuration.
45#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
46pub struct PostgresWriterConfig {
47    /// Postgres URI.
48    /// See: <https://docs.rs/tokio-postgres/0.7.12/tokio_postgres/config/struct.Config.html>
49    pub uri: String,
50
51    /// The table to write the output to.
52    pub table: String,
53
54    /// Write mode for the connector.
55    ///
56    /// - `materialized` (default): Perform direct INSERT, UPDATE, and DELETE operations on the table.
57    /// - `cdc`: Write all operations as INSERT operations into an append-only event log
58    ///   with additional metadata columns describing the operation type and timestamp.
59    #[serde(default)]
60    #[schema(default = PostgresWriteMode::default)]
61    pub mode: PostgresWriteMode,
62
63    /// Name of the operation metadata column in CDC mode.
64    ///
65    /// Only used when `mode = "cdc"`. This column will contain:
66    /// - `"i"` for insert operations
67    /// - `"u"` for upsert operations
68    /// - `"d"` for delete operations
69    ///
70    /// Default: `"__feldera_op"`
71    #[serde(default = "default_cdc_op_column")]
72    #[schema(default = default_cdc_op_column)]
73    pub cdc_op_column: String,
74
75    /// Name of the timestamp metadata column in CDC mode.
76    ///
77    /// Only used when `mode = "cdc"`. This column will contain the timestamp
78    /// (in RFC 3339 format) when the batch of updates was output
79    /// by the pipeline.
80    ///
81    /// Default: `"__feldera_ts"`
82    #[serde(default = "default_cdc_ts_column")]
83    #[schema(default = default_cdc_ts_column)]
84    pub cdc_ts_column: String,
85
86    /// A sequence of CA certificates in PEM format.
87    pub ssl_ca_pem: Option<String>,
88
89    /// Path to a file containing a sequence of CA certificates in PEM format.
90    pub ssl_ca_location: Option<String>,
91
92    /// The client certificate in PEM format.
93    pub ssl_client_pem: Option<String>,
94
95    /// Path to the client certificate.
96    pub ssl_client_location: Option<String>,
97
98    /// The client certificate key in PEM format.
99    pub ssl_client_key: Option<String>,
100
101    /// Path to the client certificate key.
102    pub ssl_client_key_location: Option<String>,
103
104    /// The path to the certificate chain file.
105    /// The file must contain a sequence of PEM-formatted certificates,
106    /// the first being the leaf certificate, and the remainder forming
107    /// the chain of certificates up to and including the trusted root certificate.
108    pub ssl_certificate_chain_location: Option<String>,
109
110    /// True to enable hostname verification when using TLS. True by default.
111    pub verify_hostname: Option<bool>,
112
113    /// The maximum number of records in a single buffer.
114    pub max_records_in_buffer: Option<usize>,
115
116    /// The maximum buffer size in for a single operation.
117    /// Note that the buffers of `INSERT`, `UPDATE` and `DELETE` queries are
118    /// separate.
119    /// Default: 1 MiB
120    #[schema(default = default_max_buffer_size)]
121    #[serde(default = "default_max_buffer_size")]
122    pub max_buffer_size_bytes: usize,
123
124    /// Specifies how the connector handles conflicts when executing an `INSERT`
125    /// into a table with a primary key. By default, an existing row with the same
126    /// key is overwritten. Setting this flag to `true` preserves the existing row
127    /// and ignores the new insert.
128    ///
129    /// This setting does not affect `UPDATE` statements, which always replace the
130    /// value associated with the key.
131    ///
132    /// This setting is not supported when `mode = "cdc"`, since all operations
133    /// are performed as append-only `INSERT`s into the target table.
134    /// Any conflict in CDC mode will result in an error.
135    ///
136    /// Default: `false`
137    #[serde(default)]
138    pub on_conflict_do_nothing: bool,
139}
140
/// Default value of `max_buffer_size_bytes`: 1 MiB (2^20 bytes).
fn default_max_buffer_size() -> usize {
    1 << 20
}
144
/// Default name of the CDC operation metadata column.
fn default_cdc_op_column() -> String {
    String::from("__feldera_op")
}
148
/// Default name of the CDC timestamp metadata column.
fn default_cdc_ts_column() -> String {
    String::from("__feldera_ts")
}
152
153impl PostgresWriterConfig {
154    pub fn validate(&self) -> Result<(), String> {
155        match self.mode {
156            PostgresWriteMode::Cdc => {
157                if self.cdc_op_column.trim().is_empty() {
158                    return Err("cdc_op_column cannot be empty in CDC mode".to_string());
159                }
160                if self.cdc_ts_column.trim().is_empty() {
161                    return Err("cdc_ts_column cannot be empty in CDC mode".to_string());
162                }
163
164                if !self.cdc_op_column.is_ascii() {
165                    return Err("cdc_op_column must contain only ASCII characters".to_string());
166                }
167
168                if !self.cdc_ts_column.is_ascii() {
169                    return Err("cdc_ts_column must contain only ASCII characters".to_string());
170                }
171
172                if self.on_conflict_do_nothing {
173                    return Err("on_conflict_do_nothing not supported in CDC mode since all operations are performed as append-only INSERTs into the target table".to_string());
174                }
175            }
176            PostgresWriteMode::Materialized => {
177                if self.cdc_ts_column != default_cdc_ts_column()
178                    && !self.cdc_ts_column.trim().is_empty()
179                {
180                    return Err(
181                        "cdc_ts_column must not be set when in MATERIALIZED mode".to_string()
182                    );
183                }
184                if self.cdc_op_column != default_cdc_op_column()
185                    && !self.cdc_op_column.trim().is_empty()
186                {
187                    return Err(
188                        "cdc_op_column must not be set when in MATERIALIZED mode".to_string()
189                    );
190                }
191            }
192        };
193
194        Ok(())
195    }
196}