Skip to main content

drasi_source_postgres/
lib.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! PostgreSQL Replication Source Plugin for Drasi
16//!
17//! This plugin captures data changes from PostgreSQL databases using logical replication.
18//! It connects to PostgreSQL as a replication client and decodes Write-Ahead Log (WAL)
19//! messages in real-time, converting them to Drasi source change events.
20//!
21//! # Prerequisites
22//!
23//! Before using this source, you must configure PostgreSQL for logical replication:
24//!
25//! 1. **Enable logical replication** in `postgresql.conf`:
26//!    ```text
27//!    wal_level = logical
28//!    max_replication_slots = 10
29//!    max_wal_senders = 10
30//!    ```
31//!
32//! 2. **Create a publication** for the tables you want to monitor:
33//!    ```sql
34//!    CREATE PUBLICATION drasi_publication FOR TABLE users, orders;
35//!    ```
36//!
37//! 3. **Create a replication slot** (optional - the source can create one automatically):
38//!    ```sql
39//!    SELECT pg_create_logical_replication_slot('drasi_slot', 'pgoutput');
40//!    ```
41//!
42//! 4. **Grant replication permissions** to the database user:
43//!    ```sql
44//!    ALTER ROLE drasi_user REPLICATION;
45//!    GRANT SELECT ON TABLE users, orders TO drasi_user;
46//!    ```
47//!
48//! # Architecture
49//!
50//! The source has two main components:
51//!
52//! - **Bootstrap Handler**: Performs an initial snapshot of table data when a query
53//!   subscribes with bootstrap enabled. Uses the replication slot's snapshot LSN to
54//!   ensure consistency.
55//!
56//! - **Streaming Handler**: Continuously reads WAL messages and decodes them using
57//!   the `pgoutput` protocol. Handles INSERT, UPDATE, and DELETE operations.
58//!
59//! # Configuration
60//!
61//! | Field | Type | Default | Description |
62//! |-------|------|---------|-------------|
63//! | `host` | string | `"localhost"` | PostgreSQL host |
64//! | `port` | u16 | `5432` | PostgreSQL port |
65//! | `database` | string | *required* | Database name |
66//! | `user` | string | *required* | Database user (must have replication permission) |
67//! | `password` | string | `""` | Database password |
68//! | `tables` | string[] | `[]` | Tables to replicate |
69//! | `slot_name` | string | `"drasi_slot"` | Replication slot name |
70//! | `publication_name` | string | `"drasi_publication"` | Publication name |
71//! | `ssl_mode` | string | `"prefer"` | SSL mode: disable, prefer, require |
72//! | `table_keys` | TableKeyConfig[] | `[]` | Primary key configuration for tables |
73//!
74//! # Example Configuration (YAML)
75//!
76//! ```yaml
77//! source_type: postgres
78//! properties:
79//!   host: db.example.com
80//!   port: 5432
81//!   database: production
82//!   user: replication_user
83//!   password: secret
84//!   tables:
85//!     - users
86//!     - orders
87//!   slot_name: drasi_slot
88//!   publication_name: drasi_publication
89//!   table_keys:
90//!     - table: users
91//!       key_columns: [id]
92//!     - table: orders
93//!       key_columns: [order_id]
94//! ```
95//!
96//! # Data Format
97//!
98//! The PostgreSQL source decodes WAL messages and converts them to Drasi source changes.
99//! Each row change is mapped as follows:
100//!
101//! ## Node Mapping
102//!
103//! - **Element ID**: `{schema}:{table}:{primary_key_value}` (e.g., `public:users:123`)
104//! - **Labels**: `[{table_name}]` (e.g., `["users"]`)
105//! - **Properties**: All columns from the row (column names become property keys)
106//!
107//! ## WAL Message to SourceChange
108//!
109//! | WAL Operation | SourceChange |
110//! |---------------|--------------|
111//! | INSERT | `SourceChange::Insert { element: Node }` |
112//! | UPDATE | `SourceChange::Update { element: Node }` |
113//! | DELETE | `SourceChange::Delete { metadata }` |
114//!
115//! ## Example Mapping
116//!
117//! Given a PostgreSQL table:
118//!
119//! ```sql
120//! CREATE TABLE users (
121//!     id SERIAL PRIMARY KEY,
122//!     name VARCHAR(100),
123//!     email VARCHAR(255),
124//!     age INTEGER
125//! );
126//!
127//! INSERT INTO users (name, email, age) VALUES ('Alice', 'alice@example.com', 30);
128//! ```
129//!
130//! Produces a SourceChange equivalent to:
131//!
132//! ```json
133//! {
134//!     "type": "Insert",
135//!     "element": {
136//!         "metadata": {
137//!             "element_id": "public:users:1",
138//!             "source_id": "pg-source",
139//!             "labels": ["users"],
140//!             "effective_from": 1699900000000000
141//!         },
142//!         "properties": {
143//!             "id": 1,
144//!             "name": "Alice",
145//!             "email": "alice@example.com",
146//!             "age": 30
147//!         }
148//!     }
149//! }
150//! ```
151//!
152//! # Usage Example
153//!
154//! ```rust,ignore
//! use drasi_source_postgres::PostgresReplicationSource;
//! use std::sync::Arc;
//!
//! // The builder takes the source ID up front and `build()` returns the
//! // ready-to-use source (not a bare configuration).
//! let source = PostgresReplicationSource::builder("pg-source")
//!     .with_host("db.example.com")
//!     .with_database("production")
//!     .with_user("replication_user")
//!     .with_password("secret")
//!     .with_tables(vec!["users".to_string(), "orders".to_string()])
//!     .build()?;
//!
//! drasi.add_source(Arc::new(source)).await?;
168//! ```
169
170pub mod config;
171pub mod connection;
172pub mod decoder;
173pub mod protocol;
174pub mod scram;
175pub mod stream;
176pub mod types;
177
178pub use config::{PostgresSourceConfig, SslMode, TableKeyConfig};
179
180use anyhow::Result;
181use async_trait::async_trait;
182use log::{error, info};
183use std::collections::HashMap;
184use std::sync::Arc;
185use tokio::sync::RwLock;
186
187use drasi_lib::channels::{DispatchMode, *};
188use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
189use drasi_lib::Source;
190
/// PostgreSQL replication source that captures changes via logical replication.
///
/// This source connects to PostgreSQL using the replication protocol and decodes
/// WAL messages in real-time, converting them to Drasi source change events.
///
/// Instances are created with [`PostgresReplicationSource::new`],
/// [`PostgresReplicationSource::with_dispatch`], or through
/// [`PostgresSourceBuilder`].
///
/// # Fields
///
/// - `base`: Common source functionality (dispatchers, status, lifecycle)
/// - `config`: PostgreSQL connection and replication configuration
pub struct PostgresReplicationSource {
    /// Base source implementation providing common functionality.
    ///
    /// Holds the source ID, the current component status, the dispatchers,
    /// the status event sender and the handle of the background replication
    /// task.
    base: SourceBase,
    /// PostgreSQL source configuration.
    ///
    /// A clone of this value is handed to the replication task each time the
    /// source is started.
    config: PostgresSourceConfig,
}
206
207impl PostgresReplicationSource {
208    /// Create a builder for PostgresReplicationSource
209    ///
210    /// # Example
211    ///
212    /// ```rust,ignore
213    /// use drasi_source_postgres::PostgresReplicationSource;
214    ///
215    /// let source = PostgresReplicationSource::builder("pg-source")
216    ///     .with_host("db.example.com")
217    ///     .with_database("production")
218    ///     .with_user("replication_user")
219    ///     .with_password("secret")
220    ///     .with_tables(vec!["users".to_string(), "orders".to_string()])
221    ///     .with_bootstrap_provider(my_provider)
222    ///     .build()?;
223    /// ```
224    pub fn builder(id: impl Into<String>) -> PostgresSourceBuilder {
225        PostgresSourceBuilder::new(id)
226    }
227
228    /// Create a new PostgreSQL replication source.
229    ///
230    /// The event channel is automatically injected when the source is added
231    /// to DrasiLib via `add_source()`.
232    ///
233    /// # Arguments
234    ///
235    /// * `id` - Unique identifier for this source instance
236    /// * `config` - PostgreSQL source configuration
237    ///
238    /// # Returns
239    ///
240    /// A new `PostgresReplicationSource` instance, or an error if construction fails.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if the base source cannot be initialized.
245    ///
246    /// # Example
247    ///
248    /// ```rust,ignore
249    /// use drasi_source_postgres::{PostgresReplicationSource, PostgresSourceBuilder};
250    ///
251    /// let config = PostgresSourceBuilder::new()
252    ///     .with_host("db.example.com")
253    ///     .with_database("mydb")
254    ///     .with_user("replication_user")
255    ///     .build();
256    ///
257    /// let source = PostgresReplicationSource::new("my-pg-source", config)?;
258    /// ```
259    pub fn new(id: impl Into<String>, config: PostgresSourceConfig) -> Result<Self> {
260        let id = id.into();
261        let params = SourceBaseParams::new(id);
262        Ok(Self {
263            base: SourceBase::new(params)?,
264            config,
265        })
266    }
267
268    /// Create a new PostgreSQL replication source with custom dispatch settings
269    ///
270    /// The event channel is automatically injected when the source is added
271    /// to DrasiLib via `add_source()`.
272    pub fn with_dispatch(
273        id: impl Into<String>,
274        config: PostgresSourceConfig,
275        dispatch_mode: Option<DispatchMode>,
276        dispatch_buffer_capacity: Option<usize>,
277    ) -> Result<Self> {
278        let id = id.into();
279        let mut params = SourceBaseParams::new(id);
280        if let Some(mode) = dispatch_mode {
281            params = params.with_dispatch_mode(mode);
282        }
283        if let Some(capacity) = dispatch_buffer_capacity {
284            params = params.with_dispatch_buffer_capacity(capacity);
285        }
286        Ok(Self {
287            base: SourceBase::new(params)?,
288            config,
289        })
290    }
291}
292
293#[async_trait]
294impl Source for PostgresReplicationSource {
295    fn id(&self) -> &str {
296        &self.base.id
297    }
298
299    fn type_name(&self) -> &str {
300        "postgres"
301    }
302
303    fn properties(&self) -> HashMap<String, serde_json::Value> {
304        let mut props = HashMap::new();
305        props.insert(
306            "host".to_string(),
307            serde_json::Value::String(self.config.host.clone()),
308        );
309        props.insert(
310            "port".to_string(),
311            serde_json::Value::Number(self.config.port.into()),
312        );
313        props.insert(
314            "database".to_string(),
315            serde_json::Value::String(self.config.database.clone()),
316        );
317        props.insert(
318            "user".to_string(),
319            serde_json::Value::String(self.config.user.clone()),
320        );
321        // Don't expose password in properties
322        props.insert(
323            "tables".to_string(),
324            serde_json::Value::Array(
325                self.config
326                    .tables
327                    .iter()
328                    .map(|t| serde_json::Value::String(t.clone()))
329                    .collect(),
330            ),
331        );
332        props
333    }
334
335    fn auto_start(&self) -> bool {
336        self.base.get_auto_start()
337    }
338
339    async fn start(&self) -> Result<()> {
340        if self.base.get_status().await == ComponentStatus::Running {
341            return Ok(());
342        }
343
344        self.base.set_status(ComponentStatus::Starting).await;
345        info!("Starting PostgreSQL replication source: {}", self.base.id);
346
347        let config = self.config.clone();
348        let source_id = self.base.id.clone();
349        let dispatchers = self.base.dispatchers.clone();
350        let status_tx = self.base.status_tx();
351        let status_clone = self.base.status.clone();
352
353        let task = tokio::spawn(async move {
354            if let Err(e) = run_replication(
355                source_id.clone(),
356                config,
357                dispatchers,
358                status_tx.clone(),
359                status_clone.clone(),
360            )
361            .await
362            {
363                error!("Replication task failed for {source_id}: {e}");
364                *status_clone.write().await = ComponentStatus::Error;
365                if let Some(ref tx) = *status_tx.read().await {
366                    let _ = tx
367                        .send(ComponentEvent {
368                            component_id: source_id,
369                            component_type: ComponentType::Source,
370                            status: ComponentStatus::Error,
371                            timestamp: chrono::Utc::now(),
372                            message: Some(format!("Replication failed: {e}")),
373                        })
374                        .await;
375                }
376            }
377        });
378
379        *self.base.task_handle.write().await = Some(task);
380        self.base.set_status(ComponentStatus::Running).await;
381
382        self.base
383            .send_component_event(
384                ComponentStatus::Running,
385                Some("PostgreSQL replication started".to_string()),
386            )
387            .await?;
388
389        Ok(())
390    }
391
392    async fn stop(&self) -> Result<()> {
393        if self.base.get_status().await != ComponentStatus::Running {
394            return Ok(());
395        }
396
397        info!("Stopping PostgreSQL replication source: {}", self.base.id);
398
399        self.base.set_status(ComponentStatus::Stopping).await;
400
401        // Cancel the replication task
402        if let Some(task) = self.base.task_handle.write().await.take() {
403            task.abort();
404        }
405
406        self.base.set_status(ComponentStatus::Stopped).await;
407        self.base
408            .send_component_event(
409                ComponentStatus::Stopped,
410                Some("PostgreSQL replication stopped".to_string()),
411            )
412            .await?;
413
414        Ok(())
415    }
416
417    async fn status(&self) -> ComponentStatus {
418        self.base.get_status().await
419    }
420
421    async fn subscribe(
422        &self,
423        settings: drasi_lib::config::SourceSubscriptionSettings,
424    ) -> Result<SubscriptionResponse> {
425        self.base
426            .subscribe_with_bootstrap(&settings, "PostgreSQL")
427            .await
428    }
429
430    fn as_any(&self) -> &dyn std::any::Any {
431        self
432    }
433
434    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
435        self.base.initialize(context).await;
436    }
437
438    async fn set_bootstrap_provider(
439        &self,
440        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
441    ) {
442        self.base.set_bootstrap_provider(provider).await;
443    }
444}
445
446async fn run_replication(
447    source_id: String,
448    config: PostgresSourceConfig,
449    dispatchers: Arc<
450        RwLock<
451            Vec<Box<dyn drasi_lib::channels::ChangeDispatcher<SourceEventWrapper> + Send + Sync>>,
452        >,
453    >,
454    status_tx: Arc<RwLock<Option<ComponentEventSender>>>,
455    status: Arc<RwLock<ComponentStatus>>,
456) -> Result<()> {
457    info!("Starting replication for source {source_id}");
458
459    let mut stream =
460        stream::ReplicationStream::new(config, source_id, dispatchers, status_tx, status);
461
462    stream.run().await
463}
464
/// Builder for PostgreSQL replication sources.
///
/// Provides a fluent API for assembling the PostgreSQL connection settings
/// together with the source-level options (dispatch settings, bootstrap
/// provider, auto-start), with sensible defaults. Calling `build()` yields a
/// [`PostgresReplicationSource`].
///
/// # Example
///
/// ```rust,ignore
/// use drasi_source_postgres::PostgresReplicationSource;
///
/// let source = PostgresReplicationSource::builder("pg-source")
///     .with_host("db.example.com")
///     .with_database("production")
///     .with_user("replication_user")
///     .with_password("secret")
///     .with_tables(vec!["users".to_string(), "orders".to_string()])
///     .with_slot_name("my_slot")
///     .build()?;
/// ```
pub struct PostgresSourceBuilder {
    /// Identifier of the source being built.
    id: String,
    /// PostgreSQL host; defaults to `"localhost"`.
    host: String,
    /// PostgreSQL port; defaults to `5432`.
    port: u16,
    /// Database name; defaults to an empty string.
    database: String,
    /// Database user; defaults to an empty string.
    user: String,
    /// Database password; defaults to an empty string.
    password: String,
    /// Tables to replicate; defaults to an empty list.
    tables: Vec<String>,
    /// Replication slot name; defaults to `"drasi_slot"`.
    slot_name: String,
    /// Publication name; defaults to `"drasi_publication"`.
    publication_name: String,
    /// SSL mode; defaults to `SslMode::default()`.
    ssl_mode: SslMode,
    /// Key configuration for tables; defaults to an empty list.
    table_keys: Vec<TableKeyConfig>,
    /// Optional dispatch mode override; `None` leaves the base default in place.
    dispatch_mode: Option<DispatchMode>,
    /// Optional dispatch buffer capacity override; `None` leaves the base default in place.
    dispatch_buffer_capacity: Option<usize>,
    /// Optional bootstrap provider handed to the base source at build time.
    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
    /// Whether the source should auto-start; defaults to `true`.
    auto_start: bool,
}
501
502impl PostgresSourceBuilder {
503    /// Create a new PostgreSQL source builder with the given ID and default values
504    pub fn new(id: impl Into<String>) -> Self {
505        Self {
506            id: id.into(),
507            host: "localhost".to_string(),
508            port: 5432,
509            database: String::new(),
510            user: String::new(),
511            password: String::new(),
512            tables: Vec::new(),
513            slot_name: "drasi_slot".to_string(),
514            publication_name: "drasi_publication".to_string(),
515            ssl_mode: SslMode::default(),
516            table_keys: Vec::new(),
517            dispatch_mode: None,
518            dispatch_buffer_capacity: None,
519            bootstrap_provider: None,
520            auto_start: true,
521        }
522    }
523
524    /// Set the PostgreSQL host
525    pub fn with_host(mut self, host: impl Into<String>) -> Self {
526        self.host = host.into();
527        self
528    }
529
530    /// Set the PostgreSQL port
531    pub fn with_port(mut self, port: u16) -> Self {
532        self.port = port;
533        self
534    }
535
536    /// Set the database name
537    pub fn with_database(mut self, database: impl Into<String>) -> Self {
538        self.database = database.into();
539        self
540    }
541
542    /// Set the database user
543    pub fn with_user(mut self, user: impl Into<String>) -> Self {
544        self.user = user.into();
545        self
546    }
547
548    /// Set the database password
549    pub fn with_password(mut self, password: impl Into<String>) -> Self {
550        self.password = password.into();
551        self
552    }
553
554    /// Set the tables to replicate
555    pub fn with_tables(mut self, tables: Vec<String>) -> Self {
556        self.tables = tables;
557        self
558    }
559
560    /// Add a table to replicate
561    pub fn add_table(mut self, table: impl Into<String>) -> Self {
562        self.tables.push(table.into());
563        self
564    }
565
566    /// Set the replication slot name
567    pub fn with_slot_name(mut self, slot_name: impl Into<String>) -> Self {
568        self.slot_name = slot_name.into();
569        self
570    }
571
572    /// Set the publication name
573    pub fn with_publication_name(mut self, publication_name: impl Into<String>) -> Self {
574        self.publication_name = publication_name.into();
575        self
576    }
577
578    /// Set the SSL mode
579    pub fn with_ssl_mode(mut self, ssl_mode: SslMode) -> Self {
580        self.ssl_mode = ssl_mode;
581        self
582    }
583
584    /// Set the table key configurations
585    pub fn with_table_keys(mut self, table_keys: Vec<TableKeyConfig>) -> Self {
586        self.table_keys = table_keys;
587        self
588    }
589
590    /// Add a table key configuration
591    pub fn add_table_key(mut self, table_key: TableKeyConfig) -> Self {
592        self.table_keys.push(table_key);
593        self
594    }
595
596    /// Set the dispatch mode for this source
597    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
598        self.dispatch_mode = Some(mode);
599        self
600    }
601
602    /// Set the dispatch buffer capacity for this source
603    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
604        self.dispatch_buffer_capacity = Some(capacity);
605        self
606    }
607
608    /// Set the bootstrap provider for this source
609    pub fn with_bootstrap_provider(
610        mut self,
611        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
612    ) -> Self {
613        self.bootstrap_provider = Some(Box::new(provider));
614        self
615    }
616
617    /// Set whether this source should auto-start when DrasiLib starts.
618    ///
619    /// Default is `true`. Set to `false` if this source should only be
620    /// started manually via `start_source()`.
621    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
622        self.auto_start = auto_start;
623        self
624    }
625
626    /// Set the full configuration at once
627    pub fn with_config(mut self, config: PostgresSourceConfig) -> Self {
628        self.host = config.host;
629        self.port = config.port;
630        self.database = config.database;
631        self.user = config.user;
632        self.password = config.password;
633        self.tables = config.tables;
634        self.slot_name = config.slot_name;
635        self.publication_name = config.publication_name;
636        self.ssl_mode = config.ssl_mode;
637        self.table_keys = config.table_keys;
638        self
639    }
640
641    /// Build the PostgreSQL replication source
642    ///
643    /// # Errors
644    ///
645    /// Returns an error if the source cannot be constructed.
646    pub fn build(self) -> Result<PostgresReplicationSource> {
647        let config = PostgresSourceConfig {
648            host: self.host,
649            port: self.port,
650            database: self.database,
651            user: self.user,
652            password: self.password,
653            tables: self.tables,
654            slot_name: self.slot_name,
655            publication_name: self.publication_name,
656            ssl_mode: self.ssl_mode,
657            table_keys: self.table_keys,
658        };
659
660        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
661        if let Some(mode) = self.dispatch_mode {
662            params = params.with_dispatch_mode(mode);
663        }
664        if let Some(capacity) = self.dispatch_buffer_capacity {
665            params = params.with_dispatch_buffer_capacity(capacity);
666        }
667        if let Some(provider) = self.bootstrap_provider {
668            params = params.with_bootstrap_provider(provider);
669        }
670
671        Ok(PostgresReplicationSource {
672            base: SourceBase::new(params)?,
673            config,
674        })
675    }
676}
677
678#[cfg(test)]
679mod tests {
680    use super::*;
681
682    mod construction {
683        use super::*;
684
685        #[test]
686        fn test_builder_with_valid_config() {
687            let source = PostgresSourceBuilder::new("test-source")
688                .with_database("testdb")
689                .with_user("testuser")
690                .build();
691            assert!(source.is_ok());
692        }
693
694        #[test]
695        fn test_builder_with_custom_config() {
696            let source = PostgresSourceBuilder::new("pg-source")
697                .with_host("192.168.1.100")
698                .with_port(5433)
699                .with_database("production")
700                .with_user("admin")
701                .with_password("secret")
702                .build()
703                .unwrap();
704            assert_eq!(source.id(), "pg-source");
705        }
706
707        #[test]
708        fn test_with_dispatch_creates_source() {
709            let config = PostgresSourceConfig {
710                host: "localhost".to_string(),
711                port: 5432,
712                database: "testdb".to_string(),
713                user: "testuser".to_string(),
714                password: String::new(),
715                tables: Vec::new(),
716                slot_name: "drasi_slot".to_string(),
717                publication_name: "drasi_publication".to_string(),
718                ssl_mode: SslMode::default(),
719                table_keys: Vec::new(),
720            };
721            let source = PostgresReplicationSource::with_dispatch(
722                "dispatch-source",
723                config,
724                Some(DispatchMode::Channel),
725                Some(2000),
726            );
727            assert!(source.is_ok());
728            assert_eq!(source.unwrap().id(), "dispatch-source");
729        }
730    }
731
732    mod properties {
733        use super::*;
734
735        #[test]
736        fn test_id_returns_correct_value() {
737            let source = PostgresSourceBuilder::new("my-pg-source")
738                .with_database("db")
739                .with_user("user")
740                .build()
741                .unwrap();
742            assert_eq!(source.id(), "my-pg-source");
743        }
744
745        #[test]
746        fn test_type_name_returns_postgres() {
747            let source = PostgresSourceBuilder::new("test")
748                .with_database("db")
749                .with_user("user")
750                .build()
751                .unwrap();
752            assert_eq!(source.type_name(), "postgres");
753        }
754
755        #[test]
756        fn test_properties_contains_connection_info() {
757            let source = PostgresSourceBuilder::new("test")
758                .with_host("db.example.com")
759                .with_port(5433)
760                .with_database("mydb")
761                .with_user("app_user")
762                .with_password("secret")
763                .with_tables(vec!["users".to_string()])
764                .build()
765                .unwrap();
766            let props = source.properties();
767
768            assert_eq!(
769                props.get("host"),
770                Some(&serde_json::Value::String("db.example.com".to_string()))
771            );
772            assert_eq!(
773                props.get("port"),
774                Some(&serde_json::Value::Number(5433.into()))
775            );
776            assert_eq!(
777                props.get("database"),
778                Some(&serde_json::Value::String("mydb".to_string()))
779            );
780            assert_eq!(
781                props.get("user"),
782                Some(&serde_json::Value::String("app_user".to_string()))
783            );
784        }
785
786        #[test]
787        fn test_properties_does_not_expose_password() {
788            let source = PostgresSourceBuilder::new("test")
789                .with_database("db")
790                .with_user("user")
791                .with_password("super_secret_password")
792                .build()
793                .unwrap();
794            let props = source.properties();
795
796            // Password should not be exposed in properties
797            assert!(!props.contains_key("password"));
798        }
799
800        #[test]
801        fn test_properties_includes_tables() {
802            let source = PostgresSourceBuilder::new("test")
803                .with_database("db")
804                .with_user("user")
805                .with_tables(vec!["users".to_string(), "orders".to_string()])
806                .build()
807                .unwrap();
808            let props = source.properties();
809
810            let tables = props.get("tables").unwrap().as_array().unwrap();
811            assert_eq!(tables.len(), 2);
812            assert_eq!(tables[0], "users");
813            assert_eq!(tables[1], "orders");
814        }
815    }
816
817    mod lifecycle {
818        use super::*;
819
820        #[tokio::test]
821        async fn test_initial_status_is_stopped() {
822            let source = PostgresSourceBuilder::new("test")
823                .with_database("db")
824                .with_user("user")
825                .build()
826                .unwrap();
827            assert_eq!(source.status().await, ComponentStatus::Stopped);
828        }
829    }
830
831    mod builder {
832        use super::*;
833
834        #[test]
835        fn test_postgres_builder_defaults() {
836            let source = PostgresSourceBuilder::new("test").build().unwrap();
837            assert_eq!(source.config.host, "localhost");
838            assert_eq!(source.config.port, 5432);
839            assert_eq!(source.config.slot_name, "drasi_slot");
840            assert_eq!(source.config.publication_name, "drasi_publication");
841        }
842
843        #[test]
844        fn test_postgres_builder_custom_values() {
845            let source = PostgresSourceBuilder::new("test")
846                .with_host("db.example.com")
847                .with_port(5433)
848                .with_database("production")
849                .with_user("app_user")
850                .with_password("secret")
851                .with_tables(vec!["users".to_string(), "orders".to_string()])
852                .build()
853                .unwrap();
854
855            assert_eq!(source.config.host, "db.example.com");
856            assert_eq!(source.config.port, 5433);
857            assert_eq!(source.config.database, "production");
858            assert_eq!(source.config.user, "app_user");
859            assert_eq!(source.config.password, "secret");
860            assert_eq!(source.config.tables.len(), 2);
861            assert_eq!(source.config.tables[0], "users");
862            assert_eq!(source.config.tables[1], "orders");
863        }
864
865        #[test]
866        fn test_builder_add_table() {
867            let source = PostgresSourceBuilder::new("test")
868                .add_table("table1")
869                .add_table("table2")
870                .add_table("table3")
871                .build()
872                .unwrap();
873
874            assert_eq!(source.config.tables.len(), 3);
875            assert_eq!(source.config.tables[0], "table1");
876            assert_eq!(source.config.tables[1], "table2");
877            assert_eq!(source.config.tables[2], "table3");
878        }
879
880        #[test]
881        fn test_builder_slot_and_publication() {
882            let source = PostgresSourceBuilder::new("test")
883                .with_slot_name("custom_slot")
884                .with_publication_name("custom_pub")
885                .build()
886                .unwrap();
887
888            assert_eq!(source.config.slot_name, "custom_slot");
889            assert_eq!(source.config.publication_name, "custom_pub");
890        }
891
892        #[test]
893        fn test_builder_id() {
894            let source = PostgresReplicationSource::builder("my-pg-source")
895                .with_database("db")
896                .with_user("user")
897                .build()
898                .unwrap();
899
900            assert_eq!(source.base.id, "my-pg-source");
901        }
902    }
903
904    mod config {
905        use super::*;
906
907        #[test]
908        fn test_config_serialization() {
909            let config = PostgresSourceConfig {
910                host: "localhost".to_string(),
911                port: 5432,
912                database: "testdb".to_string(),
913                user: "testuser".to_string(),
914                password: String::new(),
915                tables: Vec::new(),
916                slot_name: "drasi_slot".to_string(),
917                publication_name: "drasi_publication".to_string(),
918                ssl_mode: SslMode::default(),
919                table_keys: Vec::new(),
920            };
921
922            let json = serde_json::to_string(&config).unwrap();
923            let deserialized: PostgresSourceConfig = serde_json::from_str(&json).unwrap();
924
925            assert_eq!(config, deserialized);
926        }
927
928        #[test]
929        fn test_config_deserialization_with_required_fields() {
930            let json = r#"{
931                "database": "mydb",
932                "user": "myuser"
933            }"#;
934            let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
935
936            assert_eq!(config.database, "mydb");
937            assert_eq!(config.user, "myuser");
938            assert_eq!(config.host, "localhost"); // default
939            assert_eq!(config.port, 5432); // default
940            assert_eq!(config.slot_name, "drasi_slot"); // default
941        }
942
943        #[test]
944        fn test_config_deserialization_full() {
945            let json = r#"{
946                "host": "db.prod.internal",
947                "port": 5433,
948                "database": "production",
949                "user": "replication_user",
950                "password": "secret",
951                "tables": ["accounts", "transactions"],
952                "slot_name": "prod_slot",
953                "publication_name": "prod_publication"
954            }"#;
955            let config: PostgresSourceConfig = serde_json::from_str(json).unwrap();
956
957            assert_eq!(config.host, "db.prod.internal");
958            assert_eq!(config.port, 5433);
959            assert_eq!(config.database, "production");
960            assert_eq!(config.user, "replication_user");
961            assert_eq!(config.password, "secret");
962            assert_eq!(config.tables, vec!["accounts", "transactions"]);
963            assert_eq!(config.slot_name, "prod_slot");
964            assert_eq!(config.publication_name, "prod_publication");
965        }
966    }
967}